Yige

Yige

Build

Sparkの進階 - Sparkスケジューリングシステム

Spark スケジューリングシステム#

一、主要な作業フロー#

二つのコア:DAGSchedulerTaskScheduler
-w930

build operator DAG#

ユーザーが提出した Job は最初に一連のRDDに変換され、RDD 間の依存関係 (Dependency) を通じて DAG を構築し、次に RDD で構成された DAG をスケジューリングシステムに提出します。

split graph into stage of tasks#

DAGSchedulerは RDD で構成された DAG を受け取り、一連の RDD を異なるStageに分割します。Stage の異なるタイプ (ResultStageと ShuffleMapStage) に基づいて、未完了のPartitionに対して異なるタイプの Task (ResultTaskと ShuffleMapTask) を作成します。各 Stage は未完了の Partition の数に応じて、ゼロから複数の Task を作成します。DAGScheduler は最後に各 Stage の Task をタスク集合 (TaskSet) の形式でTaskSchedulerに提出して処理を続けます。

launch tasks via cluster manager#

クラスターマネージャー (cluster manager) を使用してリソースとタスクのスケジューリングを割り当て、失敗したタスクに対しては一定の再試行とフォールトトレランス機構があります。TaskSchedulerDAGSchedulerからTaskSetを受け取り、TaskSetManagerを作成して TaskSet を管理し、この TaskSetManager をスケジューリングプールに追加し、最後に Task のスケジューリングをバックエンドインターフェース (SchedulerBackend) に処理させます。SchedulerBackend は最初に TaskScheduler を申請し、Task スケジューリングアルゴリズム (FIFOとFAIR) に従ってスケジューリングプール内のすべての TaskSetManager をソートし、次に TaskSet に最大のローカリティ原則に従ってリソースを割り当て、最後に各割り当てられたノードで TaskSet 内の Task を実行します。

execute tasks#

タスクを実行し、タスクの中間結果と最終結果をストレージシステムに保存します。

二、RDD の詳細#

RDD の簡単なまとめ#

参考RDD まとめ

分区計算器Partitioner#

shuffle 依存関係が存在する場合、Partitionerを利用して上下流の RDD 間の分区依存関係を決定します。

abstract class Partitioner extends Serializable {
  # 分区数を取得するためのメソッド
  def numPartitions: Int
  # 入力のkeyを下流のRDDの特定の分区にマッピングするためのメソッド
  def getPartition(key: Any): Int
}

三、Stage#

公式の説明:

 /**
 * A stage is a set of parallel tasks all computing the same function that need to run as part
 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
 * DAGScheduler runs these stages in topological order.
 *
 * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
 * other stage(s), or a result stage, in which case its tasks directly compute a Spark action
 * (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
 * track the nodes that each output partition is on.
 */

Stage は一連の並列計算であり、同じ依存関係を持つタスクの集合であり、実行フローの中でDAGSchedulerは一連の RDD を異なるStageに分割し、それらの間の依存関係を構築し、依存関係のない Stage を並行して実行し、依存関係のある Stage を順次実行します。Stage は Shuffle を処理する必要があるShuffleMapStageと最下流のResultStageに分かれ、ResultStage は最後に実行される Stage であり、例えばcount()などのaction演算子のタスクを実行します。

Job 内のすべての Stage の提出プロセスには逆ドライブと正ドライブが含まれます。#

逆ドライブ#

逆ドライブとは、最下流の ResultStage から始まり、ResultStage がすべての親 Stage の実行を駆動することを指します。この駆動プロセスは祖先の方向に伝播し、最上流の Stage に達するまで続きます。

正ドライブ#

正ドライブとは、前の Stage が後の Stage よりも先に Task を TaskScheduler に提出することを指し、「代代相伝」となり、ResultStage が最後に Task を TaskScheduler に提出します。

四、DAGScheduler#

イントロダクション#

DAGSchedulerは計算を通じて DAG 内の一連の RDD を異なる Stage に分割し、次にこれらの Stage 間の関係を構築し、最後に各 Stage をPartitionに分割して複数の Task として、タスク集合 (すなわちTaskSet) の形式で下層のTaskSchedulerに提出します。
DAGScheduler が依存するいくつかのコンポーネント:DAGSchedulerEventProcessLoopJobListenerおよびActiveJob

JobListenerとJobWaiter#

JobListenerは作業内の各 Task の成功または失敗を監視するために使用され、JobWaiterは JobListener を継承して実装され、全体の Job が完了するのを待ち、指定された処理関数を呼び出して返された結果を処理し、最終的に作業の成功または失敗を確定します。

ソースコード実装に関するいくつかの考察:

  • Scala の非同期プログラミングFuture/Promiseを通じてタスクの状態の検出監視を実現しました。この部分はまだあまり理解していないので、後で Scala を学んだ後に再確認します。
  • JobWaiter 内で定義されたcancel()メソッドは Job の実行をキャンセルするために実際に DAGScheduler のcancelJob()メソッドを呼び出します。
  • JobWaiter 内で JobListener を継承して実装された taskSucceeded () メソッドでは、スレッドセーフを保証するためにsynchronizedを使用してオブジェクトにロックをかけています。RDD 内のstateLockを思い出させ、こちらもsynchronizedでロックをかけています。

ActiveJob#

ActiveJobはすでにアクティブな Job を表し、すなわち DAGScheduler が受け取って処理している Job です。

DAGSchedulerEventProcessLoop#

DAGSchedulerEventProcessLoopDAGScheduler内部のイベントループ処理器であり、実装原理は LiveListenerBus に似ています。

DAGScheduler と Job の提出#

簡単なフローを概括すると、ここでは count 演算子を実行する例を挙げます:

  • まず count 演算子を実行すると、SparkContext の runJob () メソッドが呼び出されます。

    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
    
  • 次に DAGScheduler の runJob () メソッドが呼び出されます。

    def runJob[T, U: ClassTag](...): Unit = {
        ....
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        ....
      }
    
  • DAGScheduler は submitJob () メソッドを実行して Job を提出します。

    def runJob[T, U](...): Unit = {
        val start = System.nanoTime
        // submitJob()メソッドを実行
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
        waiter.completionFuture.value.get match {
          case scala.util.Success(_) =>
            logInfo("Job %d finished: %s, took %f s".format(...))
          case scala.util.Failure(exception) =>
            logInfo("Job %d failed: %s, took %f s".format(...))
            ...
        }
    }
    
  • submitJob () は JobWaiter のインスタンスを生成し、Job の実行状況を監視し、DAGSchedulerEventProcessLoop に JobSubmitted イベントを送信します。

    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(....))
    
  • eventProcessLoop オブジェクトが JobSubmitted イベントを投げると、オブジェクト内のeventThreadスレッドインスタンスがイベントを処理し、イベントキューからイベントを取り出してonReceive関数を呼び出してイベントを処理します。JobSubmitted イベントに一致すると、DAGScheduler のhandleJobSubmitted関数が呼び出され、jobid、rdd などのパラメータが渡されて Job が処理されます。

  • handleJobSubmitted の実行プロセス

    private[scheduler] def handleJobSubmitted(...) {
        var finalStage: ResultStage = null
        try {
          finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
        } catch {
            ....
        }
        .....
    
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    
        .....
    
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
    
    

    ソースコードの分析から実行プロセスを概括すると以下のようになります。

    1. 最初に createResultStage メソッドを呼び出して ResultStage を作成します。ここで Stage の構築プロセスが始まります。詳細は次のセクションの Stage の構築プロセスを参照してください。

    2. ActiveJob を作成します。

    3. JobId と新しく作成された ActiveJob の対応関係を jobIdToActiveJob に格納します。

    4. 新しく作成された ActiveJob を activeJobs 集合に追加します。

    5. ResultStage の_activeJob 属性が新しく作成された ActiveJob を保持します。

    6. 現在の Job のすべての Stage に対応する StageInfo (すなわち配列 stageInfos) を取得します。

    7. listenerBus (LiveListenerBus) に SparkListenerJobStart イベントを投げ、すべてのこのイベントに関心を持つリスナーが対応する操作を実行します。

    8. submitStage メソッドを呼び出して Stage を提出します。すべての親 Stage が計算を完了しなければ、提出できません。
      submitStage の三つの呼び出しロジック:
      submitMissingTasks(stage,jobId.get):すべての親 stage が完了している場合、stage に含まれる task を提出します。

      submitStage(parent):親 stage が未完了の場合、再帰的に提出します。

      abortStage:無効な stage で、直接停止します。

  • DAGScheduler がタスクの提出を完了した後、どの Partition が計算を必要とするかを判断し、Partition に Task を生成し、TaskSet に封装して TaskScheduler に提出します。TaskScheduler が最終的にクラスタにこれらの Task を提出し、これらの Task の状態を監視します。

Stage の構築#

前述の handleJobSubmitted の実行プロセスで createResultStage メソッドが呼び出され、ResultStage が作成されます。

private def createResultStage(....): ResultStage = {
    checkBarrierStageWithDynamicAllocation(rdd)
    checkBarrierStageWithNumSlots(rdd)
    checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

ソースコードの分析に基づく詳細な処理手順は以下の通りです:

  • getOrCreateParentStagesメソッドを呼び出してすべての親 Stage のリストを取得します。親 Stage は主に広い依存関係に対応する Stage です。

  • getOrCreateParentStages の処理手順:

    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        getShuffleDependencies(rdd).map { shuffleDep =>
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
    }
    
    1. DAGScheduler のgetShuffleDependenciesメソッドを呼び出して、現在の RDD のすべての ShuffleDependency のシーケンスを取得し、依存する非 Shuffle の RDD を一つずつ訪問し、すべての非 Shuffle の RDD の ShuffleDependency 依存を取得します。
    2. DAGScheduler のgetOrCreateShuffleMapStageメソッドを呼び出して、各 ShuffleDependency に対して対応する ShuffleMapStage を取得または作成し、得られた ShuffleMapStage のリストを返します。
  • getOrCreateShuffleMapStage メソッドの処理手順:

  • ああ

  • Stage の ID を生成し、ResultStage を作成します。

  • ResultStage を stageIdToStage に登録します。

  • updateJobIdStageIdMaps メソッドを呼び出して Job の ID と ResultStage およびそのすべての祖先のマッピング関係を更新します。

ResultStage の提出#

まだ計算されていない Task の提出#

Task 実行結果の処理#

参考リンク#

参考リンク#

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。