Tuesday, September 8, 2020

PySpark - Conditional Create Column with GroupBy

I need to create an indicator for each ID based on a date condition, using a second PySpark dataframe.

The indicator is 1 or 0 and tells me whether a sensor failed. It is conditional on a second dataframe that holds the first failure date and the last failure date for each ID. If a failure was recorded in fail_df, the main_df rows whose Date falls on or between the first and last recorded failure dates from fail_df should get a 1. When the sensor didn't fail within the main_df dates, the row should get a 0.

main_df DataFrame

ID         |  Date      |Value 
-------------------------------------------------
P1         | 2016-10-01 |100
P1         | 2016-10-02 |200
P1         | 2016-12-16 |700
P1         | 2016-12-17 |800
P1         | 2016-12-18 |800
P2         | 2016-01-31 |700
P2         | 2016-02-01 |800
P2         | 2016-02-02 |900

Failure List DataFrame

ID         |  First Fail Date      | Last Fail Date
-----------------------------------------------------
P1         | 2016-10-01            | 2016-10-02
P2         | 2016-01-31            | 2016-02-01

Desired DataFrame

ID         |  Date      |Value  | Failure_Indicator     
-------------------------------------------------
P1         | 2016-10-01 |100    | 1
P1         | 2016-10-02 |200    | 1
P1         | 2016-12-16 |700    | 0
P1         | 2016-12-17 |800    | 0
P1         | 2016-12-18 |800    | 0
P2         | 2016-01-31 |700    | 1
P2         | 2016-02-01 |800    | 1
P2         | 2016-02-02 |900    | 0

What I have tried (it raises AttributeError: 'GroupedData' object has no attribute 'withColumn'):

df = main_df.groupBy('ID').withColumn(
    'Failure_Indicator',
    F.when((fail_df.col("First fail Date") >= main_df.Date) & 
           (fail_df.col("Last fail Date") >= main_df.Date), 1)
     .otherwise(0))
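A minimal sketch of one way this could work, assuming the failure dataframe is named fail_df with the columns "First Fail Date" and "Last Fail Date" shown above: instead of groupBy (which returns a GroupedData object without withColumn), left-join fail_df onto main_df by ID and flag the rows whose Date falls inside the failure window. Rows for IDs with no failure record, or with dates outside the window, fall through to otherwise(0) because the comparison against null is not true.

from pyspark.sql import functions as F

# Left-join the failure window onto each row of main_df by ID,
# then flag rows whose Date lies within [First Fail Date, Last Fail Date].
result = (
    main_df.join(fail_df, on="ID", how="left")
    .withColumn(
        "Failure_Indicator",
        F.when(
            (F.col("Date") >= F.col("First Fail Date"))
            & (F.col("Date") <= F.col("Last Fail Date")),
            1,
        ).otherwise(0),
    )
    # Drop the joined failure-window columns to keep the original layout.
    .select("ID", "Date", "Value", "Failure_Indicator")
)

result.show()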
