Spark 調優總結#
內容整理自:
三萬字長文 | Spark 性能優化實戰手冊
開發調優#
-
避免創建重複 RDD
-
儘可能重用同一個 RDD
-
對多次使用的 RDD 進行持久化
-
儘量避免使用 shuffle 類算子
-
使用 map-side 預聚合的 shuffle 操作,比如使用 reduceByKey 或者 aggregateByKey 算子來替代掉 groupByKey 算子
-
使用高性能的算子,比如
mapPartitions替代普通map
、使用 foreachPartitions 替代 foreach、使用 filter 之後進行 coalesce 操作、使用 repartitionAndSortWithinPartitions 替代 repartition 與 sort 類操作 -
廣播大變量
-
使用 Kryo 優化序列化性能
-
優化數據結構:儘量使用字符串替換對象,使用原始類型 (int、Long) 替換字符串、使用數組替代集合
資源調優#
Spark 運行流程:
資源參數調優#
num-executors
: 用於設置 Spark 作業總共要用多少個 Executor 進程來執行executor-memory
: 該參數用於設置每個 Executor 進程的內存。Executor 內存的大小,很多時候直接決定了 Spark 作業的性能,而且跟常見的 JVM OOM 異常,也有直接的關聯executor-cores
: 該參數用於設置每個 Executor 進程的 CPU core 數量。這個參數決定了每個 Executor 進程並行執行 task 線程的能力driver-memory
: 用於設置 Driver 進程的內存,Driver 的內存通常來說不設置,或者設置 1G 左右應該就夠了,需要注意的是如果使用 collect 算子拉取所有數據到 driver 端,需要保證 Driver 有足夠的內存spark.default.parallelism
: 用於設置每個 stage 的默認 task 數量,官網建議的設置原則為num-executors * executor-cores的2~3倍
較為合適,比如 Executor 的總 CPU core 數量為 300 個,那麼設置 1000 個 task 是可以的spark.storage.memoryFraction
: 用於設置 RDD 持久化數據在 Executor 內存中能占的比例,默認是 0.6, 根據你選擇的不同的持久化策略,如果內存不夠時,可能數據就不會持久化,或者數據會寫入磁碟spark.shuffle.memoryFraction
: 用於設置 shuffle 過程中一個 task 拉取到上個 stage 的 task 的輸出後,進行聚合操作時能夠使用的 Executor 內存的比例,默認是 0.2
資源參數參考示例#
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
數據傾斜調優#
- 絕大多數 task 執行很快,個別 task 卻執行很慢,而 Spark 運行進度取決於耗時最長的那個 task。
- 在 shuffle 階段,如果某個 key 對應的數據量特別大,就會發生數據傾斜
調優思路#
- 找到數據傾斜發生在哪個 stage 中:只要看到 Spark 代碼中出現了一個 shuffle 類算子或者是 Spark SQL 的 SQL 語句中出現了會導致 shuffle 的語句(比如 group by 語句),那麼就可以判定,以那個地方為界限劃分出了前後兩個 stage
- 通過 Spark Web UI 查看報錯的那個 stage 的各個 task 的運行時間以及分配的數據量,然後通過 log 異常棧定位到具體代碼
- 查看導致數據傾斜的 key 的數據分佈情況,選擇不同的方案解決:
- 如果是 Spark SQL 中的 group by、join 語句導致的數據傾斜,那麼就查詢一下 SQL 中使用的表的 key 分佈情況。
- 如果是對 Spark RDD 執行 shuffle 算子導致的數據傾斜,那麼可以在 Spark 作業中加入查看 key 分佈的代碼,比如 RDD.countByKey ()。然後對統計出來的各個 key 出現的次數,collect/take 到客戶端打印一下,就可以看到 key 的分佈情況
數據傾斜的解決方案#
解決方案一:過濾少數導致傾斜的 key#
如果我們判斷那少數幾個數據量特別多的 key,對作業的執行和計算結果不是特別重要的話,那麼乾脆就直接過濾掉那少數幾個 key。比如在 Spark SQL 中可以使用 where 子句過濾掉這些 key 或者在 Spark Core 中對 RDD 執行 filter 算子過濾掉這些 key
解決方案二:提高 shuffle 操作的並行度#
方案實現思路:
在對 RDD 執行 shuffle 算子時,給 shuffle 算子傳入一個參數,比如 reduceByKey (1000),該參數就設置了這個 shuffle 算子執行時 shuffle read task 的數量。對於 Spark SQL 中的 shuffle 類語句,比如 group by、join 等,需要設置一個參數,即 spark.sql.shuffle.partitions,該參數代表了 shuffle read task 的並行度,該值默認是 200,對於很多場景來說都有點過小
方案實現原理:
增加 shuffle read task 的數量,可以讓原本分配給一個 task 的多個 key 分配給多個 task,從而讓每個 task 處理比原來更少的數據。舉例來說,如果原本有 5 個 key,每個 key 對應 10 條數據,這 5 個 key 都是分配給一個 task 的,那麼這個 task 就要處理 50 條數據。而增加了 shuffle read task 以後,每個 task 就分配到一個 key,即每個 task 就處理 10 條數據,那麼自然每個 task 的執行時間都會變短了
解決方案三:兩階段聚合(局部聚合 + 全局聚合)#
實現思路:
進行兩階段聚合。第一次是局部聚合,先給每個 key 都打上一個隨機數,比如 10 以內的隨機數,此時原先一樣的 key 就變成不一樣的了,比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成 (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數後的數據,執行 reduceByKey 等聚合操作,進行局部聚合,那麼局部聚合結果,就會變成了 (1_hello, 2) (2_hello, 2)。然後將各個 key 的前綴給去掉,就會變成 (hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如 (hello, 4)
實現代碼:
// 第一步,給RDD中的每個key都打上一個隨機前綴。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(new PairFunction<Tuple2<Long,Long>, String, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception {
Random random = new Random();
int prefix = random.nextInt(10);
return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
}
});
// 第二步,對打上隨機前綴的key進行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
// 第三步,去除RDD中每個key的隨機前綴。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)throws Exception {
long originalKey = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Long>(originalKey, tuple._2);
}
});
// 第四步,對去除了隨機前綴的RDD進行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
方案優點:
對於聚合類的 shuffle 操作導致的數據傾斜,效果是非常不錯的。通常都可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將 Spark 作業的性能提升數倍以上。
方案缺點:
僅僅適用於聚合類的 shuffle 操作,適用範圍相對較窄。如果是 join 類的 shuffle 操作,還得用其他的解決方案
解決方案四:將 reduce join 轉為 map join#
實現思路:
不使用 join 算子進行連接操作,而使用 Broadcast 變量與 map 類算子實現 join 操作,進而完全規避掉 shuffle 類的操作,徹底避免數據傾斜的發生和出現
實現原理:
普通的 join 是會走 shuffle 過程的,而一旦 shuffle,就相當於會將相同 key 的數據拉取到一個 shuffle read task 中再進行 join,此時就是 reduce join。但是如果一個 RDD 是比較小的,則可以採用廣播小 RDD 全量數據 + map 算子來實現與 join 同樣的效果,也就是 map join,此時就不會發生 shuffle 操作,也就不會發生數據傾斜
實現代碼
// 首先將數量比較小的RDD的數據,collect到Driver中來。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然後使用Spark的廣播功能,將小RDD的數據轉換成廣播變量,這樣每個Executor就只有一份RDD的數據。
// 可以儘可能節省內存空間,並且減少網絡傳輸性能開銷。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
// 對另外一個RDD執行map類操作,而不再是join類操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple) throws Exception {
// 在算子函數中,通過廣播變量,獲取到本地Executor中的rdd1數據。
List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
// 可以將rdd1的數據轉換為一個Map,便於後面進行join操作。
Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
for(Tuple2<Long, Row> data : rdd1Data) {
rdd1DataMap.put(data._1, data._2);
}
// 獲取當前RDD數據的key以及value。
String key = tuple._1;
String value = tuple._2;
// 從rdd1數據Map中,根據key獲取到可以join到的數據。
Row rdd1Value = rdd1DataMap.get(key);
return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
}
});
// 這裡得提示一下。
// 上面的做法,僅僅適用於rdd1中的key沒有重複,全部是唯一的場景。
// 如果rdd1中有多個相同的key,那麼就得用flatMap類的操作,在進行join的時候不能用map,而是得遍歷rdd1所有數據進行join。
// rdd2中每條數據都可能會返回多條join後的數據。
解決方案五:採樣傾斜 key 並分拆 join 操作#
方案適用場景:
兩個 RDD/Hive 表進行 join 的時候,如果數據量都比較大,那麼此時可以看一下兩個 RDD/Hive 表中的 key 分佈情況。如果出現數據傾斜,是因為其中某一個 RDD/Hive 表中的少數幾個 key 的數據量過大,而另一個 RDD/Hive 表中的所有 key 都分佈比較均勻,那麼採用這個解決方案是比較合適的。
方案實現思路:
- 對包含少數幾個數據量過大的 key 的那個 RDD,通過 sample 算子採樣出一份樣本來,然後統計一下每個 key 的數量,計算出來數據量最大的是哪幾個 key。
- 然後將這幾個 key 對應的數據從原來的 RDD 中拆分出來,形成一個單獨的 RDD,並給每個 key 都打上 n 以內的隨機數作為前綴,而不會導致傾斜的大部分 key 形成另外一個 RDD。
- 接著將需要 join 的另一個 RDD,也過濾出來那幾個傾斜 key 對應的數據並形成一個單獨的 RDD,將每條數據膨脹成 n 條數據,這 n 條數據都按順序附加一個 0~n 的前綴,不會導致傾斜的大部分 key 也形成另外一個 RDD。
- 再將附加了隨機前綴的獨立 RDD 與另一個膨脹 n 倍的獨立 RDD 進行 join,此時就可以將原先相同的 key 打散成 n 份,分散到多個 task 中去進行 join 了。
- 而另外兩個普通的 RDD 就照常 join 即可。
- 最後將兩次 join 的結果使用 union 算子合併起來即可,就是最終的 join 結果。
方案實現原理:
對於 join 導致的數據傾斜,如果只是某幾個 key 導致了傾斜,可以將少數幾個 key 分拆成獨立 RDD,並附加隨機前綴打散成 n 份去進行 join,此時這幾個 key 對應的數據就不會集中在少數幾個 task 上,而是分散到多個 task 進行 join 了,如圖:
解決方案六:使用隨機前綴和擴容 RDD 進行 join#
方案適用場景:如果在進行 join 操作時,RDD 中有大量的 key 導致數據傾斜,那麼進行分拆 key 也沒什麼意義,此時就只能使用最後一種方案來解決問題了。
方案實現思路:
- 類似 “解決方案五 “,首先查看 RDD/Hive 表中的數據分佈情況,找到那個造成數據傾斜的 RDD/Hive 表,比如有多個 key 都對應了超過 1 萬條數據。
- 然後將該 RDD 的每條數據都打上一個 n 以內的隨機前綴。
- 同時對另外一個正常的 RDD 進行擴容,將每條數據都擴容成 n 條數據,擴容出來的每條數據都依次打上一個 0~n 的前綴。
- 最後將兩個處理後的 RDD 進行 join 即可。
方案實現原理:
將原先一樣的 key 通過附加隨機前綴變成不一樣的 key,然後就可以將這些處理後的 “不同 key” 分散到多個 task 中去處理,而不是讓一個 task 處理大量的相同 key。該方案與 “解決方案五” 的不同之處就在於,上一種方案是儘量只對少數傾斜 key 對應的數據進行特殊處理,由於處理過程需要擴容 RDD,因此上一種方案擴容 RDD 後對內存的占用並不大;而這一種方案是針對有大量傾斜 key 的情況,沒法將部分 key 拆分出來進行單獨處理,因此只能對整個 RDD 進行數據擴容,對內存資源要求很高。
** 方案優點:** 對 join 類型的數據傾斜基本都可以處理,而且效果也相對比較顯著,性能提升效果非常不錯。
** 方案缺點:** 該方案更多的是緩解數據傾斜,而不是徹底避免數據傾斜。而且需要對整個 RDD 進行擴容,對內存資源要求很高
shuffle 調優#
在 Spark 1.2 以後的版本中,默認的HashShuffleManager
改成了SortShuffleManager
。SortShuffleManager 相較於 HashShuffleManager 來說,有了一定的改進。主要就在於,每個 Task 在進行 shuffle 操作時,雖然也會產生較多的臨時磁碟文件,但是最後會將所有的臨時文件合併(merge)成一個磁碟文件,因此每個 Task 就只有一個磁碟文件。在下一個 stage 的 shuffle read task 拉取自己的數據時,只要根據索引讀取每個磁碟文件中的部分數據即可
HashShuffleManager 的優化#
設置spark.shuffle.consolidateFiles
, 默認為 false。
開啟consolidate機制
之後,會出現shuffleFileGroup
的概念。consolidate 機制允許不同的 task 復用同一批磁碟文件,這樣就可以有效將多個 task 的磁碟文件進行一定程度上的合併,從而大幅度減少磁碟文件的數量,進而提升 shuffle write 的性能
SortShuffleManager#
SortShuffleManager 的運行機制主要分成兩種:
- 一種是普通運行機制
- 另一種是 bypass 運行機制。當 shuffle read task 的數量小於等於
spark.shuffle.sort.bypassMergeThreshold
參數的值(默認為 200),就會啟用 bypass 機制
shuffle 相關參數調優#
-
spark.shuffle.file.buffer
默認值:32k
參數說明:該參數用於設置 shuffle write task 的 BufferedOutputStream 的 buffer 緩衝大小。將數據寫到磁碟文件之前,會先寫入 buffer 緩衝中,待緩衝寫滿之後,才會溢寫到磁碟 -
spark.reducer.maxSizeInFlight
默認值:48m
參數說明:該參數用於設置 shuffle read task 的 buffer 緩衝大小,而這個 buffer 緩衝決定了每次能夠拉取多少數據。如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小 -
spark.shuffle.io.maxRetries
默認值:3
參數說明:shuffle read task 從 shuffle write task 所在節點拉取屬於自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗 -
spark.shuffle.io.retryWait
默認值:5s
參數說明:代表了每次重試拉取數據的等待間隔。建議加大間隔時長(比如 60s),以增加 shuffle 操作的穩定性 -
spark.shuffle.memoryFraction
默認值:0.2
參數說明:該參數代表了 Executor 內存中,分配給 shuffle read task 進行聚合操作的內存比例,默認是 20% -
spark.shuffle.manager
默認值:sort
參數說明:該參數用於設置 ShuffleManager 的類型。Spark 1.5 以後,有三個可選項:hash
、sort
和tungsten-sort
。HashShuffleManager 是 Spark 1.2 以前的默認選項,但是 Spark 1.2 以及之後的版本默認都是 SortShuffleManager 了。tungsten-sort 與 sort 類似,但是使用了 tungsten 計劃中的堆外內存管理機制,內存使用效率更高