2 Answers

Answerer: 1772 experience points · 5+ upvotes
IIUC, you can use a window function to compute max(IF(TP=1, Date, NULL)) for each id, then filter by that threshold:
from pyspark.sql import Window, functions as F

w1 = Window.partitionBy('id')

df_new = (
    df.withColumn('Date', F.to_timestamp('Date', 'yyyy-MM-dd HH:mm'))
      # per-id threshold: the latest Date among rows where TP = 1
      .withColumn('threshold_date', F.expr("max(IF(TP=1, Date, NULL))").over(w1))
      # keep rows no more than 2 days past that threshold
      .filter('Date <= threshold_date + interval 2 days')
)
df_new.show()
+---+----+-------------------+-------------------+
| id|  TP|               Date|     threshold_date|
+---+----+-------------------+-------------------+
| A9|Null|2010-05-05 12:00:00|2010-05-06 13:00:00|
| A9|   1|2010-05-05 13:00:00|2010-05-06 13:00:00|
| A9|   1|2010-05-06 13:00:00|2010-05-06 13:00:00|
| A1|Null|2010-01-01 12:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-01 13:00:00|2010-01-02 01:00:00|
| A1|   1|2010-01-02 01:00:00|2010-01-02 01:00:00|
| A1|Null|2010-01-02 02:00:00|2010-01-02 01:00:00|
+---+----+-------------------+-------------------+
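As a sanity check, the same per-id logic can be sketched in plain Python. The sample rows below are a hypothetical reconstruction from the output table above, not the author's original df:

```python
from datetime import datetime, timedelta

# (id, TP, Date) rows reconstructed from the table above (assumed sample data)
rows = [
    ("A9", None, "2010-05-05 12:00"),
    ("A9", 1,    "2010-05-05 13:00"),
    ("A9", 1,    "2010-05-06 13:00"),
    ("A1", None, "2010-01-01 12:00"),
    ("A1", None, "2010-01-01 13:00"),
    ("A1", 1,    "2010-01-02 01:00"),
    ("A1", None, "2010-01-02 02:00"),
]
parsed = [(i, tp, datetime.strptime(d, "%Y-%m-%d %H:%M")) for i, tp, d in rows]

# per-id threshold: latest Date among rows where TP == 1
threshold = {}
for i, tp, d in parsed:
    if tp == 1:
        threshold[i] = max(threshold.get(i, d), d)

# keep rows no more than 2 days past their id's threshold
kept = [(i, tp, d) for i, tp, d in parsed
        if i in threshold and d <= threshold[i] + timedelta(days=2)]
```

All seven sample rows survive the filter, matching the Spark output above.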

Answerer: 2037 experience points · 6+ upvotes
You can simply filter the dataframe for TP = 1 and use collect()[0] to get the maximum value of the Date column as a variable.
Add 48 hours to that variable with a timedelta and filter df:
from pyspark.sql.functions import col
from datetime import timedelta

# latest Date among rows where TP = 1, plus 48 hours
# (Date must already be a timestamp column for the timedelta addition to work)
date_var = (df.filter(col("TP") == 1).orderBy("Date", ascending=False)
              .collect()[0]["Date"] + timedelta(hours=48))
df.filter(col("Date") <= date_var).show()
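Note that this computes a single global threshold across all ids, unlike the window approach in the first answer, which computes one threshold per id. A plain-Python sketch of the global variant, with hypothetical sample rows chosen so the difference is visible:

```python
from datetime import datetime, timedelta

# (id, TP, Date) rows -- hypothetical sample data for illustration
rows = [
    ("A1", None, datetime(2010, 1, 1, 12, 0)),
    ("A1", 1,    datetime(2010, 1, 2, 1, 0)),
    ("A9", 1,    datetime(2010, 5, 6, 13, 0)),
    ("A9", None, datetime(2010, 5, 9, 0, 0)),   # later than every TP=1 date
]

# single global threshold: latest date where TP == 1, plus 48 hours
date_var = max(d for _, tp, d in rows if tp == 1) + timedelta(hours=48)

kept = [r for r in rows if r[2] <= date_var]
```

Here the last A9 row falls outside the 48-hour window and is dropped, while every A1 row is kept against A9's threshold rather than A1's own.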