照片由 Dan Roizer 拍摄,来自 Unsplash
Apache Airflow 是数据领域中最受欢迎的编排工具之一,支持世界各地公司的工作流。然而,在生产环境中使用 Airflow,尤其是复杂环境中的使用者,都知道它偶尔会出现一些问题和奇怪的 bug。
在您需要管理的众多方面中,有一个关键指标常常被忽略:DAG(有向无环图)解析时间。监控并优化解析时间,以避免性能瓶颈,并确保编排的正确运行至关重要。接下来我们将在本文中进一步探讨这个问题。
说起来,本教程主要介绍 [**airflow-parse-bench**](https://github.com/AlvaroCavalcante/airflow-parse-bench)
,这是一个我开发的开源工具,帮助数据工程师监控和优化他们的Airflow环境,帮助他们获得降低代码复杂度和减少解析时间的见解。
关于 Airflow,DAG 解析时间常常被忽略。每当 Airflow 处理您的 Python 文件来动态构建 DAG 时,就会进行解析。
默认情况下,所有的DAGs每30秒都会被解析一次——这个解析间隔由配置变量[_min_file_processinterval]控制。这意味着每30秒,系统会读取、导入并处理dags
文件夹中的所有Python代码,生成包含待调度任务的DAG对象。成功处理的文件会被添加到[DAG Bag]中。
两个关键的Airflow(Airflow工具)组件负责处理这一过程:
- DagFileProcessorManager:检查哪些文件需要被处理。
- DagFileProcessorProcess:执行实际的文件解析工作。
这两个组件(通常称为DAG处理器)由Airflow的调度器共同执行,确保在触发之前更新DAG对象。然而,为了可扩展性和安全性,也可以将DAG处理器作为集群中的独立组件运行。
如果你的环境只有几十个DAG,解析过程不太可能引起任何问题。然而,生产环境中通常有几百甚至上千个DAG。在这种情况下,如果解析时间过长,可能会导致:
- 推迟DAG调度。
- 提升资源利用率。
- 环境心跳异常。
- 调度器故障。
- 过度占用CPU和内存,造成资源浪费。
怎样写出更好的DAG?试想一下拥有一个包含数百个DAG的环境,这些DAG中包含不必要的复杂解析规则。低效率很快就会变成严重的问题,影响整个Airflow环境的稳定性和性能。
在编写Airflow DAG的时候,需要注意一些重要的编写最佳实践,以优化代码。虽然你可以找到很多关于如何改进DAG的教程,但我将总结一些可以显著提升DAG性能的关键点。
限制最高层级的代码量导致DAG解析时间较长的一个常见原因是顶层代码效率低或过于复杂。每次调度器解析Airflow DAG文件时,都会执行该文件中的所有顶层代码。如果这些代码包含资源消耗大的操作,如数据库查询、API调用或动态任务生成,则会显著影响解析速度。
以下代码展示了一个未经优化的有向无环图(DAG)的例子:
在这种情况下,每次调度器解析文件时,顶层代码会被执行,从而发起 API 请求并处理 DataFrame,这可能显著影响解析时间。
另一个导致解析缓慢的重要因素是顶级的导入。每个在顶级导入的库都会在解析时加载到内存中,这可能需要较长时间。为了避免这种情况,你可以将导入语句移到函数或任务定义里。
下面这段代码展示了一个更好的相同DAG版本:
尽量不要在顶层代码里用 Xcoms(或直接保留英文)和变量仍然在讨论同一个话题,特别重要的是避免在代码中使用 Xcoms 和 Variables。正如 Google 官方文档中所述:
减少 Airflow DAG 解析时间:Google 官方文档
如果你在顶层代码中使用
Variable.get()
,每次解析.py
文件,Airflow 都会执行Variable.get()
,这会打开一个到数据库的会话。这可能会显著延长解析时间。
可以考虑使用JSON字典数据结构在单一数据库查询中获取多个变量,而不是多次调用Variable.get()
方法。或者,可以考虑使用Jinja模板,因为这种获取方式下的变量仅在任务执行期间处理,而不是在DAG解析时处理。
虽然这看起来很明显,但定期清理环境中不再需要的DAGs和其他文件等等始终很有必要。
- 移除不再使用的DAG:检查您的
dags
文件夹,并删除不再需要的DAG文件。 - 使用
.airflowignore
文件:指定Airflow应该忽略的文件,跳过解析过程。 - 审查已暂停的DAG:已暂停的DAG仍然会被调度器解析,消耗系统资源。如果不再需要这些DAG,可以考虑删除或存档。
最后,你可以通过更改一些Airflow配置来减少Scheduler使用的资源,比如。
**min_file_process_interval**
: 此设置控制Airflow每多少秒解析一次您的DAG文件。将它从默认的30秒增加,可以减少调度器的负载,但会导致DAG更新变慢。**dag_dir_list_interval**
: 此设置控制Airflow每多少秒扫描一次dags
目录以查找新DAG。如果您很少部署新DAG,则可以考虑增加此间隔以减少CPU的使用量。
我们已经讨论了很多关于创建优化的DAG的重要性,以维护健康的Airflow环境。但是,你是如何测量DAG的解析时间的呢?幸运的是,根据你的Airflow部署和操作系统,有几种方法可以实现这一点。
例如,如果你有一个Cloud Composer部署环境,你可以通过在Google Cloud CLI上执行以下命令轻松地获取DAG解析报告。
gcloud composer 环境运行 $ENVIRONMENT_NAME \ — 位置 $LOCATION \ dag 报告
虽然获取解析指标很简单,但衡量代码优化的效果却没那么容易。每次修改代码后,都需要重新上传更新的Python文件到云服务提供商,等待DAG解析完成,然后提取新的报告——这通常是一个缓慢且耗时的过程。
另一种可能的方法是,如果你用的是 Linux 或 Mac,可以运行这个命令来测量解析耗时:
(这里的命令具体取决于你使用的工具和环境)
time python airflow/example_dags/example.py
不过,虽然这种方法简单,但在实际操作中并不实用,无法系统性地测量和比较多个有向无环图的解析所需时间。
为了应对这些难题,我开发了一个名为**airflow-parse-bench**
的工具,用于简化测量和比较您DAG解析时间的过程,采用Airflow的原生解析方法。
这个工具使得记录解析时间、在您的 DAGs 中轻松进行标准比较以及存储和对比结果变得轻松。
如何安装库在安装之前,建议使用虚拟环境virtualenv以避免库冲突。设置好之后,可以通过运行以下命令来安装此包:
在命令行中运行以下命令来安装 airflow-parse-bench
:
pip install airflow-parse-bench
注意: 该命令只安装最基本的依赖项(与Airflow及其提供程序相关)。您需要自己安装任何DAG依赖的其他库。
例如,如果一个DAG任务使用boto3
来与AWS交互,确保你已经安装了boto3
。否则,你将会遇到解析错误。
之后,你需要初始化你的Airflow数据库。这可以通过执行以下命令来实现:
<your_command_here>
请将 <your_command_here>
替换为实际的初始化命令。
airflow db init
初始化 Airflow 数据库
另外,如果你的 DAG 中使用了 Airflow 变量,你也需要将它们在本地定义。但是,你不需要给这些变量赋真实的值(但可以留空),因为这些值在解析时并不重要。
airflow variables set MY_VARIABLE 'ANY TEST VALUE'
没有这个,你会遇到这样的错误。
「错误:未定义变量 MY_VARIABLE」
如何使用工具
安装库之后,你可以开始测量解析时间(parse时间)。例如,你有一个名为 dag_test.py
的文件,这包含了上面示例中提到的未优化的 DAG 代码。
要测量它的解析所需时间,只需运行:
运行以下命令来解析测试文件:
airflow-parse-bench --path dag_test.py
该执行产生了如下输出。
执行结果如下。图片由作者提供。
我们观察到我们的DAG用了0.61秒的解析耗时。如果我再运行一次该命令,会看到一些细微的差别,由于系统和环境因素,不同运行间解析耗时可能会有些微差异。
另一次相同DAG(有向无环图)执行的结果。作者提供图片。
为了更简洁地呈现数字,可以通过指定迭代的次数来聚合多次执行。
airflow-parse-bench --path dag_test.py --num-iterations 5
虽然这需要多花一点时间,但它会计算五次执行的平均解析时长。
现在,为了评估上述优化的影响,我将dag_test.py
中的代码替换成之前分享的优化版本。运行相同的命令后,得到了如下结果:
这是优化代码的解析结果。图片由作者提供。
正如所见,仅仅通过应用一些良好的实践,就能够将DAG解析时间减少了近 0.5秒,突显了我们所做的这些改变的重要性!
更深入地了解这个工具还有一些其他我觉得值得分享的有趣的特点,我想分享一下。
提醒一下,如果你在使用该工具时有任何疑问或遇到问题,可以访问GitHub上的完整文档。
另外,查看库支持的所有参数,运行即可。
airflow-parse-bench --help
多个DAG的测试
注:此处“DAG”通常保留原样,表示有向无环图。
在大多数情况下,你可能有几十个DAG需要测试解析时间。为了处理这种情况,我建了一个叫dags
的文件夹,并在里面放了四个Python文件。
只需在--path
参数中指定文件夹路径,即可测量该文件夹中所有DAG(有向无环图)的解析耗时。
运行以下命令来解析DAG路径:
airflow-parse-bench --path my_path/dags
运行这个命令会生成一个表格,列出文件夹中所有DAG的处理时间汇总。
测试多个DAG的解析速度,图片来源:作者。
默认情况下,表格数据是从最快到最慢的 DAG(有向无环图)进行排序的。不过,您可以通过添加 --order
参数来改变排序规则,从慢到快。
airflow-parse-bench --path 我的路径/my_path/dags --order 降序
排序顺序反转。作者提供图片。
跳过未改动的DAGs (有向无环图)--skip-unchanged
参数在开发阶段特别有用。正如其名,此选项会跳过那些自上次运行后未被修改的 DAG 的解析执行。
运行以下命令来解析基准:
airflow-parse-bench --path my_path/dags --skip-unchanged
如下图所示,当DAG保持不变时,输出结果表明解析耗时没有显示出差异。
对于未更改的文件,不显示任何差异。图片由作者提供。
把数据库重置一下所有DAG信息,包括度量指标和历史记录,都存储在一个本地的SQLite数据库中。如果你想清除所有存储的数据并从头再来,可以使用--reset-db
参数。
# 本命令用于解析并重置DAG路径的数据库,例如:my_path/dags
airflow-parse-bench --path my_path/dags --reset-db
此命令会重置数据库,并像第一次执行时那样处理 DAGs。
结论部分解析耗时是保持Airflow环境的可扩展性和高效性的重要指标,特别是当编排需求变得越来越复杂时,这一点尤为重要。
因此,[**airflow-parse-bench**](https://github.com/AlvaroCavalcante/airflow-parse-bench)
可以帮助数据工程师更好地创建 DAG,是一个重要的工具。通过本地测试 DAG 的解析时间,你可以轻松快速地找到代码中的瓶颈,从而使 DAG 运行得更快、更高效。
由于代码是在本地执行的,生成的解析时间将不同于您的 Airflow 集群中的解析时间。然而,如果能够在本地机器上减少解析时间,同样的结果可能在您的云端环境中实现。
最后,这个项目开放合作的!如果你有建议、想法或改进点,欢迎在Github上提交您的贡献。谢谢大家的支持!
参考文献 [最大化Cloud Composer的优势,同时减少解析时间 | Google Cloud 博客低DAG解析时间可以作为健康Cloud Composer环境的可靠指标。
](https://cloud.google.com/blog/products/data-analytics/reduce-airflow-dag-parse-times-in-cloud-composer?source=post_page-----146fcf4d27f7--------------------------------)
Cloud Composer就像引擎,Apache Airflow DAGs就像燃料。本指南提供了一些建议……cloud.google.com](https://cloud.google.com/blog/products/data-analytics/optimize-cloud-composer-via-better-airflow-dags?source=post_page-----146fcf4d27f7--------------------------------)
[调度器 - Apache Airflow 文档Apache Airflow 调度器会监控所有任务和 DAG,然后在它们的依赖条件满足后触发任务实例的运行……
airflow.apache.org](https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/scheduler.html?source=post_page-----146fcf4d27f7--------------------------------#fine-tuning-your-scheduler-performance) (关于如何调整调度器性能)
共同學習,寫下你的評論
評論加載中...
作者其他優(yōu)質文章