Airflow DAG定义与调度原理
请介绍Apache Airflow中DAG的核心概念、DAG定义方式、调度器(Scheduler)的工作原理以及依赖管理和重跑机制。
回答
孤独的心
一、DAG核心概念:
DAG(Directed Acyclic Graph):
- 有向无环图,表示工作流
- 节点(Node)= Task(Operator实例)
- 边(Edge)= 依赖关系(
>>或set_downstream())
Operator类型:
BashOperator:执行Shell命令PythonOperator:执行Python函数Sensor:等待外部条件满足KubernetesPodOperator:K8s PodHiveOperator/SparkSubmitOperator等
二、DAG定义示例:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
default_args = {
'owner': 'data',
'start_date': datetime(2025, 1, 1),
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('etl_pipeline', schedule_interval='0 8 * * *', default_args=default_args)
t1 = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)
t2 = BashOperator(task_id='transform', bash_command='echo transform', dag=dag)
t3 = BashOperator(task_id='load', bash_command='echo load', dag=dag)
t1 >> t2 >> t3
三、Scheduler工作原理:
- 解析DAG:定期扫描
DAGS_FOLDER,解析Python文件生成DAG对象 - 生成DAG Run:根据
schedule_interval和执行时间,创建新的DAG Run - 创建Task Instance:为DAG Run中的每个Task创建Task Instance
- 依赖检查:检查Task的上游是否全部成功
- Task调度:将可执行的Task Instance放入队列
- Worker执行:Worker(Celery/K8s/Local)从队列取Task执行
四、重跑机制:
- Clear:清除某个Task Instance的状态,使其重新执行
- Backfill:重跑历史日期的DAG Run(使用CLI
backfill) - Catchup:启动后是否补跑
start_date到当前未执行的所有DAG Run
依赖管理:
trigger_rule:all_success/all_failed/one_success/all_done等depends_on_past:依赖上一个DAG Run的同一Task执行成功