第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

Pyspark:多個數(shù)組的交集

Pyspark:多個數(shù)組的交集

侃侃爾雅 2022-10-06 19:49:36
我有以下測試數(shù)據(jù),必須借助pyspark檢查以下語句(數(shù)據(jù)實際上非常大:700000筆交易,每筆交易有10+個產(chǎn)品):import pandas as pdimport datetimedata = {'date': ['2014-01-01', '2014-01-02', '2014-01-03', '2014-01-04', '2014-01-05', '2014-01-06'],     'customerid': [1, 2, 2, 3, 4, 3], 'productids': ['A;B', 'D;E', 'H;X', 'P;Q;G', 'S;T;U', 'C;G']}data = pd.DataFrame(data)data['date'] = pd.to_datetime(data['date'])“某個客戶 ID 在 x 天內(nèi)存在的交易的特征是購物車中至少有一件相同的產(chǎn)品?!钡侥壳盀橹?,我有以下方法(例如 x = 2):spark = SparkSession.builder \    .master('local[*]') \    .config("spark.driver.memory", "500g") \    .appName('my-pandasToSparkDF-app') \    .getOrCreate()spark.conf.set("spark.sql.execution.arrow.enabled", "true")spark.sparkContext.setLogLevel("OFF")df=spark.createDataFrame(data)x = 2win = Window().partitionBy('customerid').orderBy(F.col("date").cast("long")).rangeBetween(-(86400*x), Window.currentRow)test = df.withColumn("productids", F.array_distinct(F.split("productids", "\;")))\    .withColumn("flat_col", F.array_distinct(F.flatten((F.collect_list("productids").over(win))))).orderBy(F.col("date"))test = test.toPandas()因此,從我們查看過去 2 天的每筆交易中,按 customerid 分組,相應(yīng)的產(chǎn)品匯總在“flat_col”列中。但我真正需要的是相同ID的購物籃的交集。只有這樣我才能判斷是否有常見的產(chǎn)品。因此,“flat_col”的第五行中應(yīng)該有 ['G'],而不是 ['P', 'Q', 'G', 'C']。同樣,[] 應(yīng)該出現(xiàn)在“flat_col”的所有其他行中。太感謝了!
查看完整描述

2 回答

?
互換的青春

TA貢獻1797條經(jīng)驗 獲得超6個贊

您可以在不使用in 的情況下實現(xiàn)這一點self-join(因為連接shuffle在大數(shù)據(jù)中是昂貴的操作)。使用的功能。higher order functionsspark 2.4filter,transform,aggregate


df=spark.createDataFrame(data)


x = 2


win = Window().partitionBy('customerid').orderBy(F.col("date").cast("long")).rangeBetween(-(86400*x), Window.currentRow)

test = df.withColumn("productids", F.array_distinct(F.split("productids", "\;")))\

    .withColumn("flat_col", F.flatten(F.collect_list("productids").over(win)))\

    .withColumn("occurances", F.expr("""filter(transform(productids, x->\

     IF(aggregate(flat_col, 0,(acc,t)->acc+IF(t=x,1,0))>1,x,null)),y->y!='null')"""))\

    .drop("flat_col").orderBy("date").show()


+-------------------+----------+----------+----------+

|               date|customerid|productids|occurances|

+-------------------+----------+----------+----------+

|2014-01-01 00:00:00|         1|    [A, B]|        []|

|2014-01-02 00:00:00|         2|    [D, E]|        []|

|2014-01-03 00:00:00|         2|    [H, X]|        []|

|2014-01-04 00:00:00|         3| [P, Q, G]|        []|

|2014-01-05 00:00:00|         4| [S, T, U]|        []|

|2014-01-06 00:00:00|         3|    [C, G]|       [G]|

+-------------------+----------+----------+----------+


查看完整回答
反對 回復(fù) 2022-10-06
?
呼喚遠方

TA貢獻1856條經(jīng)驗 獲得超11個贊

自加入是有史以來最好的把戲


from pyspark.sql.functions import concat_ws, collect_list

spark.createDataFrame(data).registerTempTable("df")

sql("SELECT date, customerid, explode(split(productids, ';')) productid FROM df").registerTempTable("altered")

df = sql("SELECT al.date, al.customerid, al.productid productids, altr.productid flat_col FROM altered al left join altered altr on altr.customerid = al.customerid and al.productid = altr.productid and al.date != altr.date and datediff(al.date,altr.date) <=2 and datediff(al.date,altr.date) >=-2")

df.groupBy("date", "customerid").agg(concat_ws(",", collect_list("productids")).alias('productids'), concat_ws(",", collect_list("flat_col")).alias('flat_col')).show()

火花輸出

查看完整回答
反對 回復(fù) 2022-10-06
  • 2 回答
  • 0 關(guān)注
  • 139 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號