Yige

Yige

Build

Spark進階-Spark調優總結

Spark 調優總結#

內容整理自:
三萬字長文 | Spark 性能優化實戰手冊

開發調優#

  • 避免創建重複 RDD

  • 儘可能重用同一個 RDD

  • 對多次使用的 RDD 進行持久化

  • 儘量避免使用 shuffle 類算子

  • 使用 map-side 預聚合的 shuffle 操作,比如使用 reduceByKey 或者 aggregateByKey 算子來替代掉 groupByKey 算子

  • 使用高性能的算子,比如mapPartitions替代普通map、使用 foreachPartitions 替代 foreach、使用 filter 之後進行 coalesce 操作、使用 repartitionAndSortWithinPartitions 替代 repartition 與 sort 類操作

  • 廣播大變量

  • 使用 Kryo 優化序列化性能

  • 優化數據結構:儘量使用字符串替換對象,使用原始類型 (int、Long) 替換字符串、使用數組替代集合

資源調優#

Spark 運行流程:
image.png

資源參數調優#

  • num-executors: 用於設置 Spark 作業總共要用多少個 Executor 進程來執行
  • executor-memory: 該參數用於設置每個 Executor 進程的內存。Executor 內存的大小,很多時候直接決定了 Spark 作業的性能,而且跟常見的 JVM OOM 異常,也有直接的關聯
  • executor-cores: 該參數用於設置每個 Executor 進程的 CPU core 數量。這個參數決定了每個 Executor 進程並行執行 task 線程的能力
  • driver-memory: 用於設置 Driver 進程的內存,Driver 的內存通常來說不設置,或者設置 1G 左右應該就夠了,需要注意的是如果使用 collect 算子拉取所有數據到 driver 端,需要保證 Driver 有足夠的內存
  • spark.default.parallelism: 用於設置每個 stage 的默認 task 數量,官網建議的設置原則為num-executors * executor-cores的2~3倍較為合適,比如 Executor 的總 CPU core 數量為 300 個,那麼設置 1000 個 task 是可以的
  • spark.storage.memoryFraction: 用於設置 RDD 持久化數據在 Executor 內存中能占的比例,默認是 0.6, 根據你選擇的不同的持久化策略,如果內存不夠時,可能數據就不會持久化,或者數據會寫入磁碟
  • spark.shuffle.memoryFraction: 用於設置 shuffle 過程中一個 task 拉取到上個 stage 的 task 的輸出後,進行聚合操作時能夠使用的 Executor 內存的比例,默認是 0.2

資源參數參考示例#

./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \

數據傾斜調優#

  • 絕大多數 task 執行很快,個別 task 卻執行很慢,而 Spark 運行進度取決於耗時最長的那個 task。
  • 在 shuffle 階段,如果某個 key 對應的數據量特別大,就會發生數據傾斜

調優思路#

  • 找到數據傾斜發生在哪個 stage 中:只要看到 Spark 代碼中出現了一個 shuffle 類算子或者是 Spark SQL 的 SQL 語句中出現了會導致 shuffle 的語句(比如 group by 語句),那麼就可以判定,以那個地方為界限劃分出了前後兩個 stage
  • 通過 Spark Web UI 查看報錯的那個 stage 的各個 task 的運行時間以及分配的數據量,然後通過 log 異常棧定位到具體代碼
  • 查看導致數據傾斜的 key 的數據分佈情況,選擇不同的方案解決:
    1. 如果是 Spark SQL 中的 group by、join 語句導致的數據傾斜,那麼就查詢一下 SQL 中使用的表的 key 分佈情況。
    2. 如果是對 Spark RDD 執行 shuffle 算子導致的數據傾斜,那麼可以在 Spark 作業中加入查看 key 分佈的代碼,比如 RDD.countByKey ()。然後對統計出來的各個 key 出現的次數,collect/take 到客戶端打印一下,就可以看到 key 的分佈情況

數據傾斜的解決方案#

解決方案一:過濾少數導致傾斜的 key#

如果我們判斷那少數幾個數據量特別多的 key,對作業的執行和計算結果不是特別重要的話,那麼乾脆就直接過濾掉那少數幾個 key。比如在 Spark SQL 中可以使用 where 子句過濾掉這些 key 或者在 Spark Core 中對 RDD 執行 filter 算子過濾掉這些 key

解決方案二:提高 shuffle 操作的並行度#

方案實現思路:
在對 RDD 執行 shuffle 算子時,給 shuffle 算子傳入一個參數,比如 reduceByKey (1000),該參數就設置了這個 shuffle 算子執行時 shuffle read task 的數量。對於 Spark SQL 中的 shuffle 類語句,比如 group by、join 等,需要設置一個參數,即 spark.sql.shuffle.partitions,該參數代表了 shuffle read task 的並行度,該值默認是 200,對於很多場景來說都有點過小

方案實現原理:
增加 shuffle read task 的數量,可以讓原本分配給一個 task 的多個 key 分配給多個 task,從而讓每個 task 處理比原來更少的數據。舉例來說,如果原本有 5 個 key,每個 key 對應 10 條數據,這 5 個 key 都是分配給一個 task 的,那麼這個 task 就要處理 50 條數據。而增加了 shuffle read task 以後,每個 task 就分配到一個 key,即每個 task 就處理 10 條數據,那麼自然每個 task 的執行時間都會變短了

解決方案三:兩階段聚合(局部聚合 + 全局聚合)#

實現思路:
進行兩階段聚合。第一次是局部聚合,先給每個 key 都打上一個隨機數,比如 10 以內的隨機數,此時原先一樣的 key 就變成不一樣的了,比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成 (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數後的數據,執行 reduceByKey 等聚合操作,進行局部聚合,那麼局部聚合結果,就會變成了 (1_hello, 2) (2_hello, 2)。然後將各個 key 的前綴給去掉,就會變成 (hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如 (hello, 4)

實現代碼:

// 第一步,給RDD中的每個key都打上一個隨機前綴。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(new PairFunction<Tuple2<Long,Long>, String, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception {
        Random random = new Random();
        int prefix = random.nextInt(10);
        return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
    }
});
// 第二步,對打上隨機前綴的key進行局部聚合。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
    }
});
// 第三步,去除RDD中每個key的隨機前綴。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)throws Exception {
        long originalKey = Long.valueOf(tuple._1.split("_")[1]);
        return new Tuple2<Long, Long>(originalKey, tuple._2);
    }
});
// 第四步,對去除了隨機前綴的RDD進行全局聚合。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
    }
});

方案優點:
對於聚合類的 shuffle 操作導致的數據傾斜,效果是非常不錯的。通常都可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將 Spark 作業的性能提升數倍以上。

方案缺點:
僅僅適用於聚合類的 shuffle 操作,適用範圍相對較窄。如果是 join 類的 shuffle 操作,還得用其他的解決方案

解決方案四:將 reduce join 轉為 map join#

實現思路:
不使用 join 算子進行連接操作,而使用 Broadcast 變量與 map 類算子實現 join 操作,進而完全規避掉 shuffle 類的操作,徹底避免數據傾斜的發生和出現

實現原理:
普通的 join 是會走 shuffle 過程的,而一旦 shuffle,就相當於會將相同 key 的數據拉取到一個 shuffle read task 中再進行 join,此時就是 reduce join。但是如果一個 RDD 是比較小的,則可以採用廣播小 RDD 全量數據 + map 算子來實現與 join 同樣的效果,也就是 map join,此時就不會發生 shuffle 操作,也就不會發生數據傾斜

實現代碼

// 首先將數量比較小的RDD的數據,collect到Driver中來。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// 然後使用Spark的廣播功能,將小RDD的數據轉換成廣播變量,這樣每個Executor就只有一份RDD的數據。
// 可以儘可能節省內存空間,並且減少網絡傳輸性能開銷。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
// 對另外一個RDD執行map類操作,而不再是join類操作。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple) throws Exception {
        // 在算子函數中,通過廣播變量,獲取到本地Executor中的rdd1數據。
        List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
        // 可以將rdd1的數據轉換為一個Map,便於後面進行join操作。
        Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
        for(Tuple2<Long, Row> data : rdd1Data) {
            rdd1DataMap.put(data._1, data._2);
        }
        // 獲取當前RDD數據的key以及value。
        String key = tuple._1;
        String value = tuple._2;
        // 從rdd1數據Map中,根據key獲取到可以join到的數據。
        Row rdd1Value = rdd1DataMap.get(key);
        return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
    }
});
// 這裡得提示一下。
// 上面的做法,僅僅適用於rdd1中的key沒有重複,全部是唯一的場景。
// 如果rdd1中有多個相同的key,那麼就得用flatMap類的操作,在進行join的時候不能用map,而是得遍歷rdd1所有數據進行join。
// rdd2中每條數據都可能會返回多條join後的數據。

解決方案五:採樣傾斜 key 並分拆 join 操作#

方案適用場景:
兩個 RDD/Hive 表進行 join 的時候,如果數據量都比較大,那麼此時可以看一下兩個 RDD/Hive 表中的 key 分佈情況。如果出現數據傾斜,是因為其中某一個 RDD/Hive 表中的少數幾個 key 的數據量過大,而另一個 RDD/Hive 表中的所有 key 都分佈比較均勻,那麼採用這個解決方案是比較合適的。
方案實現思路:

  1. 對包含少數幾個數據量過大的 key 的那個 RDD,通過 sample 算子採樣出一份樣本來,然後統計一下每個 key 的數量,計算出來數據量最大的是哪幾個 key。
  2. 然後將這幾個 key 對應的數據從原來的 RDD 中拆分出來,形成一個單獨的 RDD,並給每個 key 都打上 n 以內的隨機數作為前綴,而不會導致傾斜的大部分 key 形成另外一個 RDD。
  3. 接著將需要 join 的另一個 RDD,也過濾出來那幾個傾斜 key 對應的數據並形成一個單獨的 RDD,將每條數據膨脹成 n 條數據,這 n 條數據都按順序附加一個 0~n 的前綴,不會導致傾斜的大部分 key 也形成另外一個 RDD。
  4. 再將附加了隨機前綴的獨立 RDD 與另一個膨脹 n 倍的獨立 RDD 進行 join,此時就可以將原先相同的 key 打散成 n 份,分散到多個 task 中去進行 join 了。
  5. 而另外兩個普通的 RDD 就照常 join 即可。
  6. 最後將兩次 join 的結果使用 union 算子合併起來即可,就是最終的 join 結果。

方案實現原理:
對於 join 導致的數據傾斜,如果只是某幾個 key 導致了傾斜,可以將少數幾個 key 分拆成獨立 RDD,並附加隨機前綴打散成 n 份去進行 join,此時這幾個 key 對應的數據就不會集中在少數幾個 task 上,而是分散到多個 task 進行 join 了,如圖:

解決方案六:使用隨機前綴和擴容 RDD 進行 join#

方案適用場景:如果在進行 join 操作時,RDD 中有大量的 key 導致數據傾斜,那麼進行分拆 key 也沒什麼意義,此時就只能使用最後一種方案來解決問題了。
方案實現思路:

  1. 類似 “解決方案五 “,首先查看 RDD/Hive 表中的數據分佈情況,找到那個造成數據傾斜的 RDD/Hive 表,比如有多個 key 都對應了超過 1 萬條數據。
  2. 然後將該 RDD 的每條數據都打上一個 n 以內的隨機前綴。
  3. 同時對另外一個正常的 RDD 進行擴容,將每條數據都擴容成 n 條數據,擴容出來的每條數據都依次打上一個 0~n 的前綴。
  4. 最後將兩個處理後的 RDD 進行 join 即可。

方案實現原理:
將原先一樣的 key 通過附加隨機前綴變成不一樣的 key,然後就可以將這些處理後的 “不同 key” 分散到多個 task 中去處理,而不是讓一個 task 處理大量的相同 key。該方案與 “解決方案五” 的不同之處就在於,上一種方案是儘量只對少數傾斜 key 對應的數據進行特殊處理,由於處理過程需要擴容 RDD,因此上一種方案擴容 RDD 後對內存的占用並不大;而這一種方案是針對有大量傾斜 key 的情況,沒法將部分 key 拆分出來進行單獨處理,因此只能對整個 RDD 進行數據擴容,對內存資源要求很高。

** 方案優點:** 對 join 類型的數據傾斜基本都可以處理,而且效果也相對比較顯著,性能提升效果非常不錯。
** 方案缺點:** 該方案更多的是緩解數據傾斜,而不是徹底避免數據傾斜。而且需要對整個 RDD 進行擴容,對內存資源要求很高

shuffle 調優#

參考: Spark Shuffle 原理及相關調優

在 Spark 1.2 以後的版本中,默認的HashShuffleManager改成了SortShuffleManager。SortShuffleManager 相較於 HashShuffleManager 來說,有了一定的改進。主要就在於,每個 Task 在進行 shuffle 操作時,雖然也會產生較多的臨時磁碟文件,但是最後會將所有的臨時文件合併(merge)成一個磁碟文件,因此每個 Task 就只有一個磁碟文件。在下一個 stage 的 shuffle read task 拉取自己的數據時,只要根據索引讀取每個磁碟文件中的部分數據即可

HashShuffleManager 的優化#

設置spark.shuffle.consolidateFiles, 默認為 false。
開啟consolidate機制之後,會出現shuffleFileGroup的概念。consolidate 機制允許不同的 task 復用同一批磁碟文件,這樣就可以有效將多個 task 的磁碟文件進行一定程度上的合併,從而大幅度減少磁碟文件的數量,進而提升 shuffle write 的性能

SortShuffleManager#

SortShuffleManager 的運行機制主要分成兩種:

  • 一種是普通運行機制
  • 另一種是 bypass 運行機制。當 shuffle read task 的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數的值(默認為 200),就會啟用 bypass 機制

shuffle 相關參數調優#

  • spark.shuffle.file.buffer
    默認值:32k
    參數說明:該參數用於設置 shuffle write task 的 BufferedOutputStream 的 buffer 緩衝大小。將數據寫到磁碟文件之前,會先寫入 buffer 緩衝中,待緩衝寫滿之後,才會溢寫到磁碟

  • spark.reducer.maxSizeInFlight
    默認值:48m
    參數說明:該參數用於設置 shuffle read task 的 buffer 緩衝大小,而這個 buffer 緩衝決定了每次能夠拉取多少數據。如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小

  • spark.shuffle.io.maxRetries
    默認值:3
    參數說明:shuffle read task 從 shuffle write task 所在節點拉取屬於自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗

  • spark.shuffle.io.retryWait
    默認值:5s
    參數說明:代表了每次重試拉取數據的等待間隔。建議加大間隔時長(比如 60s),以增加 shuffle 操作的穩定性

  • spark.shuffle.memoryFraction
    默認值:0.2
    參數說明:該參數代表了 Executor 內存中,分配給 shuffle read task 進行聚合操作的內存比例,默認是 20%

  • spark.shuffle.manager
    默認值:sort
    參數說明:該參數用於設置 ShuffleManager 的類型。Spark 1.5 以後,有三個可選項:hashsorttungsten-sort。HashShuffleManager 是 Spark 1.2 以前的默認選項,但是 Spark 1.2 以及之後的版本默認都是 SortShuffleManager 了。tungsten-sort 與 sort 類似,但是使用了 tungsten 計劃中的堆外內存管理機制,內存使用效率更高

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。