第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定
已解決430363個(gè)問題,去搜搜看,總會(huì)有你想問的

Google Cloud Composer DAG 沒有被觸發(fā)

Google Cloud Composer DAG 沒有被觸發(fā)

慕雪6442864 2023-05-16 14:58:15
我計(jì)劃從今天 2020/08/11 開始在東部標(biāo)準(zhǔn)時(shí)間 (NY) 周二至周六凌晨 04:00 運(yùn)行 DAG。編寫代碼并部署后,我預(yù)計(jì) DAG 會(huì)被觸發(fā)。我刷新了我的 Airflow UI 頁面幾次,但它仍然沒有觸發(fā)。我正在使用帶有 python 3 的 Airflow 版本 v1.10.9-composer。這是我的 DAG 代碼:"""This DAG executes a retrieval job"""# Required packages to execute DAGfrom __future__ import print_functionimport pendulumfrom airflow.models import DAGfrom airflow.models import Variablefrom datetime import datetime, timedeltafrom airflow.contrib.operators.ssh_operator import SSHOperatorfrom airflow.operators.dummy_operator import DummyOperatorfrom airflow.utils.trigger_rule import TriggerRulelocal_tz = pendulum.timezone("America/New_York")# DAG parametersdefault_args = {    'owner': 'Me',    'depends_on_past': False,    'start_date': datetime(2020, 8, 10, 4, tzinfo=local_tz),    'dagrun_timeout': None,    'email': Variable.get('email'),    'email_on_failure': True,    'email_on_retry': False,    'provide_context': True,    'retries': None,    'retry_delay': timedelta(minutes=5)}# create DAG object with Name and default_argswith DAG(        'retrieve_files',        schedule_interval='0 4 * * 2-6',        description='Retrieves files from sftp',        max_active_runs=1,        catchup=True,        default_args=default_args) as dag:    # Define tasks - below are dummy tasks and a task instantiated by SSHOperator- calling methods written in other py class    start_dummy = DummyOperator(        task_id='start',        dag=dag    )    end_dummy = DummyOperator(        task_id='end',        trigger_rule=TriggerRule.NONE_FAILED,        dag=dag    )    retrieve_file = SSHOperator(        ssh_conn_id="my_conn",        task_id='retrieve_file',        command='/usr/bin/python3  /path_to_file/getFile.py',        dag=dag)    dag.doc_md = __doc__    retrieve_file.doc_md = """\    #### Task Documentation    Connects to sftp and retrieves files.    """    start_dummy >> retrieve_file >> end_dummy
查看完整描述

1 回答

?
郎朗坤

TA貢獻(xiàn)1921條經(jīng)驗(yàn) 獲得超9個(gè)贊

參考官方文檔:

調(diào)度程序會(huì)在開始日期之后的一個(gè) schedule_interval 運(yùn)行您的作業(yè)。

如果您的 start_date 是 2020-01-01 并且 schedule_interval 是@daily,則第一次運(yùn)行將在 2020-01-02 上創(chuàng)建,即在您的開始日期過后。

為了在每天的特定時(shí)間(包括今天)運(yùn)行 DAG,start_date需要將時(shí)間設(shè)置為過去的時(shí)間,并且schedule_interval需要具有所需的時(shí)間格式cron。正確設(shè)置昨天的日期時(shí)間非常重要,否則觸發(fā)器將不起作用。

在這種情況下,我們應(yīng)該將 設(shè)置start_date為上一周的星期二,即:(2020, 8, 4)。由于每周運(yùn)行一次,因此從開始日期起應(yīng)該有 1 周的間隔。

讓我們看一下以下示例,它展示了如何在美國東部標(biāo)準(zhǔn)時(shí)間周二至周六凌晨 04:00 運(yùn)行作業(yè):

from datetime import datetime, timedelta

from airflow import models

import pendulum

from airflow.operators import bash_operator


local_tz = pendulum.timezone("America/New_York")


default_dag_args = {

? ? 'start_date': datetime(2020, 8, 4, 4, tzinfo=local_tz),

? ? 'retries': 0,

}


with models.DAG(

? ? ? ? 'Test',

? ? ? ? default_args=default_dag_args,

? ? ? ? schedule_interval='00 04 * * 2-6') as dag:

? ? ? ?# DAG code

? ? print_dag_run_conf = bash_operator.BashOperator(

? ? ? ? task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

我建議您查看start_date 文檔的處理方式。



查看完整回答
反對(duì) 回復(fù) 2023-05-16
  • 1 回答
  • 0 關(guān)注
  • 154 瀏覽
慕課專欄
更多

添加回答

舉報(bào)

0/150
提交
取消
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)