第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

通過相關(guān)管道處理 Dataflow/Apache Beam 中的拒絕

通過相關(guān)管道處理 Dataflow/Apache Beam 中的拒絕

繁花不似錦 2023-08-22 15:03:49
我有一個從 BigQuery 獲取數(shù)據(jù)并將其寫入 GCS 的管道,但是,如果我發(fā)現(xiàn)任何拒絕,我想將它們糾正到 Bigquery 表。我將拒絕收集到全局列表變量中,然后將該列表加載到 BigQuery 表中。當我在本地運行該過程時,該過程運行良好,因為管道以正確的順序運行。當我使用 dataflowrunner 運行它時,它不能保證順序(我希望 pipeline1 在 pipeline2 之前運行。有沒有辦法使用 python 在 Dataflow 中擁有依賴的管道?或者還請建議是否可以用更好的方法解決這個問題。提前致謝。with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:     data = (pipeline1               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))               | 'combine output to list' >> beam.combiners.ToList()               | 'tranform' >> beam.Map(lambda x: somefunction)  # Collecting rejects in the except block of this method to a global list variable               ....etc               | 'to gcs' >> beam.io.WriteToText(output)               )# Loading the rejects gathered in the above pipeline to Biquerywith beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:    rejects = (pipeline2                    | 'create pipeline' >> beam.Create(reject_list)                    | 'to json format' >> beam.Map(lambda data: {.....})                    | 'to bq' >> beam.io.WriteToBigQuery(....)                    )
查看完整描述

1 回答

?
皈依舞

TA貢獻1851條經(jīng)驗 獲得超3個贊

您可以執(zhí)行類似的操作,但只需 1 個管道,并在轉(zhuǎn)換中添加一些附加代碼。

應(yīng)該beam.Map(lambda x: somefunction)有兩個輸出:一個被寫入 GCS,一個被拒絕的元素最終將被寫入 BigQuery。

為此,您的轉(zhuǎn)換函數(shù)必須返回一個TaggedOutput.

第二個PCollection,然后您可以寫入 BigQuery。

您不需要Create在管道的第二個分支中有一個。

管道將是這樣的:

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:

?

? ? data = (pipeline1

? ? ? ? ? ? ? ?| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))

? ? ? ? ? ? ? ?| 'combine output to list' >> beam.combiners.ToList()

? ? ? ? ? ? ? ?| 'tranform' >> beam.Map(transform)? # Tagged output produced here


? ? pcoll_to_gcs = data.gcs_output

? ? pcoll_to_bq? = data.rejected


? ? pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)

? ? pcoll_to_bq? | "to bq" >> beam.io.WriteToBigQuery(....)

那么transform函數(shù)會是這樣的


def transform(element):

? if something_is_wrong_with_element:

? ? yield pvalue.TaggedOutput('rejected', element)


? transformed_element = ....


? yield pvalue.TaggedOutput('gcs_output', transformed_element)


查看完整回答
反對 回復 2023-08-22
  • 1 回答
  • 0 關(guān)注
  • 1620 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號