Spark Scheduling System#
I. Main Workflow#
Two cores: DAGScheduler
and TaskScheduler
build operator DAG
#
The Job submitted by the user will first be converted into a series of RDD
s and a DAG will be constructed through the dependencies between RDDs (Dependency
), and then the DAG composed of RDDs will be submitted to the scheduling system.
split graph into stage of tasks
#
DAGScheduler
is responsible for receiving the DAG composed of RDDs, dividing a series of RDDs into different Stages
. Depending on the type of Stage (ResultStage and ShuffleMapStage
), it creates different types of Tasks (ResultTask and ShuffleMapTask
) for the unfinished Partitions
in the Stage. Each Stage will create zero to multiple Tasks based on the number of unfinished Partitions. Finally, DAGScheduler
submits the Tasks in each Stage to TaskScheduler
for further processing in the form of a task set (TaskSet
).
launch tasks via cluster manager
#
Using a cluster manager to allocate resources and schedule tasks, there will also be a certain retry and fault tolerance mechanism for failed tasks. TaskScheduler
is responsible for receiving TaskSet
from DAGScheduler
, creating TaskSetManager
to manage the TaskSet, and adding this TaskSetManager to the scheduling pool
, finally handing over the scheduling of Tasks to the backend interface (SchedulerBackend
). SchedulerBackend first applies for TaskScheduler, sorts all TaskSetManagers in the scheduling pool according to the task scheduling algorithm (FIFO and FAIR
), then allocates resources to TaskSets based on the maximum locality principle, and finally runs the Tasks in the TaskSet on the allocated nodes.
execute tasks
#
Execute tasks and store intermediate results and final results in the storage system.
II. Detailed Explanation of RDD#
Simple Summary of RDD#
Refer to RDD Summary
Partitioner
#
When there is a shuffle dependency, use Partitioner
to determine the partition dependency between upstream and downstream RDDs.
abstract class Partitioner extends Serializable {
# Used to get the number of partitions
def numPartitions: Int
# Used to map the input key to a certain partition of the downstream RDD
def getPartition(key: Any): Int
}
III. Stage#
Official explanation:
/**
* A stage is a set of parallel tasks all computing the same function that need to run as part
* of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
* by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
* DAGScheduler runs these stages in topological order.
*
* Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
* other stage(s), or a result stage, in which case its tasks directly compute a Spark action
* (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
* track the nodes that each output partition is on.
*/
A Stage is a collection of parallel tasks that have the same dependencies. During the execution process, DAGScheduler
divides a series of RDDs into different Stages
and builds the dependencies between them, allowing Stages without dependencies to execute in parallel while ensuring that Stages with dependencies execute in order. Stages are divided into ShuffleMapStage
that require Shuffle processing and the downstream ResultStage
, with ResultStage being the last executed Stage, such as executing tasks like count()
and other action
operators.
The submission process of all Stages in a Job includes reverse driving and forward submission.#
Reverse Driving#
Reverse driving refers to starting from the downstream ResultStage, where the ResultStage drives the execution of all parent Stages, and this driving process continues to propagate towards the ancestors until reaching the upstream Stage.
Forward Submission#
Forward submission means that the predecessor Stages submit Tasks to TaskScheduler before the successor Stages, "passing down through generations," until the last ResultStage submits its Task to TaskScheduler.
IV. DAGScheduler#
Introduction#
DAGScheduler
divides a series of RDDs into different Stages through computation, then builds the relationships between these Stages, and finally splits each Stage into multiple Tasks according to Partitions
, submitting them to the underlying TaskScheduler
in the form of a Task set (i.e., TaskSet
). Some components that DAGScheduler
depends on include: DAGSchedulerEventProcessLoop
, JobListener
, and ActiveJob
.
JobListener and JobWaiter
#
JobListener
is used to listen for the success or failure of each Task in the job, while JobWaiter
, which inherits from JobListener, is used to wait for the entire Job to complete, then calls a given processing function to handle the return results and ultimately determine the success or failure of the job.
Some thoughts on the source code implementation:
- The task status detection listener is implemented through Scala's asynchronous programming
Future/Promise
, which I don't fully understand yet; I will review it after studying Scala later. - The
cancel()
method defined in JobWaiter to cancel Job execution actually calls thecancelJob()
method of DAGScheduler. - In the
taskSucceeded()
method implemented in JobWaiter that inherits JobListener,synchronized
is used to lock the object for thread safety, which reminds me ofstateLock
in RDD, which also uses synchronized locking.
ActiveJob
#
ActiveJob
represents an activated Job, i.e., a Job that has been received and processed by DAGScheduler.
DAGSchedulerEventProcessLoop
#
DAGSchedulerEventProcessLoop
is the internal event loop processor of DAGScheduler
, and its implementation principle is similar to LiveListenerBus.
Submission of DAGScheduler and Job#
A brief summary of the process, taking the execution of the count operator as an example:
-
First, executing the count operator will create a call to the runJob() method of SparkContext.
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
-
Then it continues to call the runJob() method of DAGScheduler.
def runJob[T, U: ClassTag](...): Unit = { .... dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) .... }
-
DAGScheduler submits the Job by executing the submitJob() method.
def runJob[T, U](...): Unit = { val start = System.nanoTime // Execute submitJob() method val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format(...)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format(...)) ... } }
-
submitJob() generates an instance of JobWaiter to listen to the execution status of the Job and sends a JobSubmitted event to DAGSchedulerEventProcessLoop.
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessLoop.post(JobSubmitted(....))
-
After the eventProcessLoop object posts the JobSubmitted event, the
eventThread
thread instance within the object processes the event, continuously retrieving events from the event queue and calling theonReceive
function to handle events. When it matches the JobSubmitted event, it calls thehandleJobSubmitted
function of DAGScheduler, passing in jobId, rdd, and other parameters to process the Job. -
Execution process of handleJobSubmitted:
private[scheduler] def handleJobSubmitted(...) { var finalStage: ResultStage = null try { finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { .... } ..... val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) ..... jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) }
From the source code analysis, the execution process can be summarized as follows:
-
First, call the createResultStage method to create the ResultStage, this is where the Stage construction process begins, see the next section for the Stage construction process.
-
Create ActiveJob.
-
Place the relationship between JobId and the newly created ActiveJob into jobIdToActiveJob.
-
Place the newly created ActiveJob into the activeJobs collection.
-
Make the ResultStage's _activeJob attribute hold the newly created ActiveJob.
-
Get the StageInfo of all Stages corresponding to the current Job (i.e., the array stageInfos).
-
Post the SparkListenerJobStart event to listenerBus (LiveListenerBus), which triggers all listeners concerned with this event to perform corresponding actions.
-
Call the submitStage method to submit the Stage; all parent Stages must be completed before submission.
The submitStage has three calling logics:
submitMissingTasks(stage, jobId.get)
:If all parent stages have been completed, submit the tasks contained in the stage.submitStage(parent)
:If there are parent stages that are not completed, submit recursively.abortStage
:Invalid stage, stop directly.
-
-
After DAGScheduler completes the task submission, it determines which Partitions need to be computed, generates Tasks for the Partitions, and then packages them into a TaskSet to submit to TaskScheduler. It waits for TaskScheduler to finally submit these Tasks to the cluster and listens to the status of these Tasks.
Constructing Stages#
Earlier, we saw that the handleJobSubmitted execution process calls the createResultStage method to create the ResultStage.
private def createResultStage(....): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
Based on the source code analysis, the detailed processing steps are as follows:
-
Call the
getOrCreateParentStages
method to get the list of all parent Stages, where the parent Stages are mainly the stages corresponding to wide dependencies. -
Processing steps of getOrCreateParentStages:
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { getShuffleDependencies(rdd).map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList }
- Call DAGScheduler's
getShuffleDependencies
method to get the sequence of all ShuffleDependencies for the given RDD, and access each of its dependent non-shuffle RDDs to obtain all non-shuffle RDD's ShuffleDependency dependencies. - Call DAGScheduler's
getOrCreateShuffleMapStage
method to obtain or create the corresponding ShuffleMapStage for each ShuffleDependency, and return the obtained list of ShuffleMapStages.
- Call DAGScheduler's
-
Processing steps of getOrCreateShuffleMapStage:
-
Ah
-
Generate the identity id of the Stage and create ResultStage.
-
Register the ResultStage in stageIdToStage.
-
Call updateJobIdStageIdMaps method to update the mapping relationship between the Job's identity and the ResultStage and all its ancestors.