Posted in

Cloud Composer 中的 Airflow DAG 和任务并发_AI阅读总结 — 包阅AI

包阅导读总结

1. 关键词:Airflow DAG、Cloud Composer、Concurrency、Task、Configuration

2. 总结:本文介绍了 Cloud Composer 中 Airflow DAG 和任务并发的相关配置,包括环境、安装、DAG、任务等方面的并发控制参数,还列举了一些故障排除场景及相应解决方案,并提供了后续学习的资源。

3. 主要内容:

– Cloud Composer 中控制并发的配置

– Composer 环境

– 工人最小/最大数量:影响并发任务执行数量

– Airflow 安装

– worker_concurrency:单个工人可拾取任务数量

– parallelism:整个安装中的最大运行任务数

– max_active_runs_per_dag:每个 DAG 的最大活跃运行数

– max_active_tasks_per_dag:每个 DAG 的最大活跃任务数

– DAG

– max_active_runs:DAG 的最大活跃运行数

– max_active_tasks:DAG 中并发运行的任务实例数量

– 任务

– max_active_tis_per_dag:控制每个任务的并发实例数

– 故障排除场景及解决方案

– 环境常达工人最大限制且任务队列高:增加工人数量或调整自动缩放值

– 任务调度延迟但环境未达最大工人数量:增加 worker_concurrency

– 并行运行同一 DAG 导致限流:增加 max_active_runs_per_DAG

– 单个 DAG 并行任务过多导致限流:增加 DAG 并发或调整相关配置

– 单个 DAG 同一任务并行过多导致限流:增加任务并发

– 任务不同时运行:并行取决于资源和环境配置

– 任务被限流:检查并发限制器图表和当前配置

– 传感器占用过多工人槽:根据检查间隔选择模式,或使用 deferrable=True

– 下一步

– 掌握相关并发配置以发挥 Cloud Composer 潜力,提供更多学习资源

思维导图:

文章地址:https://cloud.google.com/blog/products/data-analytics/airflow-dag-and-task-concurrency-in-cloud-composer/

文章来源:cloud.google.com

作者:Christian Yarros

发布时间:2024/7/25 0:00

语言:英文

总字数:2493字

预计阅读时间:10分钟

评分:87分

标签:数据工程,云计算,Airflow,Cloud Composer,任务并发


以下为原文内容

本内容来源于用户推荐转载,旨在分享知识与观点,如有侵权请联系删除 联系邮箱 media@ilingban.com

Summary

From top to bottom, these are the configurations that will provide full control over concurrency on Cloud Composer.

Composer environment

worker min/max count: more workers = more tasks that can be performed concurrently.

Airflow installation

worker_concurrency: higher concurrency = more tasks that get picked up by an individual worker. High value means that every worker can pick up a lot of tasks, so under certain circumstances the queue might never fill up, causing autoscaling to never trigger. Use Composer’s default value for most cases.

parallelism: maximum number of tasks running across an entire Airflow installation. parallelism=0 means infinite.

max_active_runs_per_dag: maximum number of active DAG runs, per DAG.

max_active_tasks_per_dag: maximum number of active DAG tasks, per DAG.

DAG

max_active_runs: maximum number of active runs for this DAG. The scheduler will not create new active DAG runs once this limit is hit. Defaults to core.max_active_runs_per_dag if not set.

max_active_tasks: the number of task instances allowed to run concurrently across all active runs of the DAG this is set on. If this setting is not defined, the value of the environment-level setting max_active_tasks_per_dag is assumed.

Task

max_active_tis_per_dag: controls the number of concurrent running task instances across dag_runs per task.

Troubleshooting scenarios

Scenario: Composer Environment frequently reaches maximum limits for workers, number of tasks in queue is consistently high, and DAGs do not meet their SLAs.

Solution: You can increase the number of workers in your Cloud Composer environment or introduce higher autoscaling min/max values.

Scenario: There are long inter-task scheduling delays, but at same time the environment does not scale up to its maximum number of workers

Solution: Increase worker concurrency ( [celery]worker_concurrency ). Worker concurrency must be set to a value that is higher than the expected maximum number of concurrent tasks, divided by the maximum number of workers in the environment.

Scenario: You run the same DAG many times in parallel, causing Airflow to throttle execution.

Solution: Increase max active runs per DAG (max_active_runs_per_dag, max_active_runs)

Scenario: A single DAG is running a large number of tasks in parallel, causing Airflow to throttle task execution

Solution: Increase DAG concurrency (max_active_tasks_per_dag, max_active_tasks) if you want to complete the single DAG as fast as possible. Decrease that DAG’s max_active_tasks value, or the environment level max_active_tasks_per_dag if you’d like other DAGs to run at the same time . Also, check to see if parallelism is not set to 0 (infinity).

Scenario: A single DAG is running the same task many times in parallel, causing Airflow to throttle the execution of that task.

Solution: Increase task concurrency. (max_active_tasks_per_dag, max_active_tasks, max_active_tis_per_dag)

Scenario: Tasks aren’t running at the same time.

Solution: In Airflow, parallelism depends on what resources are available to the airflow worker / airflow scheduler AND what your environment configuration is. There’s no guarantee that tasks will run at exactly the same time. All you can ensure is that Task A,B,C will complete before Task D

Scenario: Tasks are being throttled.

Solution: Check the Concurrency Limiters chart above and make a note of your current configurations.

Scenario: Sensors are taking up too many worker slots.

Solution: Sensor checking every n seconds (i.e. poke_interval < 60)? Use mode=poke. Sensor checking every n minutes (i.e. poke_interval >= 60)? Use mode=reschedule. A sensor in mode=reschedule will free up Airflow worker resources between poke intervals. For even better performance, opt to use deferrable=True for your Sensors. This will ignore the sensor mode and instead pass the poke_interval and process to the Airflow Triggerer, freeing up Airflow Worker resources for other tasks.

Next steps

Mastering Airflow DAG and task concurrency is essential for unlocking the full potential of Cloud Composer. By understanding the core concepts, configuring your environment effectively, and employing practical optimization strategies, you can orchestrate even the most complex data pipelines with confidence. For more information about Cloud Composer

To learn more about Cloud Composer, Apache Airflow, and the information discussed in this guide, consider exploring the following resources: