CodeWalk

Airflow DAG定义与调度原理

作者:孤独的心 · 2026-05-30 12:55

请介绍Apache Airflow中DAG的核心概念、DAG定义方式、调度器(Scheduler)的工作原理以及依赖管理和重跑机制。

回答

孤独的心

一、DAG核心概念:

DAG(Directed Acyclic Graph):

  • 有向无环图,表示工作流
  • 节点(Node)= Task(Operator实例)
  • 边(Edge)= 依赖关系(>>set_downstream()

Operator类型:

  • BashOperator:执行Shell命令
  • PythonOperator:执行Python函数
  • Sensor:等待外部条件满足
  • KubernetesPodOperator:K8s Pod
  • HiveOperator/SparkSubmitOperator

二、DAG定义示例:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'data',
    'start_date': datetime(2025, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('etl_pipeline', schedule_interval='0 8 * * *', default_args=default_args)

t1 = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)
t2 = BashOperator(task_id='transform', bash_command='echo transform', dag=dag)
t3 = BashOperator(task_id='load', bash_command='echo load', dag=dag)

t1 >> t2 >> t3

三、Scheduler工作原理:

  1. 解析DAG:定期扫描DAGS_FOLDER,解析Python文件生成DAG对象
  2. 生成DAG Run:根据schedule_interval和执行时间,创建新的DAG Run
  3. 创建Task Instance:为DAG Run中的每个Task创建Task Instance
  4. 依赖检查:检查Task的上游是否全部成功
  5. Task调度:将可执行的Task Instance放入队列
  6. Worker执行:Worker(Celery/K8s/Local)从队列取Task执行

四、重跑机制:

  • Clear:清除某个Task Instance的状态,使其重新执行
  • Backfill:重跑历史日期的DAG Run(使用CLIbackfill
  • Catchup:启动后是否补跑start_date到当前未执行的所有DAG Run

依赖管理:

  • trigger_ruleall_success/all_failed/one_success/all_done
  • depends_on_past:依赖上一个DAG Run的同一Task执行成功