Livy Series - Detailed Explanation of Livy Session#
Content organized from:
- Author on JianShu - Beef Ball Noodles without Green Onions Apache Livy Collection
Overview#
Livy has two types of jobs: session and batch. Their creation processes differ considerably: creating a batch ends once the corresponding Spark app has started, whereas a session must not only start its Spark app but also share the SparkContext so that it can accept and run individual statements. I divide the creation of a session into two major steps:
- client side: runs in LivyServer, accepts requests until the Spark app is started (note that although it's called the client side, it runs in LivyServer)
- server side (inside driver): the startup of the Spark app driver corresponding to the session
Client Side#
Overall Process#
- Start the Spark app corresponding to the session
- Establish a connection with the driver
- Create and initialize the session
Start the Spark App Corresponding to the Session#
The core method is ContextLauncher#startDriver, which can be divided into two major steps:
- Start the Spark app
- Wait for SparkSubmit to exit
Start the Spark App#
The mainClass of the Spark app corresponding to the session is RSCDriverBootstrapper. The startDriver() method creates a new SparkLauncher object, configures resources, mainClass, etc., and then calls the launch() method to obtain the Process object of the SparkSubmit process.
Wait for SparkSubmit to Exit#
The process returned by SparkLauncher#launch() is the SparkSubmit process. After returning the process, a new ContextLauncher.ChildProcess object is created, which will start a new thread to wait for the SparkSubmit process to exit. The logic in that thread is as follows:
public void run() {
  try {
    // The thread blocks until this process exits
    int exitCode = child.waitFor();
    // If it exits abnormally, it indicates that the Spark App failed to start, an exception will be thrown
    if (exitCode != 0) {
      LOG.warn("Child process exited with code {}.", exitCode);
      fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
    }
  } catch (InterruptedException ie) {
    LOG.warn("Waiting thread interrupted, killing child process.");
    Thread.interrupted();
    child.destroy();
  } catch (Exception e) {
    LOG.warn("Exception while waiting for child process.", e);
  }
}
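The wait-for-exit pattern above can be tried in isolation. The following is a minimal sketch, not Livy's code: the names `ChildMonitorSketch`, `Waitable`, `monitor`, and `startedCleanly` are hypothetical, and a `CompletableFuture` stands in for Livy's `fail(...)` callback. Unlike the original, it also completes the future on a clean exit so that callers can observe both outcomes.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Illustrative stand-in for a child-process watcher; all names are hypothetical. */
class ChildMonitorSketch {

    /** Anything that blocks until a child exits, e.g. process::waitFor. */
    interface Waitable {
        int waitFor() throws InterruptedException;
    }

    /**
     * Starts a daemon thread that waits for the child to exit. The returned
     * future completes normally on exit code 0 and exceptionally otherwise.
     */
    static CompletableFuture<Integer> monitor(Waitable child) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        Thread waiter = new Thread(() -> {
            try {
                // Blocks until the child exits
                int exitCode = child.waitFor();
                if (exitCode == 0) {
                    result.complete(exitCode);
                } else {
                    result.completeExceptionally(new IOException(
                        String.format("Child process exited with code %d.", exitCode)));
                }
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and report the failure
                Thread.currentThread().interrupt();
                result.completeExceptionally(ie);
            }
        }, "child-monitor");
        waiter.setDaemon(true);
        waiter.start();
        return result;
    }

    /** Convenience wrapper: true if the child exited with code 0. */
    static boolean startedCleanly(Waitable child) {
        try {
            monitor(child).join();
            return true;
        } catch (CompletionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(startedCleanly(() -> 0));
        System.out.println(startedCleanly(() -> 1));
    }
}
```

With a real `java.lang.Process` named `process`, the call would be `ChildMonitorSketch.monitor(process::waitFor)`.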
Establish Connection with the Driver#
The biggest feature of a session is that it can share the SparkContext, allowing multiple code snippets submitted by users to run on a single SparkContext (similar to IO multiplexing, HTTP long connections). This has two benefits:
- Significantly shorter task startup time: starting an app on YARN is relatively time-consuming, but with a session, apart from the considerable time needed to start the session itself, subsequently submitted code snippets execute immediately (similar to HTTP long connections)
- Shared RDDs and tables: persisted RDDs and tables can be used by subsequent code snippets, which is very useful in scenarios where different users need to perform calculations on the same RDDs and tables (similar to IO multiplexing, network reuse)
Since the driver may be scheduled to start on any node by YARN, LivyServer cannot actively establish a connection with the driver. Instead, it pre-establishes an RpcServer on the client side to wait for the driver to connect. Therefore, the process is summarized in three steps:
- The client passes its RpcServer information to the driver. Step (5) in the sequence diagram: RSCClientFactory#createClient; in this call, an org.apache.livy.rsc.rpc.RpcServer object is created and assigned to the member server.
- The driver connects to the client and passes its RpcServer information (implemented in RSCDriver#initializeServer)
- The client receives the driver rpcServer address information and connects
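The three steps above amount to a "dial-back" handshake. Here is a minimal sketch of that idea using plain sockets on the loopback interface. It is not Livy's RPC layer: the class `ReverseHandshakeSketch`, its methods, and the one-line text protocol are all hypothetical, and the client's address is handed to the "driver" simply as method arguments.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Illustrative dial-back handshake; names and protocol are hypothetical. */
class ReverseHandshakeSketch {
    private static final int TIMEOUT_MS = 5000;

    private static BufferedReader reader(Socket s) throws IOException {
        return new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
    }

    private static PrintWriter writer(Socket s) throws IOException {
        // autoFlush=true so println() sends the line immediately
        return new PrintWriter(new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true);
    }

    /** Driver side: open its own server, dial back to the client, then serve one request. */
    static void startDriver(String clientHost, int clientPort) {
        Thread driver = new Thread(() -> {
            try (ServerSocket driverServer = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
                driverServer.setSoTimeout(TIMEOUT_MS);
                // Step 2: connect to the client and report where the driver listens
                try (Socket toClient = new Socket(clientHost, clientPort);
                     PrintWriter out = writer(toClient)) {
                    out.println(driverServer.getLocalPort());
                }
                // Serve a single request sent by the client in step 3
                try (Socket fromClient = driverServer.accept();
                     BufferedReader in = reader(fromClient);
                     PrintWriter out = writer(fromClient)) {
                    out.println("echo:" + in.readLine());
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, "driver");
        driver.setDaemon(true);
        driver.start();
    }

    /** Client side: runs all three steps and returns the driver's reply. */
    static String runHandshake(String message) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket clientServer = new ServerSocket(0, 1, loopback)) {
            clientServer.setSoTimeout(TIMEOUT_MS);
            // Step 1: hand the client's server address to the driver
            startDriver(loopback.getHostAddress(), clientServer.getLocalPort());
            int driverPort;
            try (Socket fromDriver = clientServer.accept();
                 BufferedReader in = reader(fromDriver)) {
                driverPort = Integer.parseInt(in.readLine().trim());
            }
            // Step 3: connect to the address the driver announced and send a request
            try (Socket toDriver = new Socket(loopback, driverPort);
                 PrintWriter out = writer(toDriver);
                 BufferedReader in = reader(toDriver)) {
                out.println(message);
                return in.readLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runHandshake("ping"));
    }
}
```

The driver opens its own server socket before announcing the port, so the client's connection in step 3 cannot arrive before the driver is listening.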
Creation and Initialization of the Session#
After establishing a connection with the driver, an InteractiveSession object is created and initialized using rscClient, livyConf, and other information. The process is as follows:
Key Steps:
- Store session information in the state store for recovery in case the Livy server crashes
- Send an empty PingJob to the driver to check whether the driver is healthy. If the PingJob executes successfully, the driver is fine and the session is set to running status; if it errors or fails, there is a problem with the driver and the session's status is set to error
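The ping-based health check can be pictured as a callback on an asynchronous result. The sketch below is an assumption-laden illustration, not Livy's InteractiveSession: `PingStateSketch`, `Session`, `watchPing`, and the three-value `State` enum are hypothetical, and a `CompletableFuture<Void>` stands in for the handle of the submitted PingJob.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative ping-driven state transition; all names are hypothetical. */
class PingStateSketch {

    enum State { STARTING, RUNNING, ERROR }

    static final class Session {
        private final AtomicReference<State> state = new AtomicReference<>(State.STARTING);

        /** Moves to RUNNING if the ping succeeds and to ERROR if it fails. */
        void watchPing(CompletableFuture<Void> ping) {
            ping.whenComplete((ignored, error) ->
                state.set(error == null ? State.RUNNING : State.ERROR));
        }

        State state() {
            return state.get();
        }
    }

    /** Simulates one ping outcome and returns the resulting session state. */
    static State stateAfterPing(boolean pingSucceeds) {
        Session session = new Session();
        CompletableFuture<Void> ping = new CompletableFuture<>();
        session.watchPing(ping);
        if (pingSucceeds) {
            ping.complete(null);
        } else {
            ping.completeExceptionally(new RuntimeException("driver unreachable"));
        }
        return session.state();
    }

    public static void main(String[] args) {
        System.out.println(stateAfterPing(true));
        System.out.println(stateAfterPing(false));
    }
}
```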
After successfully completing the creation and initialization of the session, the session will be added to the SessionManager for unified management. The main responsibilities of the SessionManager include:
- Holding all sessions
- Cleaning up expired sessions
- Recovering sessions from the state store
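To make these three responsibilities concrete, here is a small sketch of a manager that holds sessions, drops idle ones, and re-registers recovered ones. It is not Livy's SessionManager: the class and method names are hypothetical, expiry by idle time is an assumption about what "expired" means, and the state store is reduced to a plain list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative session registry; names and expiry rule are hypothetical. */
class SessionManagerSketch {

    static final class SessionRecord {
        final int id;
        volatile long lastActivityMillis;

        SessionRecord(int id, long lastActivityMillis) {
            this.id = id;
            this.lastActivityMillis = lastActivityMillis;
        }
    }

    private final Map<Integer, SessionRecord> sessions = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    SessionManagerSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Holding all sessions: register by id. */
    void register(SessionRecord session) {
        sessions.put(session.id, session);
    }

    /** Recovery: re-register everything read back from the (simulated) state store. */
    void recover(List<SessionRecord> stored) {
        stored.forEach(this::register);
    }

    /** Cleanup: removes sessions idle for longer than the timeout and returns their ids. */
    List<Integer> collectExpired(long nowMillis) {
        List<Integer> removed = new ArrayList<>();
        for (SessionRecord s : sessions.values()) {
            boolean idleTooLong = nowMillis - s.lastActivityMillis > timeoutMillis;
            if (idleTooLong && sessions.remove(s.id, s)) {
                removed.add(s.id);
            }
        }
        return removed;
    }

    int size() {
        return sessions.size();
    }
}
```

Passing the current time into `collectExpired` keeps the expiry rule easy to test without sleeping.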
Server Side (Inside Driver)#
Overview#
As shown in the figure, the startup process inside the driver can be divided into the following five steps:
- Create an instance of ReplDriver
- Initialize the server
- Initialize SparkContext
- Create an instance of JobContextImpl and execute jobs
- Wait for exit
Create an Instance of ReplDriver#
ReplDriver is the Spark App driver corresponding to the InteractiveSession, used to receive and process various requests from the Livy server. It is a subclass of RSCDriver, which:
- Holds the RpcServer server waiting for RSCClient to connect
- Initializes SparkContext
- Processes various requests: CancelJob, EndSession, JobRequest, BypassJobRequest, SyncJobRequest, GetBypassJobStatus
- Handles add file requests
Initialize the Server#
This step is implemented in RSCDriver#initializeServer(): the driver connects to the client and informs it of the server-side (driver) RPC address. After the client learns the server RPC address, it will connect and send requests.
Initialize SparkContext#
Divided into 3 steps:
- Create different types of code interpreters based on different kinds
- Create repl/Session, whose main responsibilities are:
  - Start the interpreter and obtain SparkContext
  - Hold a thread pool to asynchronously execute statements (executed through the interpreter)
  - Hold a thread pool to asynchronously cancel statements
  - Manage all statements under a session
- Call the interpreter#start method to start the Session
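The responsibilities of repl/Session listed above can be sketched as a thin wrapper around an interpreter and an executor. This is an illustration under stated assumptions, not Livy's repl/Session: `ReplSessionSketch`, its `Interpreter` interface, and the upper-casing demo interpreter are hypothetical, and cancellation goes through `Future.cancel` directly rather than a separate cancel thread pool.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative REPL session; all names are hypothetical. */
class ReplSessionSketch implements AutoCloseable {

    /** Minimal interpreter contract assumed for this sketch. */
    interface Interpreter {
        void start();
        String execute(String code);
    }

    private final Interpreter interpreter;
    // A single thread runs statements one at a time, in submission order
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Map<Integer, Future<String>> statements = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    ReplSessionSketch(Interpreter interpreter) {
        this.interpreter = interpreter;
    }

    void start() {
        interpreter.start();
    }

    /** Queues a statement for asynchronous execution and returns its id. */
    int execute(String code) {
        int id = nextId.getAndIncrement();
        Callable<String> task = () -> interpreter.execute(code);
        statements.put(id, executor.submit(task));
        return id;
    }

    /** Attempts to cancel a statement; false if unknown or already finished. */
    boolean cancel(int id) {
        Future<String> f = statements.get(id);
        return f != null && f.cancel(true);
    }

    /** Blocks until the statement finishes and returns its output. */
    String result(int id) {
        try {
            return statements.get(id).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    /** Runs one statement through a toy interpreter that upper-cases its input. */
    static String demo(String source) {
        Interpreter upper = new Interpreter() {
            @Override public void start() { }
            @Override public String execute(String code) { return code.toUpperCase(); }
        };
        try (ReplSessionSketch session = new ReplSessionSketch(upper)) {
            session.start();
            return session.result(session.execute(source));
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("val x = 1"));
    }
}
```

A single-threaded executor is used here because statements that share one interpreter state generally need to run in order; whether Livy sizes its pool this way is not stated in the text above.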
How to Execute Code Snippets#
Analysis of executeCodeFunc() Method
This is the function used to actually run code snippets, as shown in step 9 of the above figure. The process is as follows: