Spark 调优总结#
内容整理自:
三万字长文 | Spark 性能优化实战手册
开发调优#
-
避免创建重复 RDD
-
尽可能复用同一个 RDD
-
对多次使用的 RDD 进行持久化
-
尽量避免使用 shuffle 类算子
-
使用 map-side 预聚合的 shuffle 操作,比如使用 reduceByKey 或者 aggregateByKey 算子来替代掉 groupByKey 算子
-
使用高性能的算子,比如
mapPartitions替代普通map
、使用 foreachPartitions 替代 foreach、使用 filter 之后进行 coalesce 操作、使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作 -
广播大变量
-
使用 Kryo 优化序列化性能
-
优化数据结构:尽量使用字符串替换对象,使用原始类型 (int、Long) 替换字符串、使用数组替代集合
资源调优#
Spark 运行流程:
资源参数调优#
num-executors
: 用于设置 Spark 作业总共要用多少个 Executor 进程来执行executor-memory
: 该参数用于设置每个 Executor 进程的内存。Executor 内存的大小,很多时候直接决定了 Spark 作业的性能,而且跟常见的 JVM OOM 异常,也有直接的关联executor-cores
: 该参数用于设置每个 Executor 进程的 CPU core 数量。这个参数决定了每个 Executor 进程并行执行 task 线程的能力driver-memory
: 用于设置 Driver 进程的内存,Driver 的内存通常来说不设置,或者设置 1G 左右应该就够了,需要注意的是如果使用 collect 算子拉取所有数据到 driver 端,需要保证 Driver 有足够的内存spark.default.parallelism
: 用于设置每个 stage 的默认 task 数量,官网建议的设置原则为num-executors * executor-cores的2~3倍
较为合适,比如 Executor 的总 CPU core 数量为 300 个,那么设置 1000 个 task 是可以的spark.storage.memoryFraction
: 用于设置 RDD 持久化数据在 Executor 内存中能占的比例,默认是 0.6, 根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘spark.shuffle.memoryFraction
: 用于设置 shuffle 过程中一个 task 拉取到上个 stage 的 task 的输出后,进行聚合操作时能够使用的 Executor 内存的比例,默认是 0.2
资源参数参考示例#
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
数据倾斜调优#
- 绝大多数 task 执行很快,个别 task 却执行很慢,而 Spark 运行进度取决于耗时最长的那个 task。
- 在 shuffle 阶段,如果某个 key 对应的数据量特别大,就会发生数据倾斜
调优思路#
- 找到数据倾斜发生在哪个 stage 中:只要看到 Spark 代码中出现了一个 shuffle 类算子或者是 Spark SQL 的 SQL 语句中出现了会导致 shuffle 的语句(比如 group by 语句),那么就可以判定,以那个地方为界限划分出了前后两个 stage
- 通过 Spark Web UI 查看报错的那个 stage 的各个 task 的运行时间以及分配的数据量,然后通过 log 异常栈定位到具体代码
- 查看导致数据倾斜的 key 的数据分布情况,选择不同的方案解决:
- 如果是 Spark SQL 中的 group by、join 语句导致的数据倾斜,那么就查询一下 SQL 中使用的表的 key 分布情况。
- 如果是对 Spark RDD 执行 shuffle 算子导致的数据倾斜,那么可以在 Spark 作业中加入查看 key 分布的代码,比如 RDD.countByKey ()。然后对统计出来的各个 key 出现的次数,collect/take 到客户端打印一下,就可以看到 key 的分布情况
数据倾斜的解决方案#
解决方案一:过滤少数导致倾斜的 key#
如果我们判断那少数几个数据量特别多的 key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个 key。比如在 Spark SQL 中可以使用 where 子句过滤掉这些 key 或者在 Spark Core 中对 RDD 执行 filter 算子过滤掉这些 key
解决方案二:提高 shuffle 操作的并行度#
方案实现思路:
在对 RDD 执行 shuffle 算子时,给 shuffle 算子传入一个参数,比如 reduceByKey (1000),该参数就设置了这个 shuffle 算子执行时 shuffle read task 的数量。对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小
方案实现原理:
增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task 处理比原来更少的数据。举例来说,如果原本有 5 个 key,每个 key 对应 10 条数据,这 5 个 key 都是分配给一个 task 的,那么这个 task 就要处理 50 条数据。而增加了 shuffle read task 以后,每个 task 就分配到一个 key,即每个 task 就处理 10 条数据,那么自然每个 task 的执行时间都会变短了
解决方案三:两阶段聚合(局部聚合 + 全局聚合)#
实现思路:
进行两阶段聚合。第一次是局部聚合,先给每个 key 都打上一个随机数,比如 10 以内的随机数,此时原先一样的 key 就变成不一样的了,比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成 (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行 reduceByKey 等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了 (1_hello, 2) (2_hello, 2)。然后将各个 key 的前缀给去掉,就会变成 (hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如 (hello, 4)
实现代码:
// 第一步,给RDD中的每个key都打上一个随机前缀。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(new PairFunction<Tuple2<Long,Long>, String, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception {
Random random = new Random();
int prefix = random.nextInt(10);
return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
}
});
// 第二步,对打上随机前缀的key进行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
// 第三步,去除RDD中每个key的随机前缀。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)throws Exception {
long originalKey = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Long>(originalKey, tuple._2);
}
});
// 第四步,对去除了随机前缀的RDD进行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
方案优点:
对于聚合类的 shuffle 操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将 Spark 作业的性能提升数倍以上。
方案缺点:
仅仅适用于聚合类的 shuffle 操作,适用范围相对较窄。如果是 join 类的 shuffle 操作,还得用其他的解决方案
解决方案四:将 reduce join 转为 map join#
实现思路:
不使用 join 算子进行连接操作,而使用 Broadcast 变量与 map 类算子实现 join 操作,进而完全规避掉 shuffle 类的操作,彻底避免数据倾斜的发生和出现
实现原理:
普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。但是如果一个 RDD 是比较小的,则可以采用广播小 RDD 全量数据 + map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜
实现代码
// 首先将数据量比较小的RDD的数据,collect到Driver中来。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
// 可以尽可能节省内存空间,并且减少网络传输性能开销。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
// 对另外一个RDD执行map类操作,而不再是join类操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple) throws Exception {
// 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。
List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
// 可以将rdd1的数据转换为一个Map,便于后面进行join操作。
Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
for(Tuple2<Long, Row> data : rdd1Data) {
rdd1DataMap.put(data._1, data._2);
}
// 获取当前RDD数据的key以及value。
String key = tuple._1;
String value = tuple._2;
// 从rdd1数据Map中,根据key获取到可以join到的数据。
Row rdd1Value = rdd1DataMap.get(key);
return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
}
});
// 这里得提示一下。
// 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。
// 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。
// rdd2中每条数据都可能会返回多条join后的数据。
解决方案五:采样倾斜 key 并分拆 join 操作#
方案适用场景:
两个 RDD/Hive 表进行 join 的时候,如果数据量都比较大,那么此时可以看一下两个 RDD/Hive 表中的 key 分布情况。如果出现数据倾斜,是因为其中某一个 RDD/Hive 表中的少数几个 key 的数据量过大,而另一个 RDD/Hive 表中的所有 key 都分布比较均匀,那么采用这个解决方案是比较合适的。
方案实现思路:
- 对包含少数几个数据量过大的 key 的那个 RDD,通过 sample 算子采样出一份样本来,然后统计一下每个 key 的数量,计算出来数据量最大的是哪几个 key。
- 然后将这几个 key 对应的数据从原来的 RDD 中拆分出来,形成一个单独的 RDD,并给每个 key 都打上 n 以内的随机数作为前缀,而不会导致倾斜的大部分 key 形成另外一个 RDD。
- 接着将需要 join 的另一个 RDD,也过滤出来那几个倾斜 key 对应的数据并形成一个单独的 RDD,将每条数据膨胀成 n 条数据,这 n 条数据都按顺序附加一个 0~n 的前缀,不会导致倾斜的大部分 key 也形成另外一个 RDD。
- 再将附加了随机前缀的独立 RDD 与另一个膨胀 n 倍的独立 RDD 进行 join,此时就可以将原先相同的 key 打散成 n 份,分散到多个 task 中去进行 join 了。
- 而另外两个普通的 RDD 就照常 join 即可。
- 最后将两次 join 的结果使用 union 算子合并起来即可,就是最终的 join 结果。
方案实现原理:
对于 join 导致的数据倾斜,如果只是某几个 key 导致了倾斜,可以将少数几个 key 分拆成独立 RDD,并附加随机前缀打散成 n 份去进行 join,此时这几个 key 对应的数据就不会集中在少数几个 task 上,而是分散到多个 task 进行 join 了,如图:
解决方案六:使用随机前缀和扩容 RDD 进行 join#
方案适用场景:如果在进行 join 操作时,RDD 中有大量的 key 导致数据倾斜,那么进行分拆 key 也没什么意义,此时就只能使用最后一种方案来解决问题了。
方案实现思路:
- 类似 “解决方案五 “,首先查看 RDD/Hive 表中的数据分布情况,找到那个造成数据倾斜的 RDD/Hive 表,比如有多个 key 都对应了超过 1 万条数据。
- 然后将该 RDD 的每条数据都打上一个 n 以内的随机前缀。
- 同时对另外一个正常的 RDD 进行扩容,将每条数据都扩容成 n 条数据,扩容出来的每条数据都依次打上一个 0~n 的前缀。
- 最后将两个处理后的 RDD 进行 join 即可。
方案实现原理:
将原先一样的 key 通过附加随机前缀变成不一样的 key,然后就可以将这些处理后的 “不同 key” 分散到多个 task 中去处理,而不是让一个 task 处理大量的相同 key。该方案与 “解决方案五” 的不同之处就在于,上一种方案是尽量只对少数倾斜 key 对应的数据进行特殊处理,由于处理过程需要扩容 RDD,因此上一种方案扩容 RDD 后对内存的占用并不大;而这一种方案是针对有大量倾斜 key 的情况,没法将部分 key 拆分出来进行单独处理,因此只能对整个 RDD 进行数据扩容,对内存资源要求很高。
** 方案优点:** 对 join 类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。
** 方案缺点:** 该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个 RDD 进行扩容,对内存资源要求很高
shuffle 调优#
在 Spark 1.2 以后的版本中,默认的HashShuffleManager
改成了SortShuffleManager
。SortShuffleManager 相较于 HashShuffleManager 来说,有了一定的改进。主要就在于,每个 Task 在进行 shuffle 操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个 Task 就只有一个磁盘文件。在下一个 stage 的 shuffle read task 拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可
HashShuffleManager 的优化#
设置spark.shuffle.consolidateFiles
, 默认为 false。
开启consolidate机制
之后,会出现shuffleFileGroup
的概念。consolidate 机制允许不同的 task 复用同一批磁盘文件,这样就可以有效将多个 task 的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升 shuffle write 的性能
SortShuffleManager#
SortShuffleManager 的运行机制主要分成两种:
- 一种是普通运行机制
- 另一种是 bypass 运行机制。当 shuffle read task 的数量小于等于
spark.shuffle.sort.bypassMergeThreshold
参数的值时(默认为 200),就会启用 bypass 机制
shuffle 相关参数调优#
-
spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置 shuffle write task 的 BufferedOutputStream 的 buffer 缓冲大小。将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲写满之后,才会溢写到磁盘 -
spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置 shuffle read task 的 buffer 缓冲大小,而这个 buffer 缓冲决定了每次能够拉取多少数据。如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小 -
spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task 从 shuffle write task 所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败 -
spark.shuffle.io.retryWait
默认值:5s
参数说明:代表了每次重试拉取数据的等待间隔。建议加大间隔时长(比如 60s),以增加 shuffle 操作的稳定性 -
spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了 Executor 内存中,分配给 shuffle read task 进行聚合操作的内存比例,默认是 20% -
spark.shuffle.manager
默认值:sort
参数说明:该参数用于设置 ShuffleManager 的类型。Spark 1.5 以后,有三个可选项:hash
、sort
和tungsten-sort
。HashShuffleManager 是 Spark 1.2 以前的默认选项,但是 Spark 1.2 以及之后的版本默认都是 SortShuffleManager 了。tungsten-sort 与 sort 类似,但是使用了 tungsten 计划中的堆外内存管理机制,内存使用效率更高