第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

在 Airflow 中的組件之間傳輸數(shù)據(jù)

在 Airflow 中的組件之間傳輸數(shù)據(jù)

瀟湘沐 2021-12-29 20:19:25
我對 Airflow 很陌生,并且已經(jīng)閱讀了大部分文檔。從文檔中,我了解到 DAG 中組件之間的小數(shù)據(jù)可以使用 XCom 類共享。DAG 中發(fā)布數(shù)據(jù)的組件必須推送,訂閱數(shù)據(jù)的組件必須拉取。但是,我對推和拉的語法部分不是很清楚。我指的是關(guān)于文檔的XCom 部分并開發(fā)了一個代碼模板。假設(shè)我有以下代碼,它只有兩個組件,一個 pusher 和一個 puller。pusher 發(fā)布 puller 必須消耗的當前時間并寫入日志文件。from datetime import datetimefrom airflow import DAGfrom airflow.operators.python_operator import PythonOperatorlog_file_location = '/usr/local/airflow/logs/time_log.log'default_args = {'owner':'apache'}dag = DAG('pushpull', default_args = default_args)def push_function():    #push this data on the DAG as key-value pair    return(datetime.now()) #current timedef pull_function():    with open(log_file_location, 'a') as logfile:        current_time = '' #pull data from the pusher as key - value pair        logfile.writelines('current time = '+current_time)    logfile.close()with dag:    t1 = PythonOperator(        task_id = 'pusher',         python_callable = push_function)    t2 = PythonOperator(        task_id = 'puller',         python_callable = pull_function)    t2.set_upstream(t1)我需要 Airflow 大師在兩種語法上的幫助:如何從推送功能連同鍵推送數(shù)據(jù)如何獲得 pull 函數(shù)使用 key 拉取數(shù)據(jù)。
查看完整描述

1 回答

?
天涯盡頭無女友

TA貢獻1831條經(jīng)驗 獲得超9個贊

使用密鑰推送到 Xcom 的示例:


def push_function(**context):

    msg='the_message'

    print("message to push: '%s'" % msg)

    task_instance = context['task_instance']

    task_instance.xcom_push(key="the_message", value=msg)

使用密鑰拉到 Xcom 的示例:


def pull_function(**kwargs):

    ti = kwargs['ti']

    msg = ti.xcom_pull(task_ids='push_task',key='the_message')

    print("received message: '%s'" % msg)

示例 DAY:


from datetime import datetime, timedelta

from airflow.models import DAG

from airflow.operators.python_operator import PythonOperator


DAG = DAG(

  dag_id='simple_xcom',

  start_date=datetime(2017, 10, 26),

  schedule_interval=timedelta(1)

)


def push_function(**context):

    msg='the_message'

    print("message to push: '%s'" % msg)

    task_instance = context['task_instance']

    task_instance.xcom_push(key="the_message", value=msg)


push_task = PythonOperator(

    task_id='push_task', 

    python_callable=push_function,

    provide_context=True,

    dag=DAG)


def pull_function(**kwargs):

    ti = kwargs['ti']

    msg = ti.xcom_pull(task_ids='push_task',key='the_message')

    print("received message: '%s'" % msg)


pull_task = PythonOperator(

    task_id='pull_task', 

    python_callable=pull_function,

    provide_context=True,

    dag=DAG)


push_task >> pull_task


查看完整回答
反對 回復 2021-12-29
  • 1 回答
  • 0 關(guān)注
  • 325 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號