Can I insert data into different BigQuery datasets depending on data processed in a previous Dataflow step? I am building a Dataflow pipeline that reads from a Pub/Sub subscription and writes to BigQuery tables. It is defined as follows:

```python
def run(argv=None, save_main_session=True):
    options: PipelineOptions = PipelineOptions(
        project='project-id',
        runner='DataflowRunner',
        region='region',
        streaming=True,
        setup_file='dataflow/setup.py',
        autoscaling_algorithm='THROUGHPUT_BASED',
        job_name='telemetry-processing'
    )
    with beam.Pipeline(options=options) as p:
        status = (
            p
            | 'Get Status PubSub' >> beam.io.ReadFromPubSub(
                subscription='projects/project-id/subscriptions/subscription-id',
                with_attributes=True))

        status_records = (
            status
            | 'Proto to Dict' >> beam.Map(
                lambda x: convert_proto_to_dict(x, nozzle_status_proto.NozzleStatus))
        )

        status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery(
            'project-id:dataset-id.table-id')

        bytes_status = (status | 'Get Bytes Result' >> beam.ParDo(GetBytes()))
        bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery(
            'project-id:dataset-id.backup-table-id')
```

For the given input and output it works exactly as expected. What I want is to decide, based on a specific attribute of the PubsubMessage, which dataset the message should be written to. So the part I need to change is:

```python
status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:dataset-id.table-id')
```

I have already tried extracting the value I need and using it like this:

```python
status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:{data-from-previous-step}.table-id')
```

but data cannot be read directly out of a PCollection. I also tried overriding WriteToBigQuery as in this post (How can I write to Big Query using a runtime value provider in Apache Beam?), but I get no error and nothing is inserted. I don't know how to achieve this. Do you know where I should start? Do I have to create n pipelines for n datasets?
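To make the routing I'm after concrete, it could be sketched as a plain function that maps a Pub/Sub message attribute to a full BigQuery table spec (the `dataset` attribute name and the defaults here are hypothetical placeholders):

```python
def table_for_message(attributes, project="project-id", table="table-id"):
    """Build a BigQuery table spec string from Pub/Sub message attributes.

    `dataset` is a hypothetical attribute name; messages without it fall
    back to a default dataset. The returned string uses the
    "project:dataset.table" format that WriteToBigQuery accepts.
    """
    dataset = attributes.get("dataset", "default-dataset")
    return f"{project}:{dataset}.{table}"


# Example: a message tagged with dataset="customer-a" would be routed to
# the table spec "project-id:customer-a.table-id".
```

(For what it's worth, Beam's `WriteToBigQuery` also accepts a callable as its `table` argument, invoked per element, which seems like it might fit this shape of function, but I haven't gotten that working.)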
Dynamically setting the BigQuery dataset in a Dataflow pipeline
至尊寶的傳說(shuō)
2023-10-11 15:40:59
