如何優(yōu)化Apache Airflow:23個(gè)實(shí)用技巧提升工作效率與可維護(hù)性
Apache Airflow 最佳实践指南:23 个提高性能和可靠性的贴士
如果你使用Apache Airflow,你就会明白维护高效且可靠的任务流程可能是个不小的挑战。即使是经验丰富的团队也可能因为老旧的做法或代码组织不合理而遇到问题。
在这篇文章里,我们将分享优化Airflow使用的23个必备技巧,确保更高的性能、更好的可扩展性和更简单的维护。从文件夹结构到高级并发策略等方面,这些最佳实践能帮助你和你的团队避免常见错误和陷阱,并显著改进工作流程和简化维护。
让我们直接进入主题,看看如何在实际中落实这些改进!
开始聊天作为一名数据工程师,我有这样的经验:我们常常错误地假设业内最佳实践有着普遍的共识。最近在与Datacoves合作的一个企业项目中,当我们审查Airflow的一个仓库时,很明显需要对Airflow的最佳实践进行复习,这不仅对大家有益,而且必不可少。这提醒我们,即使是经验丰富的专业人士,也可以从回顾和加强核心原则中获益。
技术的不断变化以及即使是在经验丰富的团队中也可能出现的知识鸿沟,强调了持续学习和定期复习旧课的重要性。这不仅确保团队保持一致并高效运作,还帮助缓解因过时做法或误解可能引起的问题。
在深入了解具体技巧和最佳实践之前,理解Apache Airflow运作中的一个关键原则至关重要。这将帮助你理解为什么存在某些最佳实践,以及它们如何影响流程的可靠性和效率。
当你的Airflow环境在运行时,调度器每30秒会检查一次你的DAG文件
这意味着每30秒,Airflow 会扫描你的DAGs来确定并执行所有顶层的任务。
我会在后面再提到这一点,所以请记住这一点。另外,这些最佳实践不仅是为了节省成本,它们还旨在让新成员更容易融入团队并理解DAGs。
让我们先来谈谈Airflow仓库的组织结构。
- 文件夹结构
为了确保清晰度和方便导航在Apache Airflow项目中,这种结构对于组织工作流和理解数据如何通过管道流动至关重要。一个组织良好的文件夹层次结构反映了任务的逻辑流程,以及数据从原始来源转换为处理完毕、可供业务使用的输出的过程。在接下来的第 2 节中,你可以看到一个理想的 Airflow 仓库结构示例。
2. 如何命名在开发时,使用有意义的文件和文件夹名称以清晰地传达其用途是很重要的。除非缩写已被广泛认知或遵循了现有的命名规范,否则应避免使用缩写。每个文件都应该有一个独特且描述性强的名称,反映其具体任务、流程或模型。例如,不要将多个DAG文件简单命名为类似azure_extract<source>_daily.py这样的名称,而是可以考虑通过文件夹结构来组织这些文件,以消除冗余。如果没有这样的结构,找到特定文件会变得比较困难,因为有意义的部分往往被埋在名称的中间。下面是一个理想状态下Airflow仓库应该如何组织的例子。
project_name/
# this is the root directory of the project
├── dags
# dag is a Directed Acyclic Graph, used to define the workflow in Airflow
│ ├── erp_system/
# ERP system related DAG files
│ ├── sellout_daily.py
│ └── sellin_daily.py
├── api_from_partner/
# APIs provided by partners, used to gather data from external sources
│ └── customer_daily.py
├── templates/
# HTML templates used for generating emails
│ └── email_template.html
├── utils/
# utility functions and configurations
├── constants/
# constants and default arguments used in the project
│ ├── constants.py
│ └── default_args.py
├── mailing/
# functions for sending emails
│ └── send_email.py
├── operators/
# custom operators defined for the project
└── my_meaningful_named_operator.py
# a custom operator with a meaningful name
通过遵循这两条规则,仓库将保持井井有条,使你的日常任务更加高效。现在我们来谈谈DAG文件,这里才是真正有价值的部分。
3 导入:逻辑地组织您的导入项以提高可读性和效率。首先导入标准 Python 库,接着导入 Airflow 特定的库,最后导入任何第三方或自定义库。直接在任务函数内部导入该模块。这样可以避免不必要的每30秒导入该模块,从而避免在任务执行时浪费资源,因为该模块仅在此时需要。有关此方法的示例,请参阅第11节。
import os
import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from utils.custom_package import custom_function
4: 常量
在文件顶部声明所有用于定义DAG的全局变量,并使用大写字母区分它们和局部变量。这可能包括默认参数、DAG设置或常量。如果这些全局常量被多个DAG使用,应将其集中在一个单独的文件里,例如 constants.py,以促进重用和维护。对于特定于任务的其他变量,请在 @task 装饰器内导入它们,以保持DAG文件的简洁和高效。请参阅第2节中的示例了解如何放置 constants.py 文件,并在下面提供了一个关于如何设置全局变量的实例。
# 设置环境变量
ENV = "{{ var.value.environment }}"
# 获取当前脚本的绝对路径
CUR_DIR = os.path.abspath(os.path.dirname(__file__))
# 获取日志记录器
LOGGER = logging.getLogger(__name__)
5. DAG 模式
The Airflow DAG 装饰器,引入于 Airflow 2.0,符合现代 Python 实践,通过采用基于装饰器的 API。这种方法确保您的 DAG 与最新的 Airflow 功能和约定保持一致。通过将 DAG 封装在一个函数中,它促进了模块化,使 DAG 更易于管理和测试,同时也更便于重复使用。此外,它消除了定义 DAG 时显式使用 ‘WITH’ 块的需要,并消除了每个任务中的重复 dag=dag 指定,从而使代码更加干净和易读。对于新的开发项目,这种方法应成为标准做法。下面的图示展示了 DAG 装饰器的用法。
从 utils.default_args.default_args 导入 default_args
@dag(
dag_id="my_dag_decorator_example",
default_args=default_args,
description="此DAG是使用DAG装饰器定义的",
schedule="0 2 * * *", # 每天早上2点
tags=["data_engineer_team", "load", "daily"],
catchup=False, # 是否追加运行=False
)
6. 默认参数
为了防止重复并保持任务间的一致性,可以在 _default_args
字典中定义所有任务的通用参数。如果这些通用参数被多个 DAG 共用,可以将它们放在一个单独的文件(如 _default_args.py
)中,以提高可重用性和可维护性。有关如何放置此文件并有效结构化它的示例,请参阅第 2 节。以下是一个代码片段,展示如何有效组织它。
# 这是用于airflow DAGs(有向无环图)的标准配置参数(default_args)
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'start_date': datetime(2025, 2, 7),
'retries': 5,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(minutes=60),
'email': [],
'email_on_failure': True,
'email_on_retry': False,
'depends_on_past': False
}
7. DRY(不要重复自己,编程原则之一)
在你的DAG文件夹内创建一个utils/或python_scripts/文件夹,用来存放常用的函数。这种方法有助于保持代码的DRY(不要重复自己)原则,并通过集中可重用的逻辑来促进代码的模块化。请参考第2节中的示例,了解如何放置该目录以及如何有效组织它。
8. 文档(此处指...)使用 _doc_md
参数来用 Markdown 记录您的 DAG。这使文档可以直接在 Airflow UI 中访问,为未来的开发人员提供 DAG 的目的、功能和工作流程的清晰解释。这一做法有助于提高可维护性,并使新团队成员更容易上手。以下是一个此最佳实践的实际示例。
@dag(
dag_id="my_documented_dag_example",
default_args=default_args,
description="有意义的描述:",
schedule="0 3 * * *", # 每天早上3点
tags=["analytics_team", "transform", "daily"],
catchup=False,
doc_md="""
### 此DAG执行多个数据转换操作
**步骤:**
1. **生成数据** – 生成初始数据集
2. **处理数据** – 应用转换和业务逻辑处理。
3. **存储结果** – 保存最终结果供后续使用。
"""
)
9 监控:
利用您组织首选的告警系统来接收任务失败和其他关键事件的通知。配置告警,确保在失败时发出通知,但默认将_email_on_retry_设置为False,以减少因重试产生的不必要的告警。作为标准做法,所有DAG都应该启用_alert_on_failure_功能,以确保能够及时收到通知。这种方法有助于自动化监控,减少对人工检查的依赖,提高效率并增强可靠性。
10. DAG 标签为确保清晰的组织和明确的归属,请为所有DAG正确打标签。其中应有一个标签用来标识所属团队或个人,格式为:_名称team。使用有意义的标签来逻辑分组DAGs,避免使用过于具体(仅适用于单个DAG)或过于宽泛(适用于所有DAG)的标签。这样做可以提高发现的便利性,简化管理,并帮助团队快速识别和管理他们的工作流。
tags=["DE团队,", "关键", "ingest", "hourly"]
11. 任务超时时限:当任务执行时间超出预定时间限制时
确保所有任务都设置了超时值,以避免浪费资源。这可以通过在 _defaultargs 字典中添加超时参数来高效处理,如第6节所示。如果没有超时设置,任务可能会导致未被注意到的云成本增加。因为任务一旦运行无限期就不会触发失败警报,设置超时不仅确保及时通知,还帮助控制成本。
@task(execution_timeout=timedelta(seconds=60)) # 设置超时时间为60秒
def example_task_with_timeout():
import time
print("任务开始运行")
time.sleep(90) # 模拟一个长时间运行的任务,这里等待90秒
print("任务已完成")
12. DAGs (有向无环图) 超时问题
使用 _dagruntimeout 参数来为整个 DAG 设置超时时间。这与任务级别的超时类似,但针对整个 DAG。它确保整个 DAG 的运行总时间不超过设定的限制。这能防止 DAG 无限制地运行,从而避免资源浪费和不必要的成本增加。通过设置 _dagruntimeout,您可以增加一层额外的控制,以防止工作流中出现意外延迟或问题。
@dag(
dag_id='dag_with_dagrun_timeout',
start_date=datetime(2025, 3, 7), # DAG的开始日期为
schedule_interval=None,
dagrun_timeout=timedelta(minutes=15), # DAG运行超时时间为15分钟
catchup=False,
description='一个带有dagrun超时参数的DAG'
)
13. 任务重试次数
为任务设置 retries 参数以定义适当的重试次数,有助于避免由于暂时或预料中的问题导致的不必要的失败。建议在 default_args 字典中配置 retries 参数,以确保所有任务的最小重试次数保持一致。此外,设置 _retrydelay 以在重试之间引入延迟,这有助于减轻由间歇性或临时问题引发的失败。请参阅第 6 节以了解如何有效设置这些参数。
14. 任务代码管理不要把任何代码放在任务或操作之外。所有的逻辑都应该放在任务或操作内部,以保持清晰、避免副作用并优化性能。正如之前提到的,Airflow 每 30 秒解析一次 DAG。如果代码放在顶层(即任务或操作之外),它将在每次解析周期中执行。这不仅会增加解析时间,消耗不必要的资源,还可能导致成本增加,而实际上并没有运行所需的作业逻辑。这里是一个违反此规则的代码例子:
import pendulum
from airflow import DAG
from airflow.decorators import task
def expensive_api_call():
print("来自Airflow的问候!") my_expensive_response = expensive_api_call() with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
@task()
def print_expensive_api_call():
print(my_expensive_response)
为了解决这个问题,将 _expensive_apicall() 函数移到任务装饰器中,以确保它仅在任务执行期间运行,而不是在解析DAG时运行。你可以按照以下方式重构代码。
import pendulum # 导入 pendulum 库
from airflow import DAG # 从 airflow 导入 DAG
from airflow.decorators import task # 从 airflow.decorators 导入 task
def expensive_api_call():
print("来自Airflow的问候!") # 问候来自Airflow
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag: # 设置一个名为 example_python_operator 的DAG, 它没有预定的调度, 起始日期为2024年1月1日, 时区为UTC, 并且不会进行追赶补发
@task()
def print_expensive_api_call():
my_expensive_response = expensive_api_call()
print(my_expensive_response) # 定义一个名为 print_expensive_api_call 的任务, 它调用 expensive_api_call 并打印响应
15. 并发与并行计算
管理和明确界定 Airflow 环境配置(如并发和并行设置)至关重要,以避免资源耗尽或瓶颈问题。Airflow 允许你在 DAG 运行中同时可执行的任务数、可同时发生的 DAG 运行数上进行控制,这些控制可以在三个层级上应用:Airflow 环境层级(影响整个系统的行为)、DAG 层级和任务层级。不过,我强烈建议你进行一个关键配置:停止使用默认池,转而定义你自己的自定义池。
对于不熟悉此概念的人来说,让我举个例子。想象一个包含10个独立任务的DAG,这些任务可以并行执行。如果该DAG配置了一个池,其中包含7个槽,即使有足够的资源同时运行这10个任务,由于_池_的限制,一次也只能运行7个任务。
_池_在多个DAG访问同一个共享资源时特别有用。有时,过多的同时连接可能导致错误。通过将访问同一资源的任务分配到一个槽位有限的共享池里,可以确保这些任务永远不会同时执行,不论它们何时安排。这样可以保持稳定,防止冲突。
@task(
pool="my_pool")
# 供"my_pool"池的任务分配
16. 变量获取
尽量避免在Airflow中使用Variable.get()
。默认情况下,Airflow每30秒解析一次你的DAG代码,这意味着每次调用Variable.get()
都会触发一次数据库查询来获取变量值。这不仅会带来不必要的开销,还会导致成本上升,特别是如果变量是从按请求收费的外部服务获取的。此外,直接将值硬编码到任务或操作中可能会影响DAG的可维护性和安全性。
更好的选择是使用Jinja 模板引擎。Jinja 允许你定义参数并在运行时动态生成值,从而减少频繁访问数据库的需要,并提高效率和性能。
话说,Variable.get() 仍然可以在某些场景中使用,例如当它被放在任务里、在任务装饰器里,或者在 DAG 文件中那些在定义时不会被执行的函数内部。这样可以保证变量只在任务执行时获取,而不是在 DAG 定期解析时。
以下是一个有效展示如何使用Jinja模板的示例:
@task
def example_task(my_vars):
print(f"这是一个Airflow变量:{my_vars}")
print("任务已完成")
example_task("{{ var.value.my_vars }}")
17 任务组
在 Airflow 中使用 TaskGroup 可以帮助你创建一个更加整洁和有条理的 DAG,特别是在处理复杂工作流时。这可以让 DAG 的结构更加简化,也更容易管理。在 Airflow UI 中,TaskGroup 中的任务会显示为一个可折叠的框,你可以展开它来查看里面的各个任务。这种嵌套特性不仅提高了可读性,也使得浏览和调试复杂的工作流程更加简单。
@task_group(group_id='my_task_group') # 指定任务组ID为'my_task_group'
def my_task(): # 定义任务函数
t1 = EmptyOperator(task_id='task_a') # 创建任务a的空操作符
t2 = EmptyOperator(task_id='task_b') # 创建任务b的空操作符
t1 >> t2 # 将t1连接到t2
18. 让你的DAG(有向无环图)模块化且保持整洁
避免在DAG文件中直接嵌入SQL查询、bash命令或大段脚本。相反,通过将此类代码拆分到独立的模块或外部文件(如Python脚本或SQL文件)里,来保持DAG逻辑的简洁和模块化。然后可以使用适当的导入或操作符来运行这些外部资源。这样做不仅提高了DAG的可读性,还增强了代码的复用性并简化了维护。此外,直接在DAG文件中嵌入大量代码会增加解析时间,可能会影响性能。
19. 更好地利用数据集通过TriggerDagOperator和ExternalTaskSensor来处理DAG (有向无环图)依赖关系在建立DAG之间的依赖关系时,建议使用数据集作为主要方法。数据集提供了一种更灵活和可扩展的解决方案来管理跨DAG的依赖关系,通常更容易维护和监控,并且它们支持数据驱动工作流程的原生支持。这使得它们比手动方法(如_TriggerDagRunOperator_或ExternalTaskSensor)更高效,后者需要更多的实现和管理精力。请参考下面的示例。它展示了如何配置触发另一个DAG的任务的输出outlet(这里指出口或输出)。
从airflow.datasets导入类Dataset
@task(
task_id="my_dataset_task outlet", # 任务ID
outlets=[Dataset("my_dataset")] # 添加数据集输出
)
# 数据集名称
数据集: "my_dataset"
这个示例展示了如何为一个其任务依赖于另一个 DAG 中任务生成的数据集的 DAG 设置任务计划。
from airflow.datasets import Dataset # 引入Airflow中的Dataset模块
这是一个定义了使用my_dataset数据集调度的DAG的任务定义。
@dynamic图(
dag_id="my_dag_with_dataset_schedule",
schedule=[Dataset("my_dataset")]),
)
20. 可延迟的操作
如果你需要使用像 TriggerDagRunOperator , ExternalTaskSensor 或任何其他 可延迟的操作符,请确保将 deferrable 参数设置为 True 。这可以确保在等待任务完成的过程中释放工作资源,避免不必要的资源消耗。否则,如果不启用 deferrable 模式,工作者会在等待依赖任务完成时保持空闲并占用资源,这会带来效率上的损失。
触发子DAG任务 = TriggerDagRunOperator(
任务ID="触发子dag",
触发的DAG ID="子dag",
等待完成=True,
deferrable=True
)
21. 确保任务的原子性
每个DAG中的任务都应该被设计为执行一个特定的单一动作。这个原则被称为原子性,有助于避免多步骤任务在最后一步失败的情况,使你需要从头开始重新运行整个任务。通过将任务分解成更小、更细粒度的步骤,你可以更好地控制重试、日志记录和故障恢复,使工作流更加健壮且易于管理。
22. 代码质量所有 Python 代码都应使用 flake8 或类似工具进行规范检查,以确保代码质量和遵循风格指南的一致性。为了确保这一做法被执行,可以考虑将自动代码审查工具集成到您的 CI/CD 管道中。这有助于尽早发现问题,并保持良好的可读性和可维护性。
23. 在拉取请求中使用良好的实践检查表。在提交DAG以供审查时,请在您的拉取请求模板中包含一个最佳实践清单。此清单应涵盖关键指南,例如将逻辑拆分并模块化,使用时间相关的参数,有效管理依赖关系,以及确保任务的独立性和不可分割性。通过这样做,这可以促进贡献的一致性,简化代码审查过程,使未来的维护更简单。这种方法不仅促进编码标准的执行,还有助于审阅者和贡献者保持一致的期望。以下是按照此结构组织您的DAG检查表的方法。
[ ] 是否首先导入Python标准库,然后是Airflow特定的库?
[ ] 所有全局变量(常量)是否都用大写声明在文件开头?
[ ] DAG是否使用装饰器声明?
[ ] `default_args`字典是否正确地配置了常见的和可重用的参数?
[ ] 可重用函数是否存储在`utils/`目录下?
[ ] 报警系统是否已正确配置?
[ ] DAG是否被适当标记,包括负责的团队?
[ ] 所有任务和DAG是否有设置超时时间?
[ ] 任务的`retries`参数是否设置了必要的最小配置?
[ ] 所有代码是否都在任务或操作符内?
[ ] 并发和并行设置是否正确定义?
[ ] 是否使用Jinja模板而不是`Variable.get()`或硬编码的值?
[ ] 是否使用TaskGroup来组织复杂的流程?
[ ] DAG是否模块化?复杂的逻辑、SQL或bash脚本是否分别放在外部文件中?
[ ] 是否使用Dataset而不是`TriggerDagRunOperator`或`ExternalTaskSensor`来处理DAG之间的依赖?
[ ] 是否将可延期操作符的`deferrable`参数设置为`True`?
[ ] 所有任务是否原子化,只执行一个特定的动作?
[ ] 所有操作符是否都使用了一致的资源池?
结论
我理解一次应用所有这些技巧可能会让你感到不知所措。然而,采纳这些最佳实践更像是一种持续的过程,而不是一次性任务。例如,您可以从为所有新开发项目遵循这些指南开始,同时识别并优先处理现有DAG中需要改进的关键部分。逐步更新工作流中最重要的部分,使其逐步符合这些最佳实践。
通过持续地应用这些最佳实践,数据工程师们和团队可以确保其Airflow环境保持高效、可扩展和成本效益。实施这些最佳实践将帮助您构建稳定的和易于维护的管道,同时避免不必要的资源浪费和性能瓶颈。
[1] Apache Airflow。顶层 Python。(2025年3月发布)https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code
共同學(xué)習(xí),寫下你的評論
評論加載中...
作者其他優(yōu)質(zhì)文章