Spark 调度系统#
一、主要工作流程#
两个核心:DAGScheduler
和TaskScheduler
build operator DAG
#
用户提交的 Job 将首先被转换为一系列RDD
并通过 RDD 之间的依赖关系 (Dependency
) 构建 DAG, 然后将 RDD 构成的 DAG 提交到调度系统
split graph into stage of tasks
#
DAGScheduler
负责接收由 RDD 构成的 DAG,将一系列 RDD 划分到不同的Stage
。根据 Stage 的不同类型 (ResultStage和 ShuffleMapStage
), 给 Stage 中未完成的 Partition
创建不同类型的 Task (ResultTask和 ShuffleMapTask
)。 每个 Stage 将因为未完成 Partition 的多少,创建零到多个 Task。DAGScheduler 最后将每个 Stage 中的 Task 以任务集合 (TaskSet
) 的形式提交给TaskSchduler
继续处理
launch tasks via cluster manager
#
使用集群管理器 (cluster manager
) 分配资源与任务调度,对于失败的任务还会有一定的重试与容错机制。TaskScheduler
负责从DAGScheduler
接收TaskSet
, 创建TaskSetManager
对 TaskSet 进行管理,并将此 TaskSetManager 添加到调度池
中,最后将对 Task 的调度交给后端接口 (SchedulerBackend
) 处理。SchedulerBackend 首先申请 TaskSheduler,按照 Task 调度算法 (FIFO和FAIR
),对调度池中所有 TaskSetManager 进行排序,然后对 TaskSet 按照最大本地性原则分配资源,最后在各个分配的节点上运行 TaskSet 中的 Task
execute tasks
#
执行任务,并将任务中间结果和最终结果存入存储体系
二、RDD 详解#
RDD 简单总结#
参考RDD 总结
分区计算器Partitioner
#
当存在 shuffle 依赖关系时, 利用Partitioner
来确定上下游 RDD 之间的分区依赖关系
abstract class Partitioner extends Serializable {
# 用于获取分区数量
def numPartitions: Int
# 用于将输入的key映射到下游RDD的某一个分区
def getPartition(key: Any): Int
}
三、Stage#
官方解释:
/**
* A stage is a set of parallel tasks all computing the same function that need to run as part
* of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
* by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
* DAGScheduler runs these stages in topological order.
*
* Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
* other stage(s), or a result stage, in which case its tasks directly compute a Spark action
* (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
* track the nodes that each output partition is on.
*/
Stage 是一系列并行计算且具有相同依赖的 task 的集合,在执行流程中DAGScheduler
会将一系列 RDD 划分到不同的Stage
并构建它们之间的依赖关系,使不存在依赖关系的 Stage 并行执行,保证依赖关系的 Stage 顺序执行。Stage 分为需要处理 Shuffle 的ShuffleMapStage
和最下游的ResultStage
,ResultStage 是最后执行的 Stage,比如执行count()
等action
算子的任务
Job 中所有 Stage 的提交过程包括反向驱动与正向提交#
反向驱动#
所谓反向驱动,就是从最下游的 ResultStage 开始,由 ResultStage 驱动所有父 Stage 的执行,这个驱动过程不断向祖先方向传递,直到最上游的 Stage 为止
正向提交#
正向提交,就是前代 Stage 先于后代 Stage 将 Task 提交给 TaskScheduler, “代代相传”,直到 ResultStage 最后一个将 Task 提交给 TaskScheduler
四、DAGScheduler#
介绍#
DAGScheduler
通过计算将 DAG 中一系列 RDD 划分到不同的 Stage,然后构建这些 Stage 之间的关系,最后将每个 Stage 按照Partition
切分为多个 Task,并以 Task 集合 (即TaskSet
) 的形式提交给底层的TaskScheduler
DAGScheduler 依赖的一些组件: DAGSchedulerEventProcessLoop
, JobListener
及ActiveJob
JobListener与JobWaiter
#
JobListener
用于监听作业中每个 Task 执行成功或失败,JobWaiter
继承实现了 JobListener, 用于等待整个 Job 执行完毕,然后调用给定的处理函数对返回结果进行处理并最终确定作业的成功或失败
源码实现的一些思考:
- 通过 Scala 的异步编程
Future/Promise
实现了任务状态的检测监听,这块还不太理解,等后面系统学习一下 Scala 再回顾一下 - JobWaiter 中的定义了一个
cancel()
方法取消 Job 执行实际上调用了 DAGScheduler 的cancelJob()
方法 - JobWaiter 中继承 JobListener 实现的 taskSucceeded () 方法中为了保证线程安全用到了
synchronized
给对象加锁,联想到 RDD 中的stateLock
,也用到了 synchronized 加锁
ActiveJob
#
ActiveJob
表示已经激活的 Job, 即 DAGScheduler 接收处理的 Job
DAGSchedulerEventProcessLoop
#
DAGSchedulerEventProcessLoop
是DAGScheduler
内部的事件循环处理器,实现原理类似于 LiveListenerBus
DAGScheduler 与 Job 的提交#
简要流程概括,这里以执行 count 算子为例:
-
首先执行 count 算子会创建调用 SparkContext 的 runJob () 方法
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
-
然后接着传递调用 DAGScheduler 的 runJob () 方法
def runJob[T, U: ClassTag](...): Unit = { .... dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) .... }
-
DAGScheduler 通过执行 submitJob () 方法提交 Job
def runJob[T, U](...): Unit = { val start = System.nanoTime // 执行submitJob()方法 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format(...)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format(...)) ... } }
-
submitJob () 生成一个 JobWaiter 的实例监听 Job 的执行情况,向 DAGSchedulerEventProcessLoop 发送 JobSubmitted 事件
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessLoop.post(JobSubmitted(....))
-
当 eventProcessLoop 对象投递了 JobSubmitted 事件之后,对象内的
eventThread
线程实例对事件进行处理,不断从事件队列中取出事件,调用onReceive
函数处理事件,当匹配到 JobSubmitted 事件后,调用 DAGScheduler 的handleJobSubmitted
函数并传入 jobid、rdd 等参数来处理 Job -
handleJobSubmitted 执行过程
private[scheduler] def handleJobSubmitted(...) { var finalStage: ResultStage = null try { finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { .... } ..... val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) ..... jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) }
由源码分析可以概括执行过程如下
-
首先调用 createResultStage 方法创建 ResultStage, 这里就开始了 Stage 的构建过程, 详见下节 Stage 的构建过程
-
创建 ActiveJob
-
将 JobId 与刚创建的 ActiveJob 的对应关系放入 jobIdToActiveJob 中
-
将刚创建的 ActiveJob 放入 activeJobs 集合中
-
使 ResultStage 的_activeJob 属性持有刚创建的 ActiveJob
-
获取当前 Job 的所有 Stage 对应的 StageInfo (即数组 stageInfos)
-
向 listenerBus (LiveListenerBus) 投递 SparkListenerJobStart 事件,进而引发所有关注此事件的监听器执行相应的操作
-
调用 submitStage 方法提交 Stage, 所有 parent Stage 都计算完成,才能提交
submitStage 三种调用逻辑:
submitMissingTasks(stage,jobId.get)
:如果所有 parent stage 已经完成,则提交 stage 所包含的 tasksubmitStage(parent)
:有 parent stage 未完成,则递归提交abortStage
:无效 stage,直接停止
-
-
DAGScheduler 完成任务提交后,在判断哪些 Partition 需要计算,就会为 Partition 生成 Task,然后封装成 TaskSet,提交至 TaskScheduler。等待 TaskScheduler 最终向集群提交这些 Task,监听这些 Task 的状态
构建 Stage#
前面我们看到 handleJobSubmitted 执行过程中调用 createResultStage 方法创建 ResultStage
private def createResultStage(....): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
根据源码分析,详细的处理步骤如下:
-
调用
getOrCreateParentStages
方法获取所有父 Stage 的列表,父 Stage 主要是宽依赖对应的 Stage -
getOrCreateParentStages 处理步骤:
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { getShuffleDependencies(rdd).map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList }
- 调用 DAGScheduler 的
getShuffleDependencies
方法获取当前给到的 RDD 的所有 ShuffleDependency 的序列,逐个访问其依赖的非 Shuffle 的 RDD,获取所有非 Shuffle 的 RDD 的 ShuffleDependency 依赖 - 调用 DAGScheduler 的
getOrCreateShuffleMapStage
方法为每一个 ShuffleDependency 获取或者创建对应的 ShuffleMapStage, 并返回得到的 ShuffleMapStage 列表
- 调用 DAGScheduler 的
-
getOrCreateShuffleMapStage 方法处理步骤:
-
啊
-
生成 Stage 的身份标识 id, 并创建 ResultStage
-
将 ResultStage 注册到 stageIdToStage 中
-
调用 updateJobIdStageIdMaps 方法更新 Job 的身份标识与 ResultStage 及其所有祖先的映射关系