1 回答

TA貢獻(xiàn)1858條經(jīng)驗(yàn) 獲得超8個(gè)贊
我認(rèn)為問題在于你有兩個(gè)獨(dú)立的管道試圖一起工作。您應(yīng)該將所有轉(zhuǎn)換作為單個(gè)管道的一部分執(zhí)行:
with beam.Pipeline(options=PipelineOptions()) as p:
side_input = p | "Reading from BQ side input" >> relational_db.ReadFromDB(
source_config=sideinput_bq_config,
table_name='abc',
query="SELECT * FROM abc")
my_pcoll = p | "Reading records from database" >> relational_db.ReadFromDB(
source_config=source_config,
table_name='xyzzy',
query="SELECT * FROM xyzzy",
) | beam.ParDo(
AppendCol(), beam.pvalue.AsIter(side_input))
添加回答
舉報(bào)