1 Answer

Depending on how you want to aggregate the grouped data, you can do, for example:
Prerequisites (if you haven't already set them up):
from pyspark.sql import functions as F
from pyspark.sql.functions import expr
For sum:
difference = (streamingDataF
    .withColumn("Difference", expr("Column_A - Column_B"))
    .drop("Column_A", "Column_B")
    .groupBy("Column_C")
    .agg(F.sum(F.col("Difference")).alias("Difference")))
For max:
difference = (streamingDataF
    .withColumn("Difference", expr("Column_A - Column_B"))
    .drop("Column_A", "Column_B")
    .groupBy("Column_C")
    .agg(F.max(F.col("Difference")).alias("Difference")))
Then:
# Streaming aggregations without a watermark need "complete" (or "update") output mode; "append" raises an AnalysisException
differenceStream = difference.writeStream\
    .queryName("diff_aggr")\
    .format("memory")\
    .outputMode("complete")\
    .start()
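Because the sink is the in-memory table registered under queryName("diff_aggr"), you can poll the running aggregation with plain SQL. A minimal sketch, assuming spark is your active SparkSession:

# Read the current contents of the in-memory sink registered as "diff_aggr"
spark.sql("SELECT * FROM diff_aggr").show()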
關(guān)鍵是 - 如果你這樣做,你也需要通過(guò)聚合來(lái)減少。如果你想把你的值排序在一起,試試groupBydf.sort(...)