原创

Spark 核心篇 - 任务调度 2 Stage 提交

上一篇Spark 核心篇 - 任务调度のStage划分-大数据施工现场分析到stage划分阶段,下面进入Spark任务调度的第三个阶段,stage提交阶段。

Spark

进入stage 提交阶段,接下来看看源码。

  private def submitStage(stage: Stage) {
        //根据stage获取job id
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      // 既不在waitingStages中,也不在runningStages中,还不在failedStages中
      // 说明未处理过
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        //调用getMissingParentStages()方法,获取stage还没有提交的parent
        val missing = getMissingParentStages(stage).sortBy(_.id)
        if (missing.isEmpty) {
                    //若missing为空代表当前stage的父stage都已经进行了提交,或者没有父stage。
                    //则表示当前task可以提交了。
          submitMissingTasks(stage, jobId.get)
        } else {
          for (parent <- missing) {
                        //遍历整个missing,递归调用,向前查找,全部提交。
            submitStage(parent)
          }
                    //讲这个stage加入等待队列
          waitingStages += stage
        }
      }
        //若没有找到对应的job,那么放弃提交这个stage
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

submitStage方法逻辑没有太复杂,首先根据要提交的stage获取绑定的jobid,若没有找到,那么放弃提交这个stage。
若找到了,那么判断当前stage是否在三个集合中,分别是waitingStages、runningStages、failedStages,不在则说明stage没有提交。

首先调用getMissingParentStages()方法,获取stage还没有提交的parent,即missing;如果missing为空,说明该stage要么没有parent stage,要么其parent stages都已被提交,此时该stage就可以被提交,用于提交的方法submitMissingTasks()稍后分析。

在提交的过程中,最主要的还是如何获取missing,进入getMissingParentStages方法,先看源码

  private def getMissingParentStages(stage: Stage): List[Stage] = {
    waitingForVisit += stage.rdd
        ......
    def visit(rdd: RDD[_]) {
      if (!visited(rdd)) {
        visited += rdd
                //根据rdd获取缓存信息,若已经缓存了,那么将不在进行处理划分,同时在getCacheLocs内部
                //会根据rdd的存储级别做缓存处理。
        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
        if (rddHasUncachedPartitions) {
                  // 若没有缓存,循环rdd的dependencies
          for (dep <- rdd.dependencies) {
            dep match {
                            // 宽依赖 调用getOrCreateShuffleMapStage
              case shufDep: ShuffleDependency[_, _, _] =>
                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                if (!mapStage.isAvailable) {
                                    //判断当前stage是否可用,不可用的话尝试将其进行提交。
                  missing += mapStage
                }
                            // 窄依赖 压入待遍历的栈
              case narrowDep: NarrowDependency[_] =>
                waitingForVisit.prepend(narrowDep.rdd)
            }
          }
        }
      }
    }
    while (waitingForVisit.nonEmpty) {
      visit(waitingForVisit.remove(0))
    }
    missing.toList
  }

此处与stage划分阶段的getParentStages()方法、getAncestorShuffleDependencies()方法结构类似,最后拿到的missing是一个stage的集合,获取的方式也是和之前的方式一样,从最后的rdd向前查找。

在找到了missing后,若missing为空代表当前stage的父stage都已经进行了提交,或者没有父stage。则表示当前task可以提交了。则就进入了submitMissingTasks()方法,下一节再详细分析。

接下来用一张图简要说明具体的流程:

20190910233341754.png

  1. 在handleJobSubmitted方法中获取到了最后一个stage ResultStage3,通过submitStage进行该stage的提交;
  2. 再submitStage先创建Job实例,然后判断该stage是否存在父stage,由图可以看见他存在两个,stage 0和2,所以不能立即提交3,把3放入等待调度的stage中。
  3. 继续递归调用,可以知道stage0不存在父stage,而stage2存在一个父stage1,这样将2加入等待队列,将0和1使用submitMissingTask进行提交。
  4. 在Executor执行完成后通知DAGScheduler更新状态,检查运行状态,若有失败的任务那么重新执行。若全部完成则第二次提交stage2。
  5. stage2执行完成后,ResultStage3的父stage全部完成,可以进行提交ResultStage3。
sev7e0
Write by sev7e0
end
本文目录