Spark DAG与Stage划分机制
请详细说明Spark中DAG(有向无环图)的生成过程,以及DAGScheduler如何将DAG划分为Stage。
回答
小字辈
DAG生成流程:
- 用户代码中每调用一个Transformation算子(如map/filter/join),生成一个新的RDD
- RDD通过依赖关系(Dependency)链接成RDD Lineage图
- 当调用Action算子(如collect/count/saveAsTextFile)时触发Job提交
- SparkContext将执行链提交给DAGScheduler
DAGScheduler任务:
- 解析RDD之间的依赖关系
- **从后往前(逆拓扑排序)**遍历RDD链
- 遇到**宽依赖(ShuffleDependency)**就划分Stage
- 窄依赖合并到同一Stage(pipeline优化)
Stage划分详细示例:
textFile("hdfs://...") // RDD0: HDFS文件
.flatMap(_.split(" ")) // RDD1: 窄依赖
.map((_, 1)) // RDD2: 窄依赖
.reduceByKey(_+_) // RDD3: 宽依赖 ← Stage边界
.filter(_._2 > 10) // RDD4: 窄依赖
.saveAsTextFile("...") // Action
划分结果:
- Stage0(ShuffleMapStage):RDD0→RDD1→RDD2(窄依赖管道)
- Stage1(ShuffleMapStage):RDD3(Reduce端)
- Stage2(ResultStage):RDD4 + Action
Stage执行特点:
- Stage内的Task完全独立、无依赖,可并行执行
- 前序Stage全部完成后,后续Stage才开始(Fork/Join模型)
- DAGScheduler会考虑数据本地性,将Task调度到数据所在节点