Yige

Yige

Build

Livyシリーズ - Livyセッションの詳細解説

Livy シリーズ - Livy Session の詳細解説#

内容整理自:

  1. 簡書著者 - 牛肉丸粉不加葱 Apache Livy 文集

概要#

Livy には 2 種類のジョブがあり、それぞれ session と batch です。session と batch の作成プロセスは非常に異なり、batch の作成は対応する spark アプリの起動を終点とします。一方、session は対応する spark アプリを起動するだけでなく、共有 sparkContext をサポートして、個々の statements の提出と実行を受け入れる必要があります。私は session の作成を 2 つの大きなステップに分けます:

  • client端:LivyServer 内で実行され、spark アプリを起動するまでリクエストを受け付けます(注意:ここでは client 端と呼ばれていますが、LivyServer 内で実行されています)
  • server端(driver内部):session に対応する spark アプリ driver の起動

client 端#

全体の流れ#

image.png

  • session に対応する spark アプリを起動
  • driver と接続を確立
  • Session の作成と初期化

session に対応する spark アプリを起動#

核心的なメソッドは:ContextLauncher#startDriverで、2 つの大きなステップに分けられます:

  • spark アプリを起動
  • SparkSubmit の終了を待つ

spark アプリを起動#

session に対応する spark アプリの mainClass はRSCDriverBootstrapperです。
image.png

startDriver()メソッドを呼び出し、新しい SparkLauncher オブジェクトを作成し、設定、リソース、mainClass などの設定を行い、次に launch () メソッドを呼び出して SparkSubmit プロセスの対応する Process オブジェクト process を取得します。

SparkSubmit の終了を待つ#

SparkLauncher#launch () が返すプロセスは SparkSubmit プロセスであり、process を返した後に新しい ContextLauncher.ChildProcess オブジェクトを作成します。このプロセス内で新しいスレッドが起動され、SparkSubmit プロセスの終了を待ち続けます。このスレッド内のロジックは以下の通りです:

public void run() {
  try {
    // スレッドはこのプロセスが終了するまでブロックされます
    int exitCode = child.waitFor();
    // 異常終了の場合、Spark Appの起動に失敗したことを示し、例外をスローします
    if (exitCode != 0) {
      LOG.warn("Child process exited with code {}.", exitCode);
      fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
    }
  } catch (InterruptedException ie) {
    LOG.warn("Waiting thread interrupted, killing child process.");
    Thread.interrupted();
    child.destroy();
  } catch (Exception e) {
    LOG.warn("Exception while waiting for child process.", e);
  }
}

driver との接続を確立#

session の最大の特徴は、SparkContext を共有できることで、ユーザーが提出した複数のコードスニペットが 1 つの SparkContext 上で実行できることです(IO 多重化、http 長接続を連想)。これには 2 つの利点があります:

  • タスクの起動速度が大幅に向上します。yarn 上でアプリを起動するのは時間がかかりますが、session を使用すると、session を起動するのに相応の時間がかかる以外は、後に提出されたコードスニペットは即座に実行されます(http長接続を連想)。
  • RDD、table の共有:永続化された RDD、table は後のコードスニペットで使用でき、異なるユーザーが同じ RDD、table 上で計算を行う必要があるシナリオで非常に便利です(IO 多重化、ネットワークの再利用を連想)。

driver は yarn によって任意のノードにスケジュールされる可能性があるため、LivyServer が自発的に driver と接続を確立することはできません。そのため、事前に client 端で RpcServer を作成し、driver の接続を待機します。したがって、最初に client 端で RpcServer を作成し、driver 端の接続を待ち、接続が成功した後に client が driver 端の情報を取得して driver 端の RpcServer に接続します。まとめると、3 つのステップになります:

  • client がその RpcServer 情報を driver に渡します。
    時系列図の第 (5) ステップ:RSCClientFactory#createClient、この呼び出しでorg.apache.livy.rsc.rpc.RpcServerオブジェクトを作成し、メンバー server に割り当てます。

  • driver が client に接続し、RpcServer 情報を渡します(RSCDriver#initializeServerで実装)。
    image.png

  • client が driver の rpcServer アドレス情報を受け取り、接続します。

Session の作成と初期化#

driver との接続が確立された後、rscClient、livyConf などの情報を使用して InteractiveSession オブジェクトを作成し、初期化します。プロセスは以下の通りです:
image.png

重要なステップ:

  • session 情報を state store に保存し、livy server がクラッシュした後にリカバリできるようにします。
  • driver に空の PingJob を送信して driver の状態が正常かどうかを確認します。PingJob が正常に実行されれば、driver の状態は正常であり、session を running 状態に設定します。エラーや失敗が発生した場合は、driver に問題が発生したことを示し、session の状態を error に設定します。

session の作成と初期化が成功裏に完了した後、session はSessionManagerに追加され、統一管理されます。SessionManager の主な責任は以下の通りです:

  • すべての sessions を保持
  • 期限切れの session をクリーンアップ
  • state store から sessions を復元

server 端(driver 内部)#

概要#

image.png

図に示すように、driver 内部の起動プロセスは以下の 5 つのステップに分けられます:

  • ReplDriver インスタンスを作成
  • server を初期化
  • SparkContext を初期化
  • JobContextImpl インスタンスを作成し、jobs を実行
  • 終了を待つ

ReplDriver インスタンスを作成#

ReplDriver は InteractiveSession に対応する Spark App driver で、livy server からのさまざまなリクエストを受け取り、処理します。また、RSCDriver のサブクラスでもあります。RSCDriver は:

  • RSCClient の接続を待つ RpcServer server を保持
  • SparkContext を初期化
  • 様々なリクエストを処理:CancelJob、EndSession、JobRequest、BypassJobRequest、SyncJobRequest、GetBypassJobStatus
  • add file リクエストを処理

server を初期化#

このステップは RSCDriver#initializeServer () で呼び出され、client に接続し、server 端の rpc アドレスを通知します。client は server の rpc アドレスを知った後、接続を行い、リクエストを送信します。

image.png

SparkContext を初期化#

3 つのステップに分かれます:

  • 異なる kind に基づいて異なるタイプのコードインタープリタを作成

  • repl/Session を作成し、その主な責任は:

    1. interpreter を起動し、SparkContext を取得
    2. statements を非同期に実行するためのスレッドプールを保持(interpreter を通じて実行)
    3. statements を非同期にキャンセルするためのスレッドプールを保持
    4. 1 つの session 内のすべての statements を管理
  • interpreter#startメソッドを呼び出して Session を起動します。

コードスニペットを実行する方法#

image.png

executeCodeFunc () メソッド分析
上図の第 9 ステップにある executeCodeFunc は、実際にコードスニペットを実行するための関数で、プロセスは以下の通りです。
image.png

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。