Spark スケジューリングシステム#
一、主要な作業フロー#
二つのコア:DAGScheduler
とTaskScheduler
build operator DAG
#
ユーザーが提出した Job は最初に一連のRDD
に変換され、RDD 間の依存関係 (Dependency
) を通じて DAG を構築し、次に RDD で構成された DAG をスケジューリングシステムに提出します。
split graph into stage of tasks
#
DAGScheduler
は RDD で構成された DAG を受け取り、一連の RDD を異なるStage
に分割します。Stage の異なるタイプ (ResultStageと ShuffleMapStage
) に基づいて、未完了のPartition
に対して異なるタイプの Task (ResultTaskと ShuffleMapTask
) を作成します。各 Stage は未完了の Partition の数に応じて、ゼロから複数の Task を作成します。DAGScheduler は最後に各 Stage の Task をタスク集合 (TaskSet
) の形式でTaskScheduler
に提出して処理を続けます。
launch tasks via cluster manager
#
クラスターマネージャー (cluster manager
) を使用してリソースとタスクのスケジューリングを割り当て、失敗したタスクに対しては一定の再試行とフォールトトレランス機構があります。TaskScheduler
はDAGScheduler
からTaskSet
を受け取り、TaskSetManager
を作成して TaskSet を管理し、この TaskSetManager をスケジューリングプール
に追加し、最後に Task のスケジューリングをバックエンドインターフェース (SchedulerBackend
) に処理させます。SchedulerBackend は最初に TaskScheduler を申請し、Task スケジューリングアルゴリズム (FIFOとFAIR
) に従ってスケジューリングプール内のすべての TaskSetManager をソートし、次に TaskSet に最大のローカリティ原則に従ってリソースを割り当て、最後に各割り当てられたノードで TaskSet 内の Task を実行します。
execute tasks
#
タスクを実行し、タスクの中間結果と最終結果をストレージシステムに保存します。
二、RDD の詳細#
RDD の簡単なまとめ#
参考RDD まとめ
分区計算器Partitioner
#
shuffle 依存関係が存在する場合、Partitioner
を利用して上下流の RDD 間の分区依存関係を決定します。
abstract class Partitioner extends Serializable {
# 分区数を取得するためのメソッド
def numPartitions: Int
# 入力のkeyを下流のRDDの特定の分区にマッピングするためのメソッド
def getPartition(key: Any): Int
}
三、Stage#
公式の説明:
/**
* A stage is a set of parallel tasks all computing the same function that need to run as part
* of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
* by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
* DAGScheduler runs these stages in topological order.
*
* Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
* other stage(s), or a result stage, in which case its tasks directly compute a Spark action
* (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
* track the nodes that each output partition is on.
*/
Stage は一連の並列計算であり、同じ依存関係を持つタスクの集合であり、実行フローの中でDAGScheduler
は一連の RDD を異なるStage
に分割し、それらの間の依存関係を構築し、依存関係のない Stage を並行して実行し、依存関係のある Stage を順次実行します。Stage は Shuffle を処理する必要があるShuffleMapStage
と最下流のResultStage
に分かれ、ResultStage は最後に実行される Stage であり、例えばcount()
などのaction
演算子のタスクを実行します。
Job 内のすべての Stage の提出プロセスには逆ドライブと正ドライブが含まれます。#
逆ドライブ#
逆ドライブとは、最下流の ResultStage から始まり、ResultStage がすべての親 Stage の実行を駆動することを指します。この駆動プロセスは祖先の方向に伝播し、最上流の Stage に達するまで続きます。
正ドライブ#
正ドライブとは、前の Stage が後の Stage よりも先に Task を TaskScheduler に提出することを指し、「代代相伝」となり、ResultStage が最後に Task を TaskScheduler に提出します。
四、DAGScheduler#
イントロダクション#
DAGScheduler
は計算を通じて DAG 内の一連の RDD を異なる Stage に分割し、次にこれらの Stage 間の関係を構築し、最後に各 Stage をPartition
に分割して複数の Task として、タスク集合 (すなわちTaskSet
) の形式で下層のTaskScheduler
に提出します。
DAGScheduler が依存するいくつかのコンポーネント:DAGSchedulerEventProcessLoop
、JobListener
およびActiveJob
。
JobListenerとJobWaiter
#
JobListener
は作業内の各 Task の成功または失敗を監視するために使用され、JobWaiter
は JobListener を継承して実装され、全体の Job が完了するのを待ち、指定された処理関数を呼び出して返された結果を処理し、最終的に作業の成功または失敗を確定します。
ソースコード実装に関するいくつかの考察:
- Scala の非同期プログラミング
Future/Promise
を通じてタスクの状態の検出監視を実現しました。この部分はまだあまり理解していないので、後で Scala を学んだ後に再確認します。 - JobWaiter 内で定義された
cancel()
メソッドは Job の実行をキャンセルするために実際に DAGScheduler のcancelJob()
メソッドを呼び出します。 - JobWaiter 内で JobListener を継承して実装された taskSucceeded () メソッドでは、スレッドセーフを保証するために
synchronized
を使用してオブジェクトにロックをかけています。RDD 内のstateLock
を思い出させ、こちらもsynchronized
でロックをかけています。
ActiveJob
#
ActiveJob
はすでにアクティブな Job を表し、すなわち DAGScheduler が受け取って処理している Job です。
DAGSchedulerEventProcessLoop
#
DAGSchedulerEventProcessLoop
はDAGScheduler
内部のイベントループ処理器であり、実装原理は LiveListenerBus に似ています。
DAGScheduler と Job の提出#
簡単なフローを概括すると、ここでは count 演算子を実行する例を挙げます:
-
まず count 演算子を実行すると、SparkContext の runJob () メソッドが呼び出されます。
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
-
次に DAGScheduler の runJob () メソッドが呼び出されます。
def runJob[T, U: ClassTag](...): Unit = { .... dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) .... }
-
DAGScheduler は submitJob () メソッドを実行して Job を提出します。
def runJob[T, U](...): Unit = { val start = System.nanoTime // submitJob()メソッドを実行 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format(...)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format(...)) ... } }
-
submitJob () は JobWaiter のインスタンスを生成し、Job の実行状況を監視し、DAGSchedulerEventProcessLoop に JobSubmitted イベントを送信します。
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessLoop.post(JobSubmitted(....))
-
eventProcessLoop オブジェクトが JobSubmitted イベントを投げると、オブジェクト内の
eventThread
スレッドインスタンスがイベントを処理し、イベントキューからイベントを取り出してonReceive
関数を呼び出してイベントを処理します。JobSubmitted イベントに一致すると、DAGScheduler のhandleJobSubmitted
関数が呼び出され、jobid、rdd などのパラメータが渡されて Job が処理されます。 -
handleJobSubmitted の実行プロセス
private[scheduler] def handleJobSubmitted(...) { var finalStage: ResultStage = null try { finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { .... } ..... val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) ..... jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) submitStage(finalStage) }
ソースコードの分析から実行プロセスを概括すると以下のようになります。
-
最初に createResultStage メソッドを呼び出して ResultStage を作成します。ここで Stage の構築プロセスが始まります。詳細は次のセクションの Stage の構築プロセスを参照してください。
-
ActiveJob を作成します。
-
JobId と新しく作成された ActiveJob の対応関係を jobIdToActiveJob に格納します。
-
新しく作成された ActiveJob を activeJobs 集合に追加します。
-
ResultStage の_activeJob 属性が新しく作成された ActiveJob を保持します。
-
現在の Job のすべての Stage に対応する StageInfo (すなわち配列 stageInfos) を取得します。
-
listenerBus (LiveListenerBus) に SparkListenerJobStart イベントを投げ、すべてのこのイベントに関心を持つリスナーが対応する操作を実行します。
-
submitStage メソッドを呼び出して Stage を提出します。すべての親 Stage が計算を完了しなければ、提出できません。
submitStage の三つの呼び出しロジック:
submitMissingTasks(stage,jobId.get)
:すべての親 stage が完了している場合、stage に含まれる task を提出します。submitStage(parent)
:親 stage が未完了の場合、再帰的に提出します。abortStage
:無効な stage で、直接停止します。
-
-
DAGScheduler がタスクの提出を完了した後、どの Partition が計算を必要とするかを判断し、Partition に Task を生成し、TaskSet に封装して TaskScheduler に提出します。TaskScheduler が最終的にクラスタにこれらの Task を提出し、これらの Task の状態を監視します。
Stage の構築#
前述の handleJobSubmitted の実行プロセスで createResultStage メソッドが呼び出され、ResultStage が作成されます。
private def createResultStage(....): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
ソースコードの分析に基づく詳細な処理手順は以下の通りです:
-
getOrCreateParentStages
メソッドを呼び出してすべての親 Stage のリストを取得します。親 Stage は主に広い依存関係に対応する Stage です。 -
getOrCreateParentStages の処理手順:
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { getShuffleDependencies(rdd).map { shuffleDep => getOrCreateShuffleMapStage(shuffleDep, firstJobId) }.toList }
- DAGScheduler の
getShuffleDependencies
メソッドを呼び出して、現在の RDD のすべての ShuffleDependency のシーケンスを取得し、依存する非 Shuffle の RDD を一つずつ訪問し、すべての非 Shuffle の RDD の ShuffleDependency 依存を取得します。 - DAGScheduler の
getOrCreateShuffleMapStage
メソッドを呼び出して、各 ShuffleDependency に対して対応する ShuffleMapStage を取得または作成し、得られた ShuffleMapStage のリストを返します。
- DAGScheduler の
-
getOrCreateShuffleMapStage メソッドの処理手順:
-
ああ
-
Stage の ID を生成し、ResultStage を作成します。
-
ResultStage を stageIdToStage に登録します。
-
updateJobIdStageIdMaps メソッドを呼び出して Job の ID と ResultStage およびそのすべての祖先のマッピング関係を更新します。