Yige

Yige

Build

Sparkの進階 - Sparkのチューニングまとめ

Spark 調整まとめ#

内容整理自:
三万字長文 | Spark 性能最適化実戦マニュアル

開発調整#

  • 重複 RDD の作成を避ける

  • 可能な限り同じ RDD を再利用する

  • 複数回使用する RDD を永続化する

  • shuffle 系の演算子の使用をできるだけ避ける

  • map-side での事前集約 shuffle 操作を使用する。例えば、reduceByKey や aggregateByKey 演算子を使用して groupByKey 演算子を置き換える

  • 高性能な演算子を使用する。例えば、mapPartitionsを普通のmapの代わりに使用する、foreach の代わりに foreachPartitions を使用する、filter の後に coalesce 操作を行う、repartitionAndSortWithinPartitions を repartition や sort 系の操作の代わりに使用する

  • 大きな変数をブロードキャストする

  • Kryo を使用してシリアル化性能を最適化する

  • データ構造を最適化する:可能な限りオブジェクトの代わりに文字列を使用し、文字列の代わりに原始型 (int、Long) を使用し、コレクションの代わりに配列を使用する

リソース調整#

Spark の実行フロー:
image.png

リソースパラメータ調整#

  • num-executors: Spark ジョブで使用する Executor プロセスの総数を設定するために使用される
  • executor-memory: このパラメータは各 Executor プロセスのメモリを設定するために使用される。Executor メモリのサイズは、しばしば Spark ジョブの性能を直接決定し、一般的な JVM OOM 例外とも直接関連している
  • executor-cores: このパラメータは各 Executor プロセスの CPU コア数を設定するために使用される。このパラメータは各 Executor プロセスがタスクスレッドを並行して実行する能力を決定する
  • driver-memory: Driver プロセスのメモリを設定するために使用される。Driver のメモリは通常設定しないか、1G 程度に設定すれば十分である。collect 演算子を使用してすべてのデータを Driver 側に引き寄せる場合は、Driver に十分なメモリがあることを確認する必要がある
  • spark.default.parallelism: 各ステージのデフォルトタスク数を設定するために使用される。公式の推奨設定原則はnum-executors * executor-coresの2~3倍が適切である。例えば、Executor の総 CPU コア数が 300 の場合、1000 タスクを設定することができる
  • spark.storage.memoryFraction: RDD の永続化データが Executor メモリ内で占める割合を設定するために使用される。デフォルトは 0.6 で、選択した異なる永続化戦略によって、メモリが不足している場合、データが永続化されないか、データがディスクに書き込まれる可能性がある
  • spark.shuffle.memoryFraction: shuffle プロセス中にタスクが前のステージのタスクの出力を取得した後、集約操作を行う際に使用できる Executor メモリの割合を設定するために使用される。デフォルトは 0.2

リソースパラメータ参考例#

./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \

データスキュー調整#

  • 大多数のタスクが非常に速く実行されるが、特定のタスクは非常に遅く実行され、Spark の進行状況は最も時間がかかるタスクに依存する。
  • shuffle 段階で、特定のキーに対応するデータ量が特に大きい場合、データスキューが発生する

調整の考え方#

  • データスキューがどのステージで発生しているかを特定する: Spark コード内に shuffle 系の演算子が現れたり、Spark SQL の SQL 文に shuffle を引き起こす文(例えば group by 文)が現れたりした場合、その場所を境界として前後の 2 つのステージが区分されると判断できる
  • Spark Web UI を使用して、エラーが発生したステージの各タスクの実行時間や割り当てられたデータ量を確認し、ログの例外スタックを通じて具体的なコードを特定する
  • データスキューを引き起こしているキーのデータ分布状況を確認し、異なる解決策を選択する:
    1. Spark SQL の group by や join 文が原因でデータスキューが発生している場合、SQL で使用されているテーブルのキー分布状況を確認する。
    2. Spark RDD で shuffle 演算子を実行してデータスキューが発生している場合、Spark ジョブにキー分布を確認するコードを追加することができる。例えば、RDD.countByKey () を使用する。次に、統計された各キーの出現回数を collect/take してクライアントに印刷すれば、キーの分布状況が確認できる

データスキューの解決策#

解決策 1:少数のスキューを引き起こすキーをフィルタリングする#

もし、少数のデータ量が特に多いキーがジョブの実行や計算結果にそれほど重要でないと判断した場合、単純にその少数のキーをフィルタリングしてしまう。例えば、Spark SQL では where 句を使用してこれらのキーをフィルタリングするか、Spark Core で RDD に filter 演算子を適用してこれらのキーをフィルタリングすることができる

解決策 2:shuffle 操作の並行度を高める#

解決策の実装思考:
RDD に対して shuffle 演算子を実行する際、shuffle 演算子にパラメータを渡す。例えば、reduceByKey (1000) のように。このパラメータは、この shuffle 演算子が実行される際の shuffle read タスクの数を設定する。Spark SQL の shuffle 系文、例えば group by や join などでは、spark.sql.shuffle.partitions というパラメータを設定する必要がある。このパラメータは shuffle read タスクの並行度を表し、デフォルトは 200 で、多くのシナリオでは少し小さすぎる

解決策の実装原理:
shuffle read タスクの数を増やすことで、元々1 つのタスクに割り当てられていた複数のキーを複数のタスクに割り当てることができ、各タスクが元よりも少ないデータを処理できるようになる。例えば、元々5 つのキーがあり、それぞれのキーに 10 件のデータがある場合、これら 5 つのキーは 1 つのタスクに割り当てられるため、そのタスクは 50 件のデータを処理する必要がある。しかし、shuffle read タスクを増やすと、各タスクは 1 つのキーに割り当てられるため、各タスクは 10 件のデータを処理することになり、自然に各タスクの実行時間は短くなる

解決策 3:二段階集約(局所集約 + 全体集約)#

実装思考:
二段階集約を行う。最初は局所集約で、各キーにランダムな数を付与する。例えば、10 以内のランダムな数を付与する。この時、元々同じキーは異なるものになる。例えば、(hello, 1) (hello, 1) (hello, 1) (hello, 1) は (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1) に変わる。次に、ランダムな数が付与されたデータに対して reduceByKey などの集約操作を実行し、局所集約を行う。この局所集約結果は (1_hello, 2) (2_hello, 2) になる。その後、各キーの接頭辞を取り除くと (hello,2)(hello,2) になり、再度全体集約操作を行うことで最終結果を得ることができる。例えば (hello, 4)

実装コード:

// 第一歩、RDD内の各キーにランダムな接頭辞を付与する。
JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(new PairFunction<Tuple2<Long,Long>, String, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<String, Long> call(Tuple2<Long, Long> tuple) throws Exception {
        Random random = new Random();
        int prefix = random.nextInt(10);
        return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
    }
});
// 第二歩、ランダムな接頭辞が付与されたキーに対して局所集約を行う。
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
    }
});
// 第三歩、RDD内の各キーのランダムな接頭辞を取り除く。
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)throws Exception {
        long originalKey = Long.valueOf(tuple._1.split("_")[1]);
        return new Tuple2<Long, Long>(originalKey, tuple._2);
    }
});
// 第四歩、ランダムな接頭辞を取り除いたRDDに対して全体集約を行う。
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Long call(Long v1, Long v2) throws Exception {
        return v1 + v2;
    }
});

解決策の利点:
集約系の shuffle 操作によるデータスキューに対して非常に良い効果がある。通常、データスキューを解決できるか、少なくとも大幅に緩和し、Spark ジョブの性能を数倍以上向上させることができる。

解決策の欠点:
集約系の shuffle 操作にのみ適用され、適用範囲は比較的狭い。join 系の shuffle 操作の場合は、他の解決策を使用する必要がある。

解決策 4:reduce join を map join に変換する#

実装思考:
join 演算子を使用せず、Broadcast 変数と map 系演算子を使用して join 操作を実現し、shuffle 系の操作を完全に回避し、データスキューの発生を防ぐ

実装原理:
通常の join は shuffle プロセスを経るが、一度 shuffle が発生すると、同じキーのデータが 1 つの shuffle read タスクに引き寄せられて join される。この時、reduce join となる。しかし、もし RDD が比較的小さい場合、ブロードキャストされた小 RDD の全データ + map 演算子を使用して join と同様の効果を実現することができ、これを map join と呼ぶ。この時、shuffle 操作は発生せず、データスキューも発生しない。

実装コード

// まず、データ量が比較的小さいRDDのデータをDriverに集める。
List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()
// その後、Sparkのブロードキャスト機能を使用して、小RDDのデータをブロードキャスト変数に変換する。これにより、各ExecutorにはRDDデータが1つだけ存在する。
// メモリスペースをできるだけ節約し、ネットワーク伝送性能のオーバーヘッドを減少させることができる。
final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);
// もう一つのRDDに対してmap系操作を実行し、join系操作を行わない。
JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(
new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple) throws Exception {
        // 演算子関数内で、ブロードキャスト変数を使用して、ローカルExecutor内のrdd1データを取得する。
        List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();
        // rdd1のデータをMapに変換し、後でjoin操作を行うのに便利にする。
        Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();
        for(Tuple2<Long, Row> data : rdd1Data) {
            rdd1DataMap.put(data._1, data._2);
        }
        // 現在のRDDデータのキーと値を取得する。
        String key = tuple._1;
        String value = tuple._2;
        // rdd1データMapから、キーに基づいてjoinできるデータを取得する。
        Row rdd1Value = rdd1DataMap.get(key);
        return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));
    }
});
// ここで注意が必要です。
// 上記の方法は、rdd1のキーが重複していない、すべてがユニークなシナリオにのみ適用されます。
// もしrdd1に複数の同じキーがある場合、flatMap系の操作を使用して、join時にmapを使用せず、rdd1のすべてのデータをループしてjoinする必要があります。
// rdd2の各データは、join後に複数のデータを返す可能性があります。

解決策 5:傾斜キーをサンプリングし、join 操作を分割する#

解決策の適用シナリオ:
2 つの RDD/Hive テーブルを join する際、データ量が大きい場合、2 つの RDD/Hive テーブルのキー分布状況を確認することができる。もしデータスキューが発生している場合、それは特定の RDD/Hive テーブルの少数のキーのデータ量が過大であり、もう一方の RDD/Hive テーブルのすべてのキーが均等に分布している場合、この解決策が適切である。
解決策の実装思考:

  1. データ量が過大な少数のキーを含む RDD に対して、sample 演算子を使用してサンプルを取得し、各キーの数を統計し、データ量が最大のキーを計算する。
  2. 次に、これらのキーに対応するデータを元の RDD から分割し、独立した RDD を形成し、各キーに n 以内のランダムな数を接頭辞として付与し、傾斜を引き起こさない大部分のキーを別の RDD として形成する。
  3. 次に、join する必要のあるもう一つの RDD からも、傾斜キーに対応するデータをフィルタリングし、独立した RDD を形成し、各データを n 件に膨張させ、0〜n の接頭辞を順次付加し、傾斜を引き起こさない大部分のキーを別の RDD として形成する。
  4. その後、接頭辞が付加された独立 RDD と、膨張した n 倍の独立 RDD を join し、この時、元々同じキーを n 分割し、複数のタスクに分散して join を行うことができる。
  5. 他の 2 つの通常の RDD はそのまま join すればよい。
  6. 最後に、2 回の join 結果を union 演算子で結合すれば、最終的な join 結果が得られる。

解決策の実装原理:
join によるデータスキューが発生している場合、もし少数のキーが原因であれば、それらの少数のキーを独立した RDD に分割し、ランダムな接頭辞を付与して n 分割して join することで、これらのキーに対応するデータが少数のタスクに集中することなく、複数のタスクに分散して join される。

解決策 6:ランダム接頭辞と RDD の拡張を使用して join する#

解決策の適用シナリオ:join 操作を行う際、RDD 内に大量のキーが存在しデータスキューを引き起こす場合、キーを分割する意味がなくなる。この時、最後の解決策を使用して問題を解決するしかない。
解決策の実装思考:

  1. 「解決策 5」と似て、まず RDD/Hive テーブル内のデータ分布状況を確認し、データスキューを引き起こしている RDD/Hive テーブルを特定する。例えば、複数のキーが 1 万件を超えるデータに対応している場合。
  2. 次に、その RDD の各データに n 以内のランダムな接頭辞を付与する。
  3. 同時に、正常な RDD に対して拡張を行い、各データを n 件に拡張し、拡張された各データに 0〜n の接頭辞を付与する。
  4. 最後に、処理された 2 つの RDD を join すればよい。

解決策の実装原理:
元々同じキーをランダム接頭辞を付与することで異なるキーに変え、これらの処理された「異なるキー」を複数のタスクに分散して処理できるようにする。この解決策は「解決策 5」との違いは、前の解決策は少数の傾斜キーに対応するデータを特別に処理することを目指しているが、処理過程で RDD を拡張する必要があるため、メモリの占有はそれほど大きくない。一方、この解決策は大量の傾斜キーが存在する場合に対処するため、部分的なキーを分割して特別に処理することができないため、全体の RDD を拡張する必要があり、メモリリソースの要求が高い。

解決策の利点: join タイプのデータスキューを基本的に処理でき、効果も比較的顕著で、性能向上の効果が非常に良い。
解決策の欠点: この解決策はデータスキューを完全に回避するのではなく、基本的にはデータスキューを緩和するものであり、全体の RDD を拡張するため、メモリリソースの要求が高い。

shuffle 調整#

参考: Spark Shuffle 原理及び関連調整

Spark 1.2 以降のバージョンでは、デフォルトのHashShuffleManagerSortShuffleManagerに変更された。SortShuffleManager は HashShuffleManager に比べて一定の改善がなされている。主に、各タスクが shuffle 操作を行う際、比較的多くの一時ディスクファイルが生成されるが、最終的にはすべての一時ファイルがマージされ(merge)、1 つのディスクファイルにまとめられるため、各タスクは 1 つのディスクファイルのみを持つことになる。次のステージの shuffle read タスクが自分のデータを取得する際は、インデックスに基づいて各ディスクファイル内の一部データを読み取るだけで済む。

HashShuffleManager の最適化#

spark.shuffle.consolidateFilesを設定する。デフォルトは false。
consolidate機能を有効にすると、shuffleFileGroupの概念が登場する。consolidate 機能は異なるタスクが同じ一群のディスクファイルを再利用することを許可するため、複数のタスクのディスクファイルを一定程度マージすることができ、ディスクファイルの数を大幅に減少させ、shuffle 書き込みの性能を向上させる。

SortShuffleManager#

SortShuffleManager の実行メカニズムは主に 2 つに分かれる:

  • 一つは通常の実行メカニズム
  • もう一つは bypass 実行メカニズム。shuffle read タスクの数がspark.shuffle.sort.bypassMergeThresholdパラメータの値(デフォルトは 200)以下の場合、bypass メカニズムが有効になる。

shuffle 関連パラメータ調整#

  • spark.shuffle.file.buffer
    デフォルト値:32k
    パラメータ説明:このパラメータは shuffle 書き込みタスクの BufferedOutputStream のバッファサイズを設定するために使用される。データがディスクファイルに書き込まれる前に、まずバッファに書き込まれ、バッファが満杯になった時点でディスクに溢れ出す。

  • spark.reducer.maxSizeInFlight
    デフォルト値:48m
    パラメータ説明:このパラメータは shuffle read タスクのバッファサイズを設定するために使用され、このバッファは一度にどれだけのデータを取得できるかを決定する。ジョブで利用可能なメモリリソースが十分であれば、このパラメータのサイズを適度に増加させることができる。

  • spark.shuffle.io.maxRetries
    デフォルト値:3
    パラメータ説明:shuffle read タスクが shuffle 書き込みタスクが存在するノードから自分のデータを取得する際、ネットワーク異常により取得に失敗した場合、自動的に再試行される。このパラメータは最大再試行回数を表す。指定された回数内に取得が成功しない場合、ジョブの実行が失敗する可能性がある。

  • spark.shuffle.io.retryWait
    デフォルト値:5s
    パラメータ説明:データ取得の再試行間隔を表す。間隔を長くすること(例えば 60s)を推奨し、shuffle 操作の安定性を高める。

  • spark.shuffle.memoryFraction
    デフォルト値:0.2
    パラメータ説明:このパラメータは Executor メモリ内で、shuffle read タスクが集約操作を行うために割り当てられるメモリの割合を表す。デフォルトは 20%。

  • spark.shuffle.manager
    デフォルト値:sort
    パラメータ説明:このパラメータは ShuffleManager のタイプを設定するために使用される。Spark 1.5 以降、3 つの選択肢がある:hashsort、およびtungsten-sort。HashShuffleManager は Spark 1.2 以前のデフォルトオプションだが、Spark 1.2 以降のバージョンではデフォルトで SortShuffleManager が使用されている。tungsten-sort は sort に似ているが、tungsten 計画のヒープ外メモリ管理メカニズムを使用しており、メモリ使用効率が高い

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。