Livy シリーズ - Livy Session の詳細解説#
内容整理自:
- 簡書著者 - 牛肉丸粉不加葱 Apache Livy 文集
概要#
Livy には 2 種類のジョブがあり、それぞれ session と batch です。session と batch の作成プロセスは非常に異なり、batch の作成は対応する spark アプリの起動を終点とします。一方、session は対応する spark アプリを起動するだけでなく、共有 sparkContext をサポートして、個々の statements の提出と実行を受け入れる必要があります。私は session の作成を 2 つの大きなステップに分けます:
client端
:LivyServer 内で実行され、spark アプリを起動するまでリクエストを受け付けます(注意:ここでは client 端と呼ばれていますが、LivyServer 内で実行されています)server端(driver内部)
:session に対応する spark アプリ driver の起動
client 端#
全体の流れ#
- session に対応する spark アプリを起動
- driver と接続を確立
- Session の作成と初期化
session に対応する spark アプリを起動#
核心的なメソッドは:ContextLauncher#startDriver
で、2 つの大きなステップに分けられます:
- spark アプリを起動
- SparkSubmit の終了を待つ
spark アプリを起動#
session に対応する spark アプリの mainClass はRSCDriverBootstrapper
です。
startDriver()
メソッドを呼び出し、新しい SparkLauncher オブジェクトを作成し、設定、リソース、mainClass などの設定を行い、次に launch () メソッドを呼び出して SparkSubmit プロセスの対応する Process オブジェクト process を取得します。
SparkSubmit の終了を待つ#
SparkLauncher#launch () が返すプロセスは SparkSubmit プロセスであり、process を返した後に新しい ContextLauncher.ChildProcess オブジェクトを作成します。このプロセス内で新しいスレッドが起動され、SparkSubmit プロセスの終了を待ち続けます。このスレッド内のロジックは以下の通りです:
public void run() {
try {
// スレッドはこのプロセスが終了するまでブロックされます
int exitCode = child.waitFor();
// 異常終了の場合、Spark Appの起動に失敗したことを示し、例外をスローします
if (exitCode != 0) {
LOG.warn("Child process exited with code {}.", exitCode);
fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
}
} catch (InterruptedException ie) {
LOG.warn("Waiting thread interrupted, killing child process.");
Thread.interrupted();
child.destroy();
} catch (Exception e) {
LOG.warn("Exception while waiting for child process.", e);
}
}
driver との接続を確立#
session の最大の特徴は、SparkContext を共有できることで、ユーザーが提出した複数のコードスニペットが 1 つの SparkContext 上で実行できることです(IO 多重化、http 長接続を連想)。これには 2 つの利点があります:
- タスクの起動速度が大幅に向上します。yarn 上でアプリを起動するのは時間がかかりますが、session を使用すると、session を起動するのに相応の時間がかかる以外は、後に提出されたコードスニペットは即座に実行されます(
http長接続
を連想)。 - RDD、table の共有:永続化された RDD、table は後のコードスニペットで使用でき、異なるユーザーが同じ RDD、table 上で計算を行う必要があるシナリオで非常に便利です(IO 多重化、ネットワークの再利用を連想)。
driver は yarn によって任意のノードにスケジュールされる可能性があるため、LivyServer が自発的に driver と接続を確立することはできません。そのため、事前に client 端で RpcServer を作成し、driver の接続を待機します。したがって、最初に client 端で RpcServer を作成し、driver 端の接続を待ち、接続が成功した後に client が driver 端の情報を取得して driver 端の RpcServer に接続します。まとめると、3 つのステップになります:
-
client がその RpcServer 情報を driver に渡します。
時系列図の第 (5) ステップ:RSCClientFactory#createClient
、この呼び出しでorg.apache.livy.rsc.rpc.RpcServer
オブジェクトを作成し、メンバー server に割り当てます。 -
driver が client に接続し、RpcServer 情報を渡します(
RSCDriver#initializeServer
で実装)。
-
client が driver の rpcServer アドレス情報を受け取り、接続します。
Session の作成と初期化#
driver との接続が確立された後、rscClient、livyConf などの情報を使用して InteractiveSession オブジェクトを作成し、初期化します。プロセスは以下の通りです:
重要なステップ:
- session 情報を state store に保存し、livy server がクラッシュした後にリカバリできるようにします。
- driver に空の PingJob を送信して driver の状態が正常かどうかを確認します。PingJob が正常に実行されれば、driver の状態は正常であり、session を running 状態に設定します。エラーや失敗が発生した場合は、driver に問題が発生したことを示し、session の状態を error に設定します。
session の作成と初期化が成功裏に完了した後、session はSessionManager
に追加され、統一管理されます。SessionManager の主な責任は以下の通りです:
- すべての sessions を保持
- 期限切れの session をクリーンアップ
- state store から sessions を復元
server 端(driver 内部)#
概要#
図に示すように、driver 内部の起動プロセスは以下の 5 つのステップに分けられます:
- ReplDriver インスタンスを作成
- server を初期化
- SparkContext を初期化
- JobContextImpl インスタンスを作成し、jobs を実行
- 終了を待つ
ReplDriver インスタンスを作成#
ReplDriver は InteractiveSession に対応する Spark App driver で、livy server からのさまざまなリクエストを受け取り、処理します。また、RSCDriver のサブクラスでもあります。RSCDriver は:
- RSCClient の接続を待つ RpcServer server を保持
- SparkContext を初期化
- 様々なリクエストを処理:CancelJob、EndSession、JobRequest、BypassJobRequest、SyncJobRequest、GetBypassJobStatus
- add file リクエストを処理
server を初期化#
このステップは RSCDriver#initializeServer () で呼び出され、client に接続し、server 端の rpc アドレスを通知します。client は server の rpc アドレスを知った後、接続を行い、リクエストを送信します。
SparkContext を初期化#
3 つのステップに分かれます:
-
異なる kind に基づいて異なるタイプのコードインタープリタを作成
-
repl/Session を作成し、その主な責任は:
- interpreter を起動し、SparkContext を取得
- statements を非同期に実行するためのスレッドプールを保持(interpreter を通じて実行)
- statements を非同期にキャンセルするためのスレッドプールを保持
- 1 つの session 内のすべての statements を管理
-
interpreter#start
メソッドを呼び出して Session を起動します。
コードスニペットを実行する方法#
executeCodeFunc () メソッド分析
上図の第 9 ステップにある executeCodeFunc は、実際にコードスニペットを実行するための関数で、プロセスは以下の通りです。