Summary of Spark Tuning#
Content organized from:
Thirty Thousand Words Long Article | Spark Performance Optimization Practical Manual
Development Tuning#
- Avoid creating duplicate RDDs
- Reuse the same RDD as much as possible
- Persist RDDs that are used multiple times
- Try to avoid using shuffle-type operators
- Use shuffle operations with map-side pre-aggregation, such as reduceByKey or aggregateByKey instead of groupByKey
- Use high-performance operators: mapPartitions instead of ordinary map, foreachPartition instead of foreach, coalesce after filter, and repartitionAndSortWithinPartitions instead of repartition followed by a sort
- Broadcast large variables
- Use Kryo to optimize serialization performance
- Optimize data structures: prefer strings over objects, primitive types (int, long) over strings, and arrays over collections
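To make the map-side pre-aggregation point concrete, here is a minimal local sketch in plain Java (no Spark dependency). It only models how many records would leave the map side with and without combining; the class name, method names, and sample data are hypothetical and chosen for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

/** Hypothetical local model of map-side pre-aggregation; this is not Spark code. */
final class MapSideCombineSketch {

    /** Records shuffled when every (key, 1) pair is sent as-is (groupByKey-like). */
    static int recordsShuffledWithoutCombine(List<List<String>> partitions) {
        int records = 0;
        for (List<String> partition : partitions) {
            records += partition.size();
        }
        return records;
    }

    /** Records shuffled when each partition first merges pairs per key (reduceByKey-like). */
    static int recordsShuffledWithCombine(List<List<String>> partitions) {
        int records = 0;
        for (List<String> partition : partitions) {
            // After local merging, each distinct key in a partition yields one record.
            records += new HashSet<>(partition).size();
        }
        return records;
    }

    /** Two map-side partitions; every word stands for the pair (word, 1). */
    static List<List<String>> samplePartitions() {
        return Arrays.asList(
            Arrays.asList("hello", "hello", "hello", "hello", "world"),
            Arrays.asList("hello", "hello", "hello"));
    }

    public static void main(String[] args) {
        List<List<String>> partitions = samplePartitions();
        System.out.println(recordsShuffledWithoutCombine(partitions)); // 8
        System.out.println(recordsShuffledWithCombine(partitions));    // 3
    }
}
```

In this toy input, pre-aggregation cuts the shuffled records from 8 to 3; the real saving depends on how often keys repeat within each partition.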
Resource Tuning#
Spark running process:
Resource Parameter Tuning#
- num-executors: Sets the total number of Executor processes used to run the Spark job.
- executor-memory: Sets the memory for each Executor process. The size of Executor memory often directly determines the performance of Spark jobs and is closely tied to common JVM OOM exceptions.
- executor-cores: Sets the number of CPU cores for each Executor process, which determines how many task threads each Executor can run in parallel.
- driver-memory: Sets the memory for the Driver process. Generally it can be left unset or set to around 1G. Note that if the collect operator is used to pull all data to the Driver, the Driver must have enough memory to hold it.
- spark.default.parallelism: Sets the default number of tasks for each stage. The principle recommended on the official website is 2~3 times num-executors * executor-cores; for example, if the Executors have 300 CPU cores in total, setting 1000 tasks is acceptable.
- spark.storage.memoryFraction: Sets the proportion of Executor memory that persisted RDD data may occupy; the default is 0.6. Depending on the persistence strategy you choose, if memory is insufficient the data may not be persisted, or it may be written to disk.
- spark.shuffle.memoryFraction: Sets the proportion of Executor memory that a task may use for aggregation after it pulls the output of the previous stage during a shuffle; the default is 0.2.
Resource Parameter Reference Example#
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
<application-jar> [application-arguments]
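As a quick check of the 2~3 times rule against the reference values above, the following sketch computes the suggested range for spark.default.parallelism. The helper names are hypothetical; only the multiplication rule comes from the text.

```java
/** Hypothetical helper for the "2~3 times num-executors * executor-cores" guideline. */
final class ParallelismSketch {

    /** Total CPU cores across all Executors. */
    static int totalCores(int numExecutors, int executorCores) {
        return numExecutors * executorCores;
    }

    /** Lower end of the guideline: 2 tasks per core. */
    static int minTasks(int numExecutors, int executorCores) {
        return 2 * totalCores(numExecutors, executorCores);
    }

    /** Upper end of the guideline: 3 tasks per core. */
    static int maxTasks(int numExecutors, int executorCores) {
        return 3 * totalCores(numExecutors, executorCores);
    }

    public static void main(String[] args) {
        int executors = 100; // --num-executors in the reference example
        int cores = 4;       // --executor-cores in the reference example
        System.out.println("total cores: " + totalCores(executors, cores));
        System.out.println("suggested tasks: " + minTasks(executors, cores)
            + " to " + maxTasks(executors, cores));
    }
}
```

With 100 Executors of 4 cores each, the range is 800 to 1200 tasks, so the value of 1000 used in the reference example sits inside it.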
Data Skew Tuning#
- Most tasks execute quickly, but a few tasks execute very slowly, and the Spark running progress depends on the longest-running task.
- During the shuffle phase, if the amount of data corresponding to a certain key is particularly large, data skew will occur.
Tuning Ideas#
- Identify which stage the data skew occurs in: As long as you see a shuffle-type operator in the Spark code or a SQL statement in Spark SQL that causes shuffle (such as a group by statement), you can determine that this point divides the previous and subsequent stages.
- Use the Spark Web UI to check the running time and the amount of data assigned to each task in the slow or failing stage, and then locate the specific code through the exception stack in the logs.
- Check the data distribution of the keys causing data skew and choose different solutions:
- If the data skew is caused by group by or join statements in Spark SQL, check the key distribution of the tables used in the SQL.
- If the data skew is caused by executing shuffle operators on Spark RDD, you can add code to check the key distribution in the Spark job, such as RDD.countByKey(). Then collect/take the counts of each key to the client for printing to see the key distribution.
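Once the per-key counts are on the client, picking out the heaviest keys is ordinary collection work. The sketch below is plain Java with hypothetical names; it assumes the keys have already been collected (or sampled) into a local list and mirrors what countByKey() returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical local helper for inspecting key distribution; this is not Spark code. */
final class KeyDistributionSketch {

    /** Counts records per key, similar in spirit to what countByKey() hands to the driver. */
    static Map<String, Long> countByKey(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    /** Returns up to n keys with the most records, largest first. */
    static List<String> heaviestKeys(Map<String, Long> counts, int n) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "a", "c", "a", "b", "a");
        Map<String, Long> counts = countByKey(keys);
        System.out.println(heaviestKeys(counts, 2)); // [a, b]
    }
}
```

On a large dataset it is usually cheaper to run this on a sample of the keys rather than the full RDD.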
Solutions for Data Skew#
Solution 1: Filter Out Few Keys Causing Skew#
If we determine that the few keys with particularly large data volumes are not particularly important to the execution and calculation results of the job, we can simply filter out those few keys. For example, in Spark SQL, you can use the where clause to filter out these keys or in Spark Core, use the filter operator on the RDD to filter out these keys.
Solution 2: Increase the Parallelism of Shuffle Operations#
Implementation Idea:
When executing shuffle operators on RDDs, pass a partition-count argument to the shuffle operator, such as reduceByKey(func, 1000), where func is the reduce function and 1000 sets the number of shuffle read tasks for this operator. For shuffle-type statements in Spark SQL, such as group by and join, set the spark.sql.shuffle.partitions parameter, which controls the parallelism of shuffle read tasks. Its default value is 200, which is somewhat small for many scenarios.
Implementation Principle:
Increasing the number of shuffle read tasks allows multiple keys originally assigned to one task to be distributed to multiple tasks, so that each task processes less data than before. For example, if there are originally 5 keys, each corresponding to 10 pieces of data, and all 5 keys are assigned to one task, then this task has to process 50 pieces of data. After increasing the shuffle read tasks, each task is assigned one key, meaning each task processes 10 pieces of data, thus naturally reducing the execution time of each task.
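The 5-keys example can be replayed locally. This sketch assumes hash partitioning of the form non-negative hashCode modulo the partition count; the class, methods, and data sets are hypothetical. It also includes a single hot key to show a case that this solution does not help.

```java
/** Hypothetical local model of how keys map to shuffle read tasks; this is not Spark code. */
final class ShuffleParallelismSketch {

    /** Assumed hash partitioning: non-negative key.hashCode() modulo the partition count. */
    static int partitionOf(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Largest number of records that any single shuffle read task receives. */
    static int maxRecordsPerTask(int[] keys, int numPartitions) {
        int[] load = new int[numPartitions];
        int max = 0;
        for (int key : keys) {
            int p = partitionOf(key, numPartitions);
            load[p]++;
            max = Math.max(max, load[p]);
        }
        return max;
    }

    /** 5 keys (0..4) with 10 records each, as in the example above. */
    static int[] evenKeys() {
        int[] keys = new int[50];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = i / 10;
        }
        return keys;
    }

    /** One hot key (0) with 46 records, plus keys 1..4 with one record each. */
    static int[] hotKeys() {
        int[] keys = new int[50]; // all zeros by default
        for (int k = 1; k <= 4; k++) {
            keys[45 + k] = k;
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(maxRecordsPerTask(evenKeys(), 1)); // 50
        System.out.println(maxRecordsPerTask(evenKeys(), 5)); // 10
        System.out.println(maxRecordsPerTask(hotKeys(), 5));  // 46
    }
}
```

With evenly sized keys the busiest task drops from 50 records to 10, but all records of a single hot key still land in one task no matter how many partitions are used.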
Solution 3: Two-Stage Aggregation (Local Aggregation + Global Aggregation)#
Implementation Idea:
Perform two-stage aggregation. The first stage is local aggregation: give each key a random prefix, such as a random number below 10. Identical keys now become different; for example, (hello, 1) (hello, 1) (hello, 1) (hello, 1) becomes (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1). Then run reduceByKey or another aggregation on the prefixed data, which produces local results such as (1_hello, 2) (2_hello, 2). Next, remove the prefix from each key, giving (hello, 2) (hello, 2), and aggregate globally once more to obtain the final result, (hello, 4).
Implementation Code:
// Step 1: Assign a random prefix to each key in the RDD.
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(
    new PairFunction<Tuple2<Long, Long>, String, Long>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception {
            Random random = new Random();
            int prefix = random.nextInt(10);
            return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
        }
    });
// Step 2: Perform local aggregation on the keys with random prefixes.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
    new Function2<Long, Long, Long>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
        }
    });
// Step 3: Remove the random prefixes from each key in the RDD.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
    new PairFunction<Tuple2<String, Long>, Long, Long>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Tuple2<Long, Long> call(Tuple2<String, Long> tuple) throws Exception {
            long originalKey = Long.valueOf(tuple._1.split("_")[1]);
            return new Tuple2<Long, Long>(originalKey, tuple._2);
        }
    });
// Step 4: Perform global aggregation on the RDD with removed random prefixes.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
    new Function2<Long, Long, Long>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
        }
    });
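The same four steps can be traced locally without a cluster. The sketch below is plain Java with hypothetical names. As an assumption made for reproducibility, it uses a deterministic prefix (record index modulo the bucket count) instead of a random one, and it strips the prefix at the first underscore so that keys containing "_" survive.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical local trace of two-stage aggregation; this is not Spark code. */
final class TwoStageAggregationSketch {

    /** Stage 1: prefix each key and count per prefixed key (each record counts as 1). */
    static Map<String, Long> localAggregate(List<String> keys, int buckets) {
        Map<String, Long> local = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            // Deterministic stand-in for random.nextInt(buckets), so the trace is repeatable.
            String prefixed = (i % buckets) + "_" + keys.get(i);
            local.merge(prefixed, 1L, Long::sum);
        }
        return local;
    }

    /** Stage 2: strip everything up to the first '_' and sum the partial counts. */
    static Map<String, Long> globalAggregate(Map<String, Long> local) {
        Map<String, Long> global = new HashMap<>();
        for (Map.Entry<String, Long> entry : local.entrySet()) {
            String prefixed = entry.getKey();
            String original = prefixed.substring(prefixed.indexOf('_') + 1);
            global.merge(original, entry.getValue(), Long::sum);
        }
        return global;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("hello", "hello", "hello", "hello");
        Map<String, Long> local = localAggregate(keys, 2);
        System.out.println(local.get("0_hello") + " " + local.get("1_hello")); // 2 2
        System.out.println(globalAggregate(local).get("hello"));               // 4
    }
}
```

The partial counts (2 and 2) are what two different tasks would compute in the first stage; the second stage only has to combine one record per prefix.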
Advantages of the Solution:
For aggregation-type shuffle operations that cause data skew, the effect is very good. It can usually resolve data skew or at least significantly alleviate it, improving the performance of Spark jobs several times.
Disadvantages of the Solution:
It is only applicable to aggregation-type shuffle operations, and the scope of application is relatively narrow. If it is a join-type shuffle operation, other solutions must be used.
Solution 4: Convert Reduce Join to Map Join#
Implementation Idea:
Instead of using the join operator for connection operations, use Broadcast variables and map-type operators to implement join operations, thereby completely avoiding shuffle-type operations and thoroughly preventing the occurrence of data skew.
Implementation Principle:
Ordinary joins go through the shuffle process, and once a shuffle occurs, it means that data with the same key is pulled into a shuffle read task for joining, which is a reduce join. However, if one RDD is relatively small, you can use the full data of the small RDD as a broadcast variable + map operator to achieve the same effect as join, which is a map join. At this point, no shuffle operation occurs, and thus no data skew occurs.
Implementation Code:
// First, collect the data from the smaller RDD to the Driver.
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect();
// Then use Spark's broadcast feature to convert the small RDD data into a broadcast variable, so that each Executor only has one copy of the RDD data.
// This can save memory space as much as possible and reduce network transmission performance overhead.
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
// Perform map-type operations on the other RDD instead of join-type operations.
JavaPairRDD<Long, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
    new PairFunction<Tuple2<Long, String>, Long, Tuple2<String, Row>>() {
        private static final long serialVersionUID = 1L;
        @Override
        public Tuple2<Long, Tuple2<String, Row>> call(Tuple2<Long, String> tuple) throws Exception {
            // In the operator function, use the broadcast variable to access the rdd1 data in the local Executor.
            List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
            // Convert rdd1 data into a Map for easier join operations later.
            // Note: this map is rebuilt for every record; broadcasting a prebuilt Map would avoid the repeated work.
            Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
            for (Tuple2<Long, Row> data : rdd1Data) {
                rdd1DataMap.put(data._1, data._2);
            }
            // Get the key and value of the current RDD data.
            Long key = tuple._1;
            String value = tuple._2;
            // Get the joinable data from the rdd1 data Map based on the key (null if there is no match).
            Row rdd1Value = rdd1DataMap.get(key);
            return new Tuple2<Long, Tuple2<String, Row>>(key, new Tuple2<String, Row>(value, rdd1Value));
        }
    });
// It should be noted that the above approach is only applicable when the keys in rdd1 are unique.
// If there are multiple identical keys in rdd1, then flatMap-type operations must be used, and join cannot be done with map; instead, all data in rdd1 must be traversed for the join.
// Each piece of data in rdd2 may return multiple pieces of joined data.
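For the duplicate-key case described in the comments above, the lookup table has to keep every value per key, and each record on the large side may yield several output rows. The following is a plain-Java sketch of that flatMap-style join; all names and sample data are hypothetical, and it assumes inner-join semantics (rows without a match are dropped).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical local model of a map join with duplicate keys; this is not Spark code. */
final class MapJoinSketch {

    static Map.Entry<Long, String> row(Long key, String value) {
        return new SimpleEntry<>(key, value);
    }

    /** Small side (the part that would be broadcast); key 1 appears twice. */
    static List<Map.Entry<Long, String>> smallSide() {
        return Arrays.asList(row(1L, "a"), row(1L, "b"), row(2L, "c"));
    }

    /** Large side; key 3 has no match on the small side. */
    static List<Map.Entry<Long, String>> largeSide() {
        return Arrays.asList(row(1L, "x"), row(3L, "y"));
    }

    /** Builds the lookup table once; duplicate keys keep every value. */
    static Map<Long, List<String>> buildLookup(List<Map.Entry<Long, String>> small) {
        Map<Long, List<String>> lookup = new HashMap<>();
        for (Map.Entry<Long, String> r : small) {
            lookup.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        return lookup;
    }

    /** flatMap-style join: one large-side row produces zero, one, or many output rows. */
    static List<String> join(List<Map.Entry<Long, String>> large, Map<Long, List<String>> lookup) {
        List<String> joined = new ArrayList<>();
        for (Map.Entry<Long, String> r : large) {
            List<String> matches = lookup.getOrDefault(r.getKey(), Collections.<String>emptyList());
            for (String match : matches) {
                joined.add(r.getKey() + ":" + r.getValue() + "," + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        System.out.println(join(largeSide(), buildLookup(smallSide()))); // [1:x,a, 1:x,b]
    }
}
```

Building the lookup once and reusing it for every record is the main design point; in a Spark job the same idea would mean broadcasting the prepared map rather than the raw list.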
Solution 5: Sample Skewed Keys and Split Join Operations#
Applicable Scenario:
When joining two RDDs/Hive tables, if both data volumes are large, you can check the key distribution in both RDDs/Hive tables. If data skew occurs because a few keys in one RDD/Hive table have a large amount of data while all keys in the other RDD/Hive table are distributed relatively evenly, this solution is suitable.
Implementation Idea:
- Use the sample operator to sample the RDD that contains the few keys with large data volumes, then count the occurrences of each key to determine which keys have the most data.
- Split the data for these keys out of the original RDD into a separate RDD and prefix each key with a random number from 0 to n-1; the remaining keys, which do not cause skew, form another RDD.
- From the other RDD in the join, filter the data for those same skewed keys into a separate RDD and expand each record into n records, prefixed in turn with 0 to n-1; the remaining keys, which do not cause skew, again form another RDD.
- Join the randomly prefixed RDD with the expanded RDD. The originally identical keys are now scattered into n parts and distributed across multiple tasks for joining.
- The other two ordinary RDDs can be joined as usual.
- Finally, merge the results of the two joins using the union operator to get the final join result.
Implementation Principle:
For data skew caused by joins, if only a few keys are causing the skew, you can split those few keys into independent RDDs and add random prefixes to scatter them into n parts for joining. The data corresponding to these few keys is then no longer concentrated on a few tasks but is distributed across multiple tasks for joining.
Solution 6: Use Random Prefixes and Expand RDD for Joining#
Applicable Scenario: If there are a large number of keys in the RDD causing data skew during join operations, splitting the keys is not meaningful, and this last solution must be used to solve the problem.
Implementation Idea:
- Similar to "Solution 5", first check the data distribution in the RDD/Hive table to identify the RDD/Hive table causing data skew, for example, if multiple keys correspond to more than 10,000 pieces of data.
- Then prefix the key of each record in that RDD with a random number from 0 to n-1.
- At the same time, expand the other, normal RDD by turning each record into n records, prefixed in turn with 0 to n-1.
- Finally, join the two processed RDDs.
Implementation Principle:
Adding random prefixes turns originally identical keys into different keys, so these "different keys" can be distributed across multiple tasks instead of one task handling a large amount of data for the same key. The difference from "Solution 5" is that Solution 5 handles only a few skewed keys specially, so only a small part of the data has to be expanded and the extra memory usage is small. This solution targets cases with a large number of skewed keys, where splitting out some keys for separate processing is not possible, so the entire RDD must be expanded, which requires a lot of memory.
Advantages of the Solution: It can handle most join-type data skews, and the effect is relatively significant, with very good performance improvement.
Disadvantages of the Solution: This solution is more about alleviating data skew rather than completely avoiding it. It also requires expanding the entire RDD, which demands high memory resources.
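The prefix-and-expand join can be checked locally for correctness. The sketch below is plain Java with hypothetical names and sample data; rows are modeled as {key, value} string pairs. It verifies that joining the prefixed side against the n-times expanded side gives the same rows as an ordinary join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical local model of the random-prefix + expand join; this is not Spark code. */
final class SaltedJoinSketch {

    /** Skewed side: prefix each key with a random number in [0, n). */
    static List<String[]> salt(List<String[]> skewed, int n, Random random) {
        List<String[]> salted = new ArrayList<>();
        for (String[] row : skewed) {
            salted.add(new String[] {random.nextInt(n) + "_" + row[0], row[1]});
        }
        return salted;
    }

    /** Other side: replicate every row n times, once per prefix 0..n-1. */
    static List<String[]> expand(List<String[]> other, int n) {
        List<String[]> expanded = new ArrayList<>();
        for (String[] row : other) {
            for (int prefix = 0; prefix < n; prefix++) {
                expanded.add(new String[] {prefix + "_" + row[0], row[1]});
            }
        }
        return expanded;
    }

    /** Hash join on the (possibly prefixed) key; output rows are sorted, with the prefix stripped. */
    static List<String> join(List<String[]> left, List<String[]> right, boolean prefixed) {
        Map<String, List<String>> rightByKey = new HashMap<>();
        for (String[] row : right) {
            rightByKey.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        List<String> out = new ArrayList<>();
        for (String[] row : left) {
            String key = prefixed ? row[0].substring(row[0].indexOf('_') + 1) : row[0];
            for (String match : rightByKey.getOrDefault(row[0], Collections.<String>emptyList())) {
                out.add(key + ":" + row[1] + "," + match);
            }
        }
        Collections.sort(out);
        return out;
    }

    /** Returns true when the prefixed join yields exactly the rows of the plain join. */
    static boolean demo() {
        List<String[]> skewed = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            skewed.add(new String[] {"k", "v" + i}); // hot key "k"
        }
        skewed.add(new String[] {"j", "w"});
        List<String[]> other = Arrays.asList(new String[] {"k", "r"}, new String[] {"j", "s"});
        int n = 3;
        List<String> plain = join(skewed, other, false);
        List<String> salted = join(salt(skewed, n, new Random(42)), expand(other, n), true);
        return plain.size() == 7 && plain.equals(salted);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each prefixed row matches exactly one of the n copies on the expanded side, which is why the result is unchanged while the hot key's rows are spread over up to n join keys. The cost is the n-fold growth of the expanded side.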
Shuffle Tuning#
Reference: Spark Shuffle Principles and Related Tuning
Starting with Spark 1.2, the default shuffle manager changed from HashShuffleManager to SortShuffleManager. SortShuffleManager improves on HashShuffleManager mainly in that, although each task still generates a large number of temporary disk files during the shuffle, it finally merges all of them into a single disk file, so each task ends up with only one disk file. When a shuffle read task of the next stage pulls its data, it only needs to read its portion of each disk file based on the index.
Optimization of HashShuffleManager#
Set spark.shuffle.consolidateFiles to true; the default is false.
After the consolidate mechanism is enabled, the concept of a shuffleFileGroup appears. The consolidate mechanism allows different tasks to reuse the same batch of disk files, which effectively merges the disk files of multiple tasks to a certain extent, significantly reducing the number of disk files and improving shuffle write performance.
SortShuffleManager#
The running mechanism of SortShuffleManager is mainly divided into two types:
- One is the ordinary running mechanism.
- The other is the bypass running mechanism. When the number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (default is 200), the bypass mechanism will be enabled.
Shuffle Related Parameter Tuning#
- spark.shuffle.file.buffer
  Default value: 32k
  Parameter description: Sets the buffer size of the BufferedOutputStream used by shuffle write tasks. Data is written to the buffer first and is only flushed to the disk file when the buffer is full.
- spark.reducer.maxSizeInFlight
  Default value: 48m
  Parameter description: Sets the buffer size for shuffle read tasks, which determines how much data can be pulled at a time. If the job has sufficient available memory, this parameter can be increased appropriately.
- spark.shuffle.io.maxRetries
  Default value: 3
  Parameter description: When shuffle read tasks pull their data from the nodes where the shuffle write tasks ran, a pull that fails because of a network anomaly is retried automatically. This parameter is the maximum number of retries. If the pull still fails after that many attempts, the job may fail.
- spark.shuffle.io.retryWait
  Default value: 5s
  Parameter description: The waiting interval between retries when pulling data. It is recommended to increase the interval (for example, to 60s) to improve the stability of shuffle operations.
- spark.shuffle.memoryFraction
  Default value: 0.2
  Parameter description: The proportion of Executor memory allocated to shuffle read tasks for aggregation operations; the default is 20%.
- spark.shuffle.manager
  Default value: sort
  Parameter description: Sets the type of ShuffleManager. After Spark 1.5, there are three options: hash, sort, and tungsten-sort. HashShuffleManager was the default before Spark 1.2; Spark 1.2 and later versions default to SortShuffleManager. tungsten-sort is similar to sort but uses the off-heap memory management mechanism from the Tungsten project, making memory usage more efficient.