I need to create an indicator for each ID based on a date condition, using a second PySpark DataFrame.
The indicator is 1 or 0 and tells me whether a sensor failed. It is conditional on a second DataFrame (fail_df) that holds the first failure date and the last failure date for each ID. If a failure was recorded in fail_df, the main_df rows whose Date falls on or between the first and last recorded failure dates should get a 1. When the sensor did not fail within the main_df dates, the row should get a 0.
main_df DataFrame
ID | Date       | Value
------------------------
P1 | 2016-10-01 | 100
P1 | 2016-10-02 | 200
P1 | 2016-12-16 | 700
P1 | 2016-12-17 | 800
P1 | 2016-12-18 | 800
P2 | 2016-01-31 | 700
P2 | 2016-02-01 | 800
P2 | 2016-02-02 | 900
Failure List DataFrame
ID | First Fail Date | Last Fail Date
--------------------------------------
P1 | 2016-10-01      | 2016-10-02
P2 | 2016-01-31      | 2016-02-01
Desired DataFrame
ID | Date       | Value | Failure_Indicator
---------------------------------------------
P1 | 2016-10-01 | 100   | 1
P1 | 2016-10-02 | 200   | 1
P1 | 2016-12-16 | 700   | 0
P1 | 2016-12-17 | 800   | 0
P1 | 2016-12-18 | 800   | 0
P2 | 2016-01-31 | 700   | 1
P2 | 2016-02-01 | 800   | 1
P2 | 2016-02-02 | 900   | 0
What I have tried (it raises AttributeError: 'GroupedData' object has no attribute 'withColumn'):
df = main_df.groupBy('ID').withColumn(
'Failure_Indicator',
F.when((fail_df.col("First fail Date") >= main_df.Date) &
(fail_df.col("Last fail Date") >= main_df.Date), 1)
.otherwise(0))
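A minimal sketch of one approach that should work instead (groupBy returns a GroupedData object, which only exposes aggregation methods, hence the AttributeError): left-join fail_df onto main_df by ID and derive the flag with when/between. It assumes the fail_df columns are literally named "ID", "First Fail Date" and "Last Fail Date" as in the table above; rename them if the actual schema differs.

from pyspark.sql import functions as F

# Left-join the failure window onto main_df by ID, then flag rows whose Date
# falls inside that window.
result = (
    main_df.join(
        fail_df.select(
            "ID",
            F.col("First Fail Date").alias("first_fail"),
            F.col("Last Fail Date").alias("last_fail"),
        ),
        on="ID",
        how="left",
    )
    .withColumn(
        "Failure_Indicator",
        # between() is inclusive on both ends; if there is no matching failure
        # row the window columns are null, the condition evaluates to null,
        # and otherwise(0) applies.
        F.when(F.col("Date").between(F.col("first_fail"), F.col("last_fail")), 1)
         .otherwise(0),
    )
    .drop("first_fail", "last_fail")
)
result.show()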