第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在數(shù)據(jù)流管道中動態(tài)設置bigquery數(shù)據(jù)集

在數(shù)據(jù)流管道中動態(tài)設置bigquery數(shù)據(jù)集

至尊寶的傳說 2023-10-11 15:40:59
我可以根據(jù)在上一個數(shù)據(jù)流步驟中處理的數(shù)據(jù)將數(shù)據(jù)插入到不同的 bigQuery 數(shù)據(jù)集嗎?我正在創(chuàng)建一個數(shù)據(jù)流管道,它從 PubSub 訂閱中讀取數(shù)據(jù)并寫入大查詢表。其定義如下:def run(argv=None, save_main_session=True):    options: PipelineOptions = PipelineOptions(        project='project-id',        runner='DataflowRunner',        region='region',        streaming=True,        setup_file='dataflow/setup.py',        autoscaling_algorithm='THROUGHPUT_BASED',        job_name='telemetry-processing'    )    with beam.Pipeline(options=options) as p:        status = (                p                 | 'Get Status PubSub' >> beam.io.ReadFromPubSub(            subscription='projects/project-id/subscriptions/subscription-id',            with_attributes=True))        status_records = (status| 'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, nozzle_status_proto.NozzleStatus)) )        status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project- id:dataset-id.table-id')         bytes_status = (status | 'Get Bytes Result' >> beam.ParDo(GetBytes()))         bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery(        'project-id:dataset-id.backup-table-id')對于給定的輸入和輸出,它完全按照預期工作。我想要的是,關于 PubSubMessage 中的特定屬性,定義我的消息應該發(fā)送到哪個數(shù)據(jù)集。所以我需要改變的部分是:status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:dataset-id.table-id')我已經(jīng)嘗試提取所需的數(shù)據(jù)并像這樣使用它:status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:{data-from-previous-step}.table-id')但我們無法直接從 PCollection 獲取數(shù)據(jù)。我嘗試像這篇文章中那樣覆蓋 WriteToBigQuery(How can I write to Big Query using a runtime valueprovider in Apache Beam?),但我沒有收到錯誤,也沒有插入任何內(nèi)容。我不知道如何實現(xiàn)這一點。你知道我應該從哪里開始做這件事嗎?我是否必須為 n 個數(shù)據(jù)集創(chuàng)建 n 個管道?
查看完整描述

1 回答

?
婷婷同學_

TA貢獻1844條經(jīng)驗 獲得超8個贊

WriteToBigQuery 的“table”參數(shù)可以是從元素到應寫入的表的函數(shù)。例如:

status_records | 'Write' >> beam.io.WriteToBigQuery(
  lambda e: 'dataset1.invalid_records' if is_invalid(e) else 'dataset2.good_records')


查看完整回答
反對 回復 2023-10-11
  • 1 回答
  • 0 關注
  • 105 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網(wǎng)微信公眾號