Can I insert data into different BigQuery datasets based on data processed in a previous Dataflow step? I am building a Dataflow pipeline that reads from a Pub/Sub subscription and writes to BigQuery tables. It is defined as follows:

```python
def run(argv=None, save_main_session=True):
    options: PipelineOptions = PipelineOptions(
        project='project-id',
        runner='DataflowRunner',
        region='region',
        streaming=True,
        setup_file='dataflow/setup.py',
        autoscaling_algorithm='THROUGHPUT_BASED',
        job_name='telemetry-processing'
    )
    with beam.Pipeline(options=options) as p:
        status = (
            p
            | 'Get Status PubSub' >> beam.io.ReadFromPubSub(
                subscription='projects/project-id/subscriptions/subscription-id',
                with_attributes=True))

        status_records = (
            status
            | 'Proto to Dict' >> beam.Map(
                lambda x: convert_proto_to_dict(x, nozzle_status_proto.NozzleStatus)))

        status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery(
            'project-id:dataset-id.table-id')

        bytes_status = status | 'Get Bytes Result' >> beam.ParDo(GetBytes())
        bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery(
            'project-id:dataset-id.backup-table-id')
```

For the given input and output it works exactly as expected. What I want is to decide, based on a specific attribute of the PubsubMessage, which dataset the message should be written to. So the part I need to change is:

```python
status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery(
    'project-id:dataset-id.table-id')
```

I tried extracting the value I need and using it like this:

```python
status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery(
    'project-id:{data-from-previous-step}.table-id')
```

but we cannot pull data directly out of a PCollection. I also tried overriding WriteToBigQuery as in this post (How can I write to Big Query using a runtime valueprovider in Apache Beam?), but I get no errors and nothing is inserted. I don't know where to start with this. Do I have to create n pipelines for n datasets?
1 Answer

婷婷同學_
The `table` argument of WriteToBigQuery can be a function that maps each element to the table it should be written to. For example:
```python
status_records | 'Write' >> beam.io.WriteToBigQuery(
    lambda e: 'dataset1.invalid_records' if is_invalid(e)
              else 'dataset2.good_records')
```
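Applied to your case, a minimal sketch could look like the following. It assumes the 'Proto to Dict' step keeps the routing attribute on each element under a hypothetical key `target_dataset` (the key name and the fallback dataset are assumptions, not from your code):

```python
# Build the full table spec for an element, falling back to a default
# dataset when the routing attribute is absent.
def route_to_table(element):
    dataset = element.get('target_dataset', 'dataset-id')
    return 'project-id:{}.table-id'.format(dataset)

# In the pipeline, pass the function instead of a fixed table string:
# status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery(
#     route_to_table)
```

With this, a single pipeline serves all n datasets; the routing decision is made per element at write time, so there is no need for n separate pipelines.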