3 回答

TA貢獻(xiàn)1827條經(jīng)驗(yàn) 獲得超4個(gè)贊
我知道我遲到了,但我會選擇第二種模式:“2 dags。一個(gè)用 TriggerDagOperator 感知和觸發(fā),一個(gè)進(jìn)程”,因?yàn)椋?/p>
每個(gè)文件都可以并行執(zhí)行
第一個(gè) DAG 可以選擇要處理的文件,重命名它(添加后綴 '_processing' 或?qū)⑵湟苿?dòng)到處理文件夾)
如果我是貴公司的新開發(fā)人員,我打開工作流,我想了解工作流的邏輯是什么,而不是上次處理了哪些文件是動(dòng)態(tài)構(gòu)建的
如果 dag 2 發(fā)現(xiàn)文件存在問題,則將其重命名(使用 '_error' 后綴或?qū)⑵湟苿?dòng)到錯(cuò)誤文件夾)
這是一種處理文件的標(biāo)準(zhǔn)方法,無需創(chuàng)建任何額外的運(yùn)算符
它使 de DAG 冪等且更易于測試。本文中的更多信息
重命名和/或移動(dòng)文件是在每個(gè) ETL 中處理文件的一種非常標(biāo)準(zhǔn)的方法。
順便說一句,我總是推薦這篇文章https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753。它沒有

TA貢獻(xiàn)1811條經(jīng)驗(yàn) 獲得超5個(gè)贊
似乎您應(yīng)該能夠使用 bash 運(yùn)算符運(yùn)行批處理器 dag 來清除文件夾,只需確保depends_on_past=True
在 dag 上進(jìn)行設(shè)置以確保在下次安排 dag 之前成功清除文件夾。

TA貢獻(xiàn)1856條經(jīng)驗(yàn) 獲得超11個(gè)贊
我發(fā)現(xiàn)這篇文章:https ://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
其中使用了一個(gè)新的運(yùn)算符,即 TriggerMultiDagRunOperator。我認(rèn)為這符合我的需要。
添加回答
舉報(bào)