1 回答

TA貢獻1851條經(jīng)驗 獲得超3個贊
您可以執(zhí)行類似的操作,但只需 1 個管道,并在轉(zhuǎn)換中添加一些附加代碼。
應(yīng)該beam.Map(lambda x: somefunction)
有兩個輸出:一個被寫入 GCS,一個被拒絕的元素最終將被寫入 BigQuery。
為此,您的轉(zhuǎn)換函數(shù)必須返回一個TaggedOutput
.
第二個PCollection
,然后您可以寫入 BigQuery。
您不需要Create
在管道的第二個分支中有一個。
管道將是這樣的:
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
?
? ? data = (pipeline1
? ? ? ? ? ? ? ?| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
? ? ? ? ? ? ? ?| 'combine output to list' >> beam.combiners.ToList()
? ? ? ? ? ? ? ?| 'tranform' >> beam.Map(transform)? # Tagged output produced here
? ? pcoll_to_gcs = data.gcs_output
? ? pcoll_to_bq? = data.rejected
? ? pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
? ? pcoll_to_bq? | "to bq" >> beam.io.WriteToBigQuery(....)
那么transform函數(shù)會是這樣的
def transform(element):
? if something_is_wrong_with_element:
? ? yield pvalue.TaggedOutput('rejected', element)
? transformed_element = ....
? yield pvalue.TaggedOutput('gcs_output', transformed_element)
添加回答
舉報