
Spark Core - Task Scheduling (3): Submitting Tasks

The previous article left off at stage submission. From that point on, the submitMissingTasks method splits the stage into as many Tasks as it has partitions to compute, and these tasks are bundled into a TaskSet that is handed to the TaskScheduler for processing. Let's walk through the source code in detail.


In DAGScheduler's submitMissingTasks method, different task types are created depending on the stage type: a ResultStage produces ResultTasks and a ShuffleMapStage produces ShuffleMapTasks. Each task set contains all of the tasks of that stage; the tasks share exactly the same processing logic and differ only in the data they process, i.e. in the partition they are assigned, which is what the first line of the source below computes.

  private def submitMissingTasks(stage: Stage, jobId: Int) {
    // For each stage type, work out which partitions still need to be computed
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
    // with this Stage
    val properties = jobIdToActiveJob(jobId).properties
    // Add the stage to the runningStages set
    runningStages += stage
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    // Look up the preferred (data-local) locations for the partitions obtained above, to avoid unnecessary network transfer.
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    }
    ......
    // Create a new attempt for this stage by creating a new StageInfo with a new attempt ID.
    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    // If partitionsToCompute is non-empty there are still tasks to run, so record the stage's submission time
    if (partitionsToCompute.nonEmpty) {
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
    // Post an event on the listener bus announcing that this stage has been submitted
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    ......
    // Build the tasks
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptNumber,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
              stage.rdd.isBarrier())
          }
      }
    }
    ......
    // Submit the tasks as a TaskSet.
    if (tasks.nonEmpty) {
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      // Mark the stage as finished
      markStageAsFinished(stage, None)

      stage match {
        case stage: ShuffleMapStage =>
          logDebug(s"Stage ${stage} is actually done; " +
              s"(available: ${stage.isAvailable}," +
              s"available outputs: ${stage.numAvailableOutputs}," +
              s"partitions: ${stage.numPartitions})")
          markMapStageJobsAsFinished(stage)
        case stage : ResultStage =>
          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
      }
      // Submit its waiting child stages; scheduling continues recursively from here
      submitWaitingChildStages(stage)
    }
  }
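
Two names used in the task constructors above, taskBinary and partitions, come from the parts of the method elided with "......". Paraphrased roughly from the Spark source of this era: the stage's RDD is serialized together with its shuffle dependency (ShuffleMapStage) or its result function (ResultStage) using the closure serializer and broadcast once, and partitions is simply stage.rdd.partitions.

  // Paraphrased, simplified sketch of the elided part of submitMissingTasks
  val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
      JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    case stage: ResultStage =>
      JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  }
  // partitions is what the task constructors above index into with partitions(id) / partitions(p)
  val partitions: Array[Partition] = stage.rdd.partitions
  // Broadcast once, so each task only carries a handle to the bytes instead of the whole closure
  val taskBinary = sc.broadcast(taskBinaryBytes)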

submitMissingTasks is a long method, which reflects how much work it does. As we can see, it maintains the DAGScheduler's three stage sets (runningStages, waitingStages and failedStages), it creates the tasks and then the TaskSet, and it posts the stage lifecycle events on the listener bus.
This wraps up the DAGScheduler's part at the start of task scheduling; from here on, the work is handed over to the TaskScheduler.
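
For reference, the TaskSet that gets handed over is a very thin wrapper around the task array plus some scheduling metadata; paraphrased roughly from the Spark source of this era (the priority argument is the jobId passed in above):

private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,                // the jobId, used for FIFO ordering
    val properties: Properties) {     // java.util.Properties from the active job
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}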

As mentioned earlier, the TaskScheduler's main job is to accept the TaskSet passed down by the DAGScheduler and distribute its tasks to the Worker nodes, where they are executed by the Executors.

  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      // Create a TaskSetManager, which manages the lifecycle of this task set
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets.foreach { case (_, ts) =>
        ts.isZombie = true
      }
      stageTaskSets(taskSet.stageAttemptId) = manager
      // Add the task set's manager to the scheduling pool, where the system schedules it together with everything else
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    // Ask the scheduler backend to revive offers, i.e. allocate resources and launch the tasks.
    backend.reviveOffers()
  }
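
A side note on the schedulableBuilder.addTaskSetManager call above: whether the pool it adds the manager to is a FIFO or a FAIR pool is controlled by the spark.scheduler.mode setting. A minimal sketch of switching it; the app name and pool name here are made-up examples:

import org.apache.spark.sql.SparkSession

object SchedulerModeDemo {
  def main(args: Array[String]): Unit = {
    // spark.scheduler.mode selects the schedulable builder: FIFO (the default) or FAIR.
    val spark = SparkSession.builder()
      .appName("scheduler-mode-demo")   // hypothetical demo app
      .master("local[2]")
      .config("spark.scheduler.mode", "FAIR")
      .getOrCreate()

    // Under FAIR scheduling, jobs submitted from this thread can be bound to a named pool.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")

    spark.stop()
  }
}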

The method to focus on here is reviveOffers. It is declared on the SchedulerBackend trait and implemented by its concrete subclasses, so different deploy modes use different implementations. The three you will run into most often are:

  • CoarseGrainedSchedulerBackend: the coarse-grained base implementation used for cluster deployments; the YARN, Mesos (coarse-grained) and standalone backends all build on it
  • StandaloneSchedulerBackend: used for standalone cluster mode (it extends CoarseGrainedSchedulerBackend)
  • LocalSchedulerBackend: used when running Spark locally, where the executor, the backend and the master all run in the same JVM

The communication differs between modes, but in each case reviveOffers sends a ReviveOffers message to the DriverEndpoint (or the LocalEndpoint in local mode), which matches it in its receive method and calls makeOffers.
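
Both ends of that hand-off are one-liners; paraphrased roughly from the Spark source:

// CoarseGrainedSchedulerBackend: push a ReviveOffers message to the DriverEndpoint over RPC
override def reviveOffers(): Unit = {
  driverEndpoint.send(ReviveOffers)
}

// LocalSchedulerBackend: push the same message to the in-process LocalEndpoint
override def reviveOffers(): Unit = {
  localEndpoint.send(ReviveOffers)
}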

override def receive: PartialFunction[Any, Unit] = {
    case StatusUpdate(executorId, taskId, state, data, resources) =>
      .....
    case ReviveOffers =>
      makeOffers()
}
// Make fake resource offers on all executors
private def makeOffers() {
  // Take the lock so that no executor is killed while a task is being launched on it
  val taskDescs = withLock {
    // Filter out executors that are being killed, keeping only the active ones
    val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
    // Build one WorkerOffer per active executor, describing the resources currently free on it
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
          Some(executorData.executorAddress.hostPort),
          executorData.resourcesInfo.map { case (rName, rInfo) =>
            (rName, rInfo.availableAddrs.toBuffer)
          })
    }.toIndexedSeq
    // The TaskScheduler responds to the offers by asking the active task sets for tasks in
    // priority order, filling each node round-robin so that tasks are balanced across the
    // cluster, and returns the TaskDescriptions to hand to the executors.
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}
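
The WorkerOffer built in makeOffers is just a small value object describing the free resources on one executor; paraphrased roughly from the Spark source (field names can differ slightly between versions):

private[spark] case class WorkerOffer(
    executorId: String,
    host: String,
    cores: Int,
    // The hostPort address is more precise than host when several executors share a host
    address: Option[String] = None,
    // Custom resources (e.g. GPUs): resource name -> free addresses
    // (Buffer here is scala.collection.mutable.Buffer)
    resources: Map[String, Buffer[String]] = Map.empty)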

The main job of makeOffers is to build the TaskDescriptions. A TaskDescription carries all of the task information the Executor needs to run the task, and it is the payload the Driver sends to the Worker.
The TaskDescription source documents it like this:

/**
 * Description of a task that gets passed onto executors to be executed, usually created by
 * `TaskSetManager.resourceOffer`.
 *
 * TaskDescriptions and the associated Task need to be serialized carefully for two reasons:
 *
 *     (1) When a TaskDescription is received by an Executor, the Executor needs to first get the
 *         list of JARs and files and add these to the classpath, and set the properties, before
 *         deserializing the Task object (serializedTask). This is why the Properties are included
 *         in the TaskDescription, even though they're also in the serialized task.
 *     (2) Because a TaskDescription is serialized and sent to an executor for each task, efficient
 *         serialization (both in terms of serialization time and serialized buffer size) is
 *         important. For this reason, we serialize TaskDescriptions ourselves with the
 *         TaskDescription.encode and TaskDescription.decode methods.  This results in a smaller
 *         serialized size because it avoids serializing unnecessary fields in the Map objects
 *         (which can introduce significant overhead when the maps are small).
 */

In short: this object has to be serialized with particular care, because before deserializing the task itself the Executor first needs the list of JARs and files to add to its classpath, plus the properties, so those are carried in the TaskDescription as well.
At the same time, to make sending it more efficient, a TaskDescription is encoded with TaskDescription.encode before it is sent. This keeps the serialized form small, because it avoids serializing unnecessary fields of the Map objects.
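
The size argument is easy to see with a toy comparison that has nothing to do with Spark itself: serializing a small string map with generic Java serialization versus writing only the raw entries by hand, which is the same idea TaskDescription.encode applies. EncodeSizeDemo and the property keys below are made up for illustration only.

import java.io.{ByteArrayOutputStream, DataOutputStream, ObjectOutputStream}

object EncodeSizeDemo {
  def main(args: Array[String]): Unit = {
    // A small property map standing in for the task's properties
    val props = Map("spark.job.description" -> "demo", "spark.scheduler.pool" -> "default")

    // Generic Java serialization: writes class metadata for the Map and every entry object
    val javaBytes = {
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(props)
      oos.close()
      bos.toByteArray
    }

    // Hand-rolled encoding in the spirit of TaskDescription.encode:
    // write only an entry count followed by the raw key/value strings
    val manualBytes = {
      val bos = new ByteArrayOutputStream()
      val dos = new DataOutputStream(bos)
      dos.writeInt(props.size)
      props.foreach { case (k, v) => dos.writeUTF(k); dos.writeUTF(v) }
      dos.close()
      bos.toByteArray
    }

    println(s"Java serialization: ${javaBytes.length} bytes, manual encoding: ${manualBytes.length} bytes")
  }
}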

At the end of makeOffers the taskDescs are passed to launchTasks, the last stage of task submission. Here is the source:

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // Encode the task with TaskDescription.encode, as discussed above
    val serializedTask = TaskDescription.encode(task)
    // Check whether the serialized task exceeds the maximum RPC message size
    if (serializedTask.limit() >= maxRpcMessageSize) {
      Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
        ......
      }
    }
    else {
      // Look up the target executor's data by its id
      val executorData = executorDataMap(task.executorId)
      // Account for the resources consumed here; they are released when the task finishes.
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      task.resources.foreach { case (rName, rInfo) =>
        assert(executorData.resourcesInfo.contains(rName))
        executorData.resourcesInfo(rName).acquire(rInfo.addresses)
      }
      // Send the serialized task to the executor via its executorEndpoint.
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

launchTasks is the last step on the Driver side before a Spark task actually runs: it sends each TaskDescription, as scheduled, to the Worker it was assigned to.
From this point on the work is handled by the Worker side. Of course, this is only the submission path; failure recovery, retries and so on still have to be handled afterwards.
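
For completeness, here is roughly what happens on the other side when the LaunchTask message arrives: the CoarseGrainedExecutorBackend's receive decodes the TaskDescription and hands it to its Executor (paraphrased from the Spark source):

case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    // Decode the TaskDescription that the driver produced with TaskDescription.encode
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // Hand the task to the Executor's thread pool for execution
    executor.launchTask(this, taskDesc)
  }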

A class diagram of the call relationships makes this easier to follow:

[Figure: call-relationship class diagram (20190820150255492.png)]

Write by sev7e0