原创

Spark 核心篇 - 任务调度 0 提交作业

上一篇文章Spark 核心篇 - RDD 实现的理解分析到,RDD 执行 Action 操作整个 Spark 任务进入调度阶段,这时候就要想一下,何为调度,为什么要调度?

  • 何为调度?

    调度也可以理解成为资源的调度和任务的调度,在像 Spark 这种大型分布式集群应用中,每台机器能够提供的资源,和任务的执行都是不稳定的,当一个 Spark 任务跑在集群中时,我们要考虑到机器资源的分配\优先在 RDD 所在的分区进行计算(数据本地性)\任务失败\任务推测执行等一系列的问题,解决这些问题就需要一个完善的调度系统进行调度. 我们这里主要提到的是分布式任务调度,后面再详细分析资源的调度.想要了解更多可以参考开源分布式任务调度框架.XXL-job\Airflow等.

Spark

回到 Spark, 先简单分析一下 Action 之后的一些流程。在 Action 算子执行时,SparkContext(SC) 会进行Job提交,此后调度流程进入DAGScheduler(DS).DS 会对 Job 进行解析,根据 DAG 图(linage)进行 Stage 的划分。之后将 Stage 进行提交,DS 继续将 Stage 进行解析,根据不同的 Stage 创建出不同的 Task,Task 创建完成后将 Task 构造成 TaskSet,交由TaskScheduler(TS)。由TS将 TaskSet 分配出去到每一个Executor,Executor 根据收到的 TaskSet 开始执行任务。

这一过程就是在 RDD 创建提交后,直到所有 Executor 开始执行任务前的大概流程。附上一张我自己觉得较为清晰的调用流程。

test

先记录图中主要涉及到的几个主要概念:

  • 作业(Job):RDD 中由Action 操作所生成的作业,其中可能包含了一个或者多个 Stage。
  • 调度阶段(Stage):每个 job 会因为 RDD 之间的依赖关系(lineage)拆分成多种任务集合,称为 Stage,也叫做(TaskSet),Stage 的划分是由 DAGScheduler 实现,Stage 是一个抽象类,他有两种具体实现Shuffle Map StageResult Stage
  • 任务(Task):Task 是 Driver 分发到 Executor 上的工作任务单元,也是 Spark 中任务执行的最小单元,它分为ResultTask,ShuffleMapTask
  • DAGScheduler:DS是 Stage 的调度器,主要负责接受 SC 提交的 Job,可以说是负责 Spark 任务的逻辑调度,它会根据Lineage 将 Job 进行拆解,形成新的 Stage,并将 Stage 提交个下一个调度器 TS。
  • TaskScheduler:TS 是 Task 的调度器,主要负责接受 DS 提交的 Stage,负责将划分好的具体任务进行调度,组装成 TaskSet 发送到 Work 节点,最后交给 Driver 申请到的 Executor 资源进行任务的执行。

整体调度流程分析:

提交作业

图中左上角,SC 调用 DAGScheduler 提供的 runJob 进行任务的提交,进入 runJob 源码:

  def runJob[T, U](...){
        ......
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        ......
      case scala.util.Failure(exception) =>
        ......
    }
  }

从源码中可以看到,runJob 调用了 submitJob 进行了提交,同时这里进行了阻塞产生了一个waiter对象,根据waiter返回的不同的结果进行不同的日志打印或者异常抛出。

这其中需要深入理解分析一下 submitJob ,进入源码:

  def submitJob[T, U](...): JobWaiter[U] = {
    // 略去相关校验
        ......
    val jobId = nextJobId.getAndIncrement()
    if (partitions.isEmpty) {
         //partitions为空表示无数据,发送任务结束的消息
    }
    assert(partitions.nonEmpty)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

在方法的最后可以看到调用了eventProcessLoop的 post 方法,简单介绍一下,DAGSchedulerEventProcessLoop是 DAGScheduler 的内部类,主要负责用来进行消息的发送与接收,在使用了 post 后他的 onReceive 将会接收到消息,之后通过doOnReceive进行模式匹配,源码如下:

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
        //这里主要介绍上边调用到的JobSubmitted类型,其他的 case 的工作需要单独了解,
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
        ......
  }

至此整体的Job提交阶段已经完成,从handleJobSubmitted开始主要就是 Stage 的划分阶段,下一篇再详细介绍。

sev7e0
Write by sev7e0
end
本文目录