Hadoop シリーズ - MapReduce#
MapReduce の主要な考え方:自動的に大きな計算プログラムを Map(マッピング)と Reduce(簡略化)に分割する、分治思想
プロセスの概要#
input --> map --> shuffle -->reduce --->output
Map 側のプロセス#
- Map: カスタム Map 関数でデータを処理
- Partition: map の結果を対応する reduce 側に送信するため。全体の partition の数は reducer の数に等しい。具体的な実装は、key をハッシュ化した後、reduce タスクの数でモジュロを取り、指定された job に送る(デフォルトはHashPartitioner、
job.setPartitionerClass(MyPartition.class)
でカスタマイズ可能) - Sort: まず
<key,value,partition>
の partition 番号でソートし、その後 key でソートする。これがsort 操作であり、最後に溢れた小ファイルはパーティションのものであり、同じパーティション内では key が順序を保つ - Combine: 事前に統計を行い、局所的にマージして、Map 側から Reduce 側へのデータ転送の消耗を減らす。開発者はプログラム内で combine を設定する必要がある(プログラム内で
job.setCombinerClass(myCombine.class)
でカスタマイズ可能)。二つの段階で発生する可能性がある: (1) map の出力データが partition でソートされた後、ファイルに書き込む前に一度 combine 操作が実行される、もちろん前提として作業内でこの操作が設定されていること (2) map の出力が大きい場合、溢れたファイルの数が 3 を超える(この値はmin.num.spills.for.combine
属性で設定可能)と、merge の過程でも combine 操作が実行される。 - Merge: map が非常に大きい場合、毎回の溢れ書きで spill_file が生成され、複数の spill_file が存在する。この時、merge 操作を行う必要があり、最終的に一つの MapTask が一つのファイルを出力する。つまり、Map Task の全データが処理された後、タスクが生成した全ての中間データファイルを一度マージして、Map Task が最終的に一つの中間データファイルを生成することを保証する
Reduce 側のプロセス#
- Copy: Reducer ノードが各 mapper ノードから自分のパーティションに属するデータをダウンロードする。同じ Partition のデータは同じノードに落ちる
- Merge: Reducer が異なる Mapper ノードから取得したデータを一つのファイルにマージする
- Reduce: データを集計する
MR 作業を Yarn に提出するプロセス#
1. RunJarプロセスを生成し、クライアントがRMにjobの実行を申請する
2. RMがjob関連のリソース提出のパスstaging-dirとこのjobが生成したjob IDを返す
3. クライアントがjob関連リソースを対応する共有ファイルシステムのパス(/yarn-staging/jobID)に提出する
4. クライアントがRMに提出結果を報告する
5. RMがjobをタスクキューに追加する
6. NMがRMとのハートビート接続を通じて、RMのタスクキューから新しいタスクを取得する
7. RMがNMに実行リソースコンテナcontainerを割り当てる
8. RMがコンテナ内でMRAppMasterプロセスを起動する
9. 作成されたMRAppmasterがどのNodeManagerでmap(つまりyarnchildプロセス)とreduceタスクを実行するかを割り当てる
10. mapとreduceタスクを実行するNMが共有ファイルシステムからjob関連リソースを取得し、jarファイル、設定ファイルなどを含む。それからmapとreduceタスクを実行する
11. jobが完了した後、MRAppMasterがRMに自分を解除し、リソースを解放する
Shuffle メカニズム#
Map から Reduce への入力の全過程は広義に Shuffle と呼ばれる。Shuffle は Map 側と Reduce 側を跨ぎ、Map 側には Partition 分割と Spill 分割の過程が含まれ、Reduce 側には copy と Merge の過程が含まれる。
環状メモリバッファ#
map メソッド () の最後のステップで、OutputCollector.collect(key,value)
またはcontext.write(key,value)
を通じて中間処理結果を出力し、関連するcollect(key,value)
メソッド内でPartitioner.getPartition(K2 key, V2 value, int numPartitions)
メソッドを呼び出して出力の key/value に対応するパーティション番号を取得する(パーティション番号は Reduce Task を実行するノードに対応すると考えられる)、その後<key,value,partition>
を一時的にメモリ内のMapOutputBuffer内部の環状データバッファ
に保存する。このバッファのデフォルトサイズは100MBで、パラメータio.sort.mb
でサイズを調整できる。
MapOutputBuffer 内部に保存されるデータは二つのインデックス構造を使用し、三つの環状メモリバッファに関与する:
-
kvoffsetsバッファ
:オフセットインデックス配列とも呼ばれ、key/value 情報が位置インデックス kvindices の中でのオフセットを保存するために使用される。kvoffsets
の使用率がmapreduce.map.sort.spill.percent (デフォルトは80%)
を超えると、SpillThread スレッドの「溢れ書き」操作がトリガーされ、Spill 段階の操作が開始される。 -
kvindicesバッファ
:位置インデックス配列とも呼ばれ、key/value がデータバッファ kvbuffer の中での開始位置を保存するために使用される。 -
kvbufferつまりデータバッファ
:実際の key/value の値を保存するために使用される。デフォルトではこのバッファは最大でio.sort.mb
の 95% を使用でき、kvbufferの使用率がmapreduce.map.sort.spill.percent (デフォルトは80%)
を超えると、SpillThread スレッドの「溢れ書き」操作がトリガーされ、Spill 段階の操作が開始される。
Spill 溢れ書き#
MapOutputBuffer 内部のkvoffsetsバッファ
とkvbufferつまりデータバッファ
が閾値に達した時に spill 操作がトリガーされる。メモリバッファ内のデータをローカルディスクに書き込み、ローカルディスクに書き込む際にはまず partition、次に key でソートする。
- この spill 操作は別のスレッドによって操作され、バッファに map 結果を書き込むスレッドには影響を与えない
- ディスクにデータを書き込む前に、書き込むデータを一度ソート操作する必要があり、まず <key,value,partition> の中の partition 番号でソートし、その後 key でソートする。これが sort 操作であり、最後に溢れた小ファイルはパーティションのものであり、同じパーティション内では key が順序を保つ
Copy 段階#
デフォルトでは、全ての実行が完了した Map Task の数が Map Task の総数の 5% を超えると、JobTracker は Reduce Task の実行をスケジュールし始める。その後、Reduce Task はデフォルトでmapred.reduce.parallel.copies(デフォルトは5)
個の MapOutputCopier スレッドを起動し、完了した Map Task ノードにそれぞれ自分のデータをコピーする。これらのコピーされたデータは最初にメモリバッファに保存され、メモリバッファの使用率が一定の閾値に達すると、ディスクに書き込まれる。
補足
このメモリバッファのサイズの制御は map 側のメモリバッファのようにio.sort.mb
で設定されるのではなく、別のパラメータで設定される:mapred.job.shuffle.input.buffer.percent(デフォルト0.7)
。このパラメータの意味は:shuffile が reduce メモリ内で使用するデータの最大メモリ量は:0.7 × reduce タスクの maxHeap。もしその reduce タスクの最大 heap 使用量が、通常mapred.child.java.opts
で設定される場合、例えば - Xmx1024m に設定されると、reduce はその **heapsize の 70%** を使用してメモリ内にデータをキャッシュする。もし reduce の heap がビジネスの理由で大きく調整されると、対応するキャッシュサイズも大きくなる。これが reduce がキャッシュに使用するパラメータが固定値ではなく、パーセンテージである理由である。
Merge プロセス#
Copy されたデータは最初にメモリバッファに置かれ、ここでのバッファサイズは map 側のものよりも柔軟で、JVM のheap size
設定に基づいている。Shuffle 段階では Reducer が実行されないため、ほとんどのメモリを Shuffle に使用すべきである。
三つの形式がある:
- メモリからメモリ
- メモリからディスク
- ディスクからディスク
デフォルトでは最初の形式は有効になっていない。メモリ内のデータ量が一定の閾値に達すると、メモリからディスクへのマージが開始される(マージを行う理由は、reduce 側が複数の map 側からデータをコピーする際に、ソートを行わず、単にメモリにロードするだけであり、閾値に達してディスクに書き込む際にマージが必要である)。このマージ方式は、map 側のデータがなくなるまで実行され続け、その後に最終ファイルを生成するための第三のディスクからディスクへのマージ方式が開始される。
MR 調整の考え方#
データの偏り
:データが Reduce に集約される際、ある ReduceTask のデータが過大であると、全体のプログラムの効率が非常に低下する- Map と Reduce タスクの数の設定が不合理
- Map の実行時間が長すぎて、Reduce が長時間待機する
小ファイルが多すぎる
:ファイルの大きさに関わらず、メタデータ情報が割り当てられるため、過剰になるとリソースやストレージスペースが無駄になる- MR 内に大量の分割不可能な超大ファイルがあり、shuffle 段階で継続的に溢れ書きが発生する
- 複数の溢れた小ファイルがあり、多段階のマージが必要