原创

Spark 核心篇 - 任务调度 1 Stage划分

上一篇分析到执行Action算子后,根据Rdd生成了任务,并将任务进行了提交,具体怎么分解这个任务,就是就是本篇的重点。

Stage 划分(也可以说是Job拆解)阶段主要工作还是由 DAGScheduler 完成,DAGScheduler 会先从给定的 RDD 反向进行遍历,根据依赖关系进行 stage 划分,这个也就是所谓的 stage 划分算法。

首先介绍一下相关的Stage,Stage是一个抽象类具体实现如下两种:

  • ResultStage:ResultStage在RDD的某些分区上应用函数来计算操作的结果。
  • ShuffleMapStage:ShuffleMapStage是执行DAG中为shuffle生成数据的中间阶段。它们发生在每次随机播放操作之前,并且可能包含多个流水线操作(例如,map和filter)。执行时,它们保存map输出文件,以后可以通过reduce任务提取这些文件

这是源码注释的翻译,具体点说ResultStage就是任务的最终结果,一个job中仅仅包含了一个ResultStage,所有的计算最终会汇聚成一个ResultStage,从下边的代码中也能看到,每个job只会调用一次createResultStage。

ShuffleMapStage就不同与ResultStage了,它代表了一个中间阶段,所以每个中间阶段会包含多个ShuffleMapStage。

Spark

先拿一张图作为示例,方便理解。

20190820182032327.png

在源码中相关工作是从handleJobSubmitted方法开始的:

  private[scheduler] def handleJobSubmitted(...) {
    var finalStage: ResultStage = null
    try {
            //根据finalRDD,创建ResultStage,只创建一个
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      .......
    }
    // Job submitted, clear internal data.
    barrierJobIdToNumTasksCheckFailures.remove(jobId)
        //创建一个DagScheduler中正在运行的job。job可以有两种类型:一种是计算result stage以执行操作
        //的结果作业,另一种是在提交任何下游阶段之前计算shufflemapstage的映射输出的映射阶段作业。
        // 这其中使用finalStage来区分两种类型
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    ......
        finalStage.addActiveJob(job)
    listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

handleJobSubmitted方法中主要是根据提交的 finalRDD 创建一个ResultStage并为其创建一个job,这个job是当前这个任务独有的,他可以在多个 stage 之间共享。

为了更好的记住ResultStage是如何创建的,进入源码,在createResultStage方法内部,创建与提供的jobid关联的resultstage。

  private def createResultStage(...): ResultStage = {
    ......
        //调用getOrCreateParentStages方法,回溯查找依赖的 rdd,并创建 stage
        //获取或创建给定RDD的父stage列表。新阶段将使用提供的FirstJobID创建。
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

在getOrCreateParentStages方法中,其主要逻辑就是从当前 rdd 进行回溯,根据其dependencies属性找到ShuffleDependency类型的 rdd,
将他们都标记为parents,这个是在下一步中划分ShuffleMapStage的依据。

    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        getShuffleDependencies(rdd).map { shuffleDep =>
            getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
    }

以深度优先的方式遍历整个依赖关系,找到所有的ShuffleDependency类型,将其是作为parents。

  private[scheduler] def getShuffleDependencies(...): HashSet[ShuffleDependency[_, _, _]] = {
    ......
    waitingForVisit += rdd
        //为空时表示遍历到了最左侧的RDD
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
                        //若为ShuffleDependency则需要再下一步中生成新的ShuffleMapStage
            parents += shuffleDep
          case dependency =>
                        //若不是的话,将其加入待遍历列表
            waitingForVisit.prepend(dependency.rdd)
        }
      }
    }
    parents
  }

getShuffleDependencies方法返回的其实是当前 rdd 的dependencies属性中所有 ShuffleDependency 类型的集合,他们是用来执行划分 stage 的重要依据,
其实也就是图中的 rddB、rddF、rddD,他们的每一个的接下来操作都是要进行 shuffle。

在找到了parents后,就开始查找或创建 stage,这一步可以说是完成了整个 stage 的划分,看源码:

  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
        //如果shuffleidtomapstage中存在shuffle,则获取shuffle映射阶段。否则,如果shuffle map stage不存在,
        //则此方法将创建shuffle map stage,以及任何丢失的祖先shuffle map stage。
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage
      case None =>
        // 为所有丢失的祖先无序排列依赖项创建stage。
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
                        //创建ShuffleMapStage
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // 最后,为给定 shuffle dependency创建 stage。
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

这里需要先说明一下shuffleIdToMapStage,他是从shuffle dependency项ID映射到将为该依赖项生成数据的shufflemapstage。
仅包括属于当前正在运行的作业的stage(当需要shuffle阶段的作业完成时,映射将被删除,且shuffle数据的唯一记录将位于mapoutputtracker中)。

也就是说这里记录了当前 job 中已经创建好的 stage,直接从中获取即可,若没有创建好,那么就需要使用createShuffleMapStage进行创建。
不过在创建之前需要先去找到,尚未在shuffletomapstage中注册的祖先shuffle dependency。

  private def getMissingAncestorShuffleDependencies(rdd: RDD[_]): ListBuffer[ShuffleDependency[_, _, _]] = {
    ......
        waitingForVisit += rdd
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      if (!visited(toVisit)) {
        visited += toVisit
        getShuffleDependencies(toVisit).foreach { shuffleDep =>
          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
            ancestors.prepend(shuffleDep)
            waitingForVisit.prepend(shuffleDep.rdd)
          } // Otherwise, the dependency and its ancestors have already been registered.
        }
      }
    }
        //找到所有为注册的祖先 stage
    ancestors
  }

在找到了所有的父节点后,就开始创建这个ShuffleMapStage。

  def createShuffleMapStage[K, V, C](
      shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
        ......
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
        //创建新的ShuffleMapStage
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
            ......
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

截止目前图中所有的 stage 都已经完成了划分,并且注册到了shuffleIdToMapStage中,一个包含了三个ShuffleMapStage和一个 ResultStage ,
并且根据依赖关系,形成了一个新的关于 stage 的 DAG,同时每一个 stage 都已经绑定到了当前 job 上,使用唯一的 jobID 进行标识。

通俗一点讲:

  • 起点RDD的依赖列表,若遇到窄依赖,则继续遍历该窄依赖的父List[RDD]的依赖,直到碰到宽依赖;若碰到宽依赖(不管是起点RDD的宽依赖还是遍历多级依赖碰到的宽依赖),则以宽依赖RDD为起点再次重复上述过程。直到到达RDD依赖图的最左端,也就是遍历到了没有依赖的RDD,则进入下一步

  • RDD依赖图的最左端,即递归调用也到了最深得层数,getParentStagesAndId中,getParentStages第一次返回(第一次返回为空,因为最初的stage没有父stage),val id = nextStageId.getAndIncrement()也是第一次被调用,获得第一个stage的id,为0(注意,这个时候还没有创建第一个stage)。

创建第一个ShuffleMapStage。至此,这一层递归调用结束,返回到上一层递归中,这一层创建的所有的ShuffleMapStage会作为下一层stage的父List[stage]用来构造上一层的stages。上一层递归调用返回后,上一层创建的stage又将作为上上层的parent List[stage]来构造上上层的stages。依次类推,直到最后的ResultStage也被创建出来为止。整个stage的划分完成。

有一个需要注意的点是,无论对于ShuffleMapStage还是ResultStage来说,task的个数即该stage的finalRDD的partition的个数,仔细查看下上文中的newResultStage和newShuffleMapStage函数可以搞明白这点,不再赘述。

接下来的工作就是对整个划分好的 stage 进行提交。

sev7e0
Write by sev7e0
end
本文目录