第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問(wèn)題,去搜搜看,總會(huì)有你想問(wèn)的

優(yōu)化 pyspark 中的行訪(fǎng)問(wèn)和轉(zhuǎn)換

優(yōu)化 pyspark 中的行訪(fǎng)問(wèn)和轉(zhuǎn)換

慕尼黑5688855 2022-12-14 17:36:37
我在 S3 存儲(chǔ)桶中有一個(gè)以 jason 形式存在的大型數(shù)據(jù)集 (5GB)。我需要轉(zhuǎn)換數(shù)據(jù)的模式,并使用 ETL 腳本將轉(zhuǎn)換后的數(shù)據(jù)寫(xiě)回 S3。所以我使用爬蟲(chóng)來(lái)檢測(cè)架構(gòu)并將數(shù)據(jù)加載到 pyspark 數(shù)據(jù)框中,然后更改架構(gòu)?,F(xiàn)在我遍歷數(shù)據(jù)框中的每一行并將其轉(zhuǎn)換為字典。刪除空列,然后將字典轉(zhuǎn)換為字符串并寫(xiě)回 S3。以下是代碼:#df is the pyspark dataframecolumns = df.columnsprint(columns)s3 = boto3.resource('s3')cnt = 1for row in df.rdd.toLocalIterator():    data = row.asDict(True)    for col_name in columns:        if data[col_name] is None:            del data[col_name]    content = json.dumps(data)    object = s3.Object('write-test-transaction-transformed', str(cnt)).put(Body=content)    cnt = cnt+1print(cnt)我用過(guò) toLocalIterator。上面代碼的執(zhí)行是串行執(zhí)行的嗎?如果是那么如何優(yōu)化它?有沒(méi)有更好的方法來(lái)執(zhí)行上述邏輯?
查看完整描述

3 回答

?
波斯汪

TA貢獻(xiàn)1811條經(jīng)驗(yàn) 獲得超4個(gè)贊

假設(shè),數(shù)據(jù)集中的每一行都是 json 字符串格式


import pyspark.sql.functions as F


def drop_null_cols(data):

    import json

    content = json.loads(data)

    for key, value in list(content.items()):

        if value is None:

            del content[key]


    return json.dumps(content)


drop_null_cols_udf = F.udf(drop_null_cols, F.StringType())


df = spark.createDataFrame(

    ["{\"name\":\"Ranga\", \"age\":25, \"city\":\"Hyderabad\"}",

     "{\"name\":\"John\", \"age\":null, \"city\":\"New York\"}",

     "{\"name\":null, \"age\":31, \"city\":\"London\"}"],

    "string"

).toDF("data")


df.select(

    drop_null_cols_udf("data").alias("data")

).show(10,False)

如果輸入數(shù)據(jù)框有 cols 并且輸出只需要不是 null cols json


df = spark.createDataFrame(

        [('Ranga', 25, 'Hyderabad'),

         ('John', None, 'New York'),

         (None, 31, 'London'),

        ],

        ['name', 'age', 'city']

    )


df.withColumn(

    "data", F.to_json(F.struct([x for x in df.columns]))

).select(

    drop_null_cols_udf("data").alias("data")

).show(10, False)


#df.write.format("csv").save("s3://path/to/file/) -- save to s3

結(jié)果


+-------------------------------------------------+

|data                                             |

+-------------------------------------------------+

|{"name": "Ranga", "age": 25, "city": "Hyderabad"}|

|{"name": "John", "city": "New York"}             |

|{"age": 31, "city": "London"}                    |

+-------------------------------------------------+


查看完整回答
反對(duì) 回復(fù) 2022-12-14
?
智慧大石

TA貢獻(xiàn)1946條經(jīng)驗(yàn) 獲得超3個(gè)贊

我將遵循以下方法(用 scala 編寫(xiě),但可以在 python 中以最小的變化實(shí)現(xiàn))-


找到數(shù)據(jù)集計(jì)數(shù)并將其命名為totalCount

val totalcount = inputDF.count()

查找count(col)所有數(shù)據(jù)框列,并將字段映射到它們的計(jì)數(shù)


這里對(duì)于輸入數(shù)據(jù)框的所有列,計(jì)算計(jì)數(shù)

請(qǐng)注意,count(anycol)返回提供的列全部非空的行數(shù)。例如 - 如果一列有 10 行值,如果說(shuō)有 5 個(gè)值,null則計(jì)數(shù)(列)變?yōu)?5

獲取第一行Map[colName, count(colName)]稱(chēng)為fieldToCount

val cols = inputDF.columns.map { inputCol =>

      functions.count(col(inputCol)).as(inputCol)

    }

// Returns the number of rows for which the supplied column are all non-null.

    // count(null) returns 0

    val row = dataset.select(cols: _*).head()

    val fieldToCount = row.getValuesMap[Long]($(inputCols))

獲取要?jiǎng)h除的列


在此處使用步驟#2 中創(chuàng)建的 Map,并將計(jì)數(shù)小于 totalCount 的列標(biāo)記為要?jiǎng)h除的列

從輸入數(shù)據(jù)框中選擇所有列,并count == totalCount根據(jù)要求以任何格式將處理后的輸出數(shù)據(jù)框保存在任何地方。

請(qǐng)注意,this approach will remove all the column having at least one null value

val fieldToBool = fieldToCount.mapValues(_ < totalcount)

val processedDF = inputDF.select(fieldToBool.filterNot(_._2).map(_.1) :_*)

// save this processedDF anywhere in any format as per requirement

我相信這種方法會(huì)比您目前使用的方法表現(xiàn)更好


查看完整回答
反對(duì) 回復(fù) 2022-12-14
?
Cats萌萌

TA貢獻(xiàn)1805條經(jīng)驗(yàn) 獲得超9個(gè)贊

我解決了上面的問(wèn)題。我們可以簡(jiǎn)單地查詢(xún)數(shù)據(jù)框的空值。df = df.filter(df.column.isNotNull()) 從而刪除所有存在 null 的行。所以如果有 n 列,我們需要 2^n 次查詢(xún)來(lái)篩選出所有可能的組合。在我的例子中,有 10 列,所以總共有 1024 個(gè)查詢(xún),這是可以接受的,因?yàn)?sql 查詢(xún)是并行化的。



查看完整回答
反對(duì) 回復(fù) 2022-12-14
  • 3 回答
  • 0 關(guān)注
  • 149 瀏覽
慕課專(zhuān)欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢(xún)優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)