第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會有你想問的

氣流 - 創(chuàng)建 dag 和任務(wù)動態(tài)地為一個(gè)對象創(chuàng)建管道

氣流 - 創(chuàng)建 dag 和任務(wù)動態(tài)地為一個(gè)對象創(chuàng)建管道

慕碼人2483693 2024-01-15 17:09:42
在氣流中,我想將一些表從 pg 導(dǎo)出到 BQ。task1: get the max id from BQtask2: export the data from PG (id>maxid)task3: GCS to BQ stagetask4: BQ stage to BQ main但有一個(gè)小挑戰(zhàn),日程間隔不同。所以我創(chuàng)建了一個(gè) JSON 文件來告訴同步間隔。因此,如果是 2 分鐘,那么它將使用 DAG upsert_2mins,否則將使用 10 分鐘間隔 ( upsert_10mins) 。我使用這個(gè)語法來動態(tài)生成它。JSON 配置文件:{    "tbl1": ["update_timestamp", "2mins", "stg"],    "tbl2": ["update_timestamp", "2mins", "stg"]}它實(shí)際上創(chuàng)建了 dag,但問題是來自 Web UI,我能夠看到最后一個(gè)表的任務(wù)。但它必須顯示 2 個(gè)表的任務(wù)。
查看完整描述

1 回答

?
慕森卡

TA貢獻(xiàn)1806條經(jīng)驗(yàn) 獲得超8個(gè)贊

您的代碼正在創(chuàng)建 2 個(gè) dags,每個(gè)表一個(gè),但用第二個(gè)覆蓋第一個(gè)。


我的建議是將 JSON 文件的格式更改為:


{

    "2mins": [

                "tbl1": ["update_timestamp", "stg"],

                "tbl2": ["update_timestamp", "stg"]

             ],

    "10mins": [

                "tbl3": ["update_timestamp", "stg"],

                "tbl4": ["update_timestamp", "stg"]

             ]

}

讓您的代碼迭代計(jì)劃并為每個(gè)表創(chuàng)建所需的任務(wù)(您將需要兩個(gè)循環(huán)):


# looping on the schedules to create two dags

for schedule, tables in config.items():


cron_time = '*/10 * * * *'


if schedule== '2mins':

    cron_time = '*/20 * * * *'


dag_id = 'upsert_every_{}'.format(schedule)


dag = DAG(

    dag_id ,

    default_args=default_args,

    description='Incremental load - Every 10mins',

    schedule_interval=cron_time,

    catchup=False,

    max_active_runs=1,

    doc_md = docs

)


# Looping over the tables to create the tasks for 

# each table in the current schedule

for table_name, table_config in tables.items():

    max_ts = PythonOperator(

        task_id="get_maxts_{}".format(table_name),

        python_callable=get_max_ts,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )


    export_gcs = PythonOperator(

        task_id='export_gcs_{}'.format(table_name),

        python_callable=pgexport,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )


    stg_load = PythonOperator(

        task_id='stg_load_{}'.format(table_name),

        python_callable=stg_bqimport,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )    


    merge = PythonOperator(

        task_id='merge_{}'.format(table_name),

        python_callable=prd_merge,

        op_kwargs={'tablename':table_name, 'dag': dag},

        provide_context=True,

        dag=dag

    )

    

    # Tasks for the same table will be chained

    max_ts >> export_gcs >> stg_load >> merge


# DAG is created among the global objects

globals()[dag_id] = dag


查看完整回答
反對 回復(fù) 2024-01-15
  • 1 回答
  • 0 關(guān)注
  • 134 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號