Hadoop 系列 - MapReduce#
MapReduce 的主要思想:自動將一個大的計算程序拆分成 Map(映射)和 Reduce(化簡), 分治思想
流程概括#
input --> map --> shuffle -->reduce --->output
Map 端流程#
- Map: 自定義 Map 函數處理數據
- Partition: 為了將 map 的結果發送到相應的 reduce 端。總的 partition 的數目等於 reducer 的數量。具體實現為:對 key 進行 hash 後,再以 reduce task 數量取模,然後到指定的 job 上(默認HashPartitioner,可以通過
job.setPartitionerClass(MyPartition.class)
自定義) - Sort: 先按
<key,value,partition>
中的 partition 分區號排序,然後再按 key 排序,這個就是sort 操作,最後溢出的的小文件是分區的,且同一個分區內是保證 key 有序的 - Combine: 提前進行統計,進行局部合併,減少 Map 端到 Reduce 端的數據傳輸消耗。要求開發者必須在程序中設置了 combine(程序中通過
job.setCombinerClass(myCombine.class)
自定義 combine 操作). 在兩個階段可能會發生: (1) map 輸出數據根據分區排序完成後,在寫入文件之前會執行一次 combine 操作,當然前提是作業中設置了這個操作 (2) 如果 map 輸出比較大,溢出文件個數大於 3(此值可以通過屬性min.num.spills.for.combine
配置)時,在 merge 的過程中還會執行 combine 操作. - Merge: 當 map 很大時,每次溢寫會產生一個 spill_file,這樣會有多個 spill_file, 這個時候就需要進行 merge 合併操作,最終一個 MapTask 只輸出一個文件。也即是,待 Map Task 任務的所有數據都處理完後,會對任務產生的所有中間數據文件做一次合併操作,以確保一個 Map Task 最終只生成一個中間數據文件
Reduce 端流程#
- Copy: Reducer 節點向各個 mapper 節點下載屬於自己分區的數據,相同 Partition 的數據會落到同一節點
- Merge將不同 Mapper 節點上拉取的數據進行合併成一個文件
- Reduce: 對數據進行統計
提交 MR 作業到 Yarn 流程#
1. 產生RunJar進程,客戶端向RM申請執行一個job
2. RM返回job相關的資源提交的路徑staging-dir和本job產生的job ID
3. 客戶端會將job相關資源提交到相應的共享文件系統的路徑下(/yarn-staging/jobID)
4. 客戶端向RM匯報提交結果
5. RM將job加入任務隊列
6. NM通過與RM的心跳連接,從RM的任務隊列中獲取新的任務
7. RM 向 NM 分配運行資源容器container
8. RM在容器中啟用 MRAppMaster 進程
9. 由創建的 MRAppmaster 負責分配在哪些NodeManager上運行map(即yarnchild進程)和reduce任務
10. 運行map和reduce任務的NM從共享文件系統中獲取job相關資源,包括jar文件,配置文件等。然後運行map和reduce任務
11. job執行完成後, MRAppMaster向RM註銷自己,釋放資源
Shuffle 機制#
從 Map 輸出到 Reduce 輸入的整個過程可以廣義的稱為 Shuffle。Shuffle 橫跨 Map 端和 Reduce 端,在 Map 端包括 Partition 分區和 Spill 分割過程,在 Reduce 端包括 copy 複製和 Merge 過程。
環形內存緩衝區#
在 map 方法 () 中,最後一步通過OutputCollector.collect(key,value)
或context.write(key,value)
輸出中間處理結果,在相關的collect(key,value)
方法中,會調用Partitioner.getPartition(K2 key, V2 value, int numPartitions)
方法獲得輸出的 key/value 對應的分區號 (分區號可以認為對應著一個要執行 Reduce Task 的節點),然後將<key,value,partition>
暫時保存在內存中的MapOutputBuffer內部的環形數據緩衝區
,該緩衝區的默認大小是100MB,可以通過參數io.sort.mb
來調整其大小
MapOutputBuffer 內部存數據的採用了兩個索引結構,涉及三個環形內存緩衝區:
-
kvoffsets緩衝區
:也叫偏移量索引數組,用於保存 key/value 信息在位置索引 kvindices 中的偏移量。當kvoffsets
的使用率超過mapreduce.map.sort.spill.percent (默認為80%)
後,便會觸發一次 SpillThread 線程的 “溢寫” 操作,也就是開始一次 Spill 階段的操作。 -
kvindices緩衝區
:也叫位置索引數組,用於保存 key/value 在數據緩衝區 kvbuffer 中的起始位置。 -
kvbuffer即數據緩衝區
:用於保存實際的 key/value 的值。默認情況下該緩衝區最多可以使用io.sort.mb
的 95%,當kvbuffer 使用率超過 mapreduce.map.sort.spill.percent (默認為80%)
後,便會出發一次 SpillThread 線程的 “溢寫” 操作,也就是開始一次 Spill 階段的操作
Spill 溢寫#
當 MapOutputBuffer 內部的kvoffsets緩衝區
和kvbuffer即數據緩衝區
達到閾值時觸發進行 spill 操作。把內存緩衝區中的數據寫入到本地磁碟,在寫入本地磁碟時先按照 partition、再按照 key 進行排序
- 這個 spill 操作是由另外的單獨線程來操作,不影響往緩衝區寫 map 結果的線程
- 在將數據寫入磁碟之前,先要對要寫入磁碟的數據進行一次排序操作,先按 <key,value,partition> 中的 partition 分區號排序,然後再按 key 排序,這個就是 sort 操作,最後溢出的的小文件是分區的,且同一個分區內是保證 key 有序的
Copy 階段#
默認情況下,當整個 MapReduce 作業的所有已執行完成的 Map Task 任務數超過 Map Task 總數的 5% 後,JobTracker 便會開始調度執行 Reduce Task 任務。然後 Reduce Task 任務默認啟動mapred.reduce.parallel.copies(默認為5)
個 MapOutputCopier 線程到已完成的 Map Task 任務節點上分別 copy 一份屬於自己的數據。 這些 copy 的數據會首先保存的內存緩衝區中,當內存緩衝區的使用率達到一定閾值後,則寫到磁碟上
補充
這個內存緩衝區大小的控制就不像 map 端的內存緩衝區那樣通過io.sort.mb
來設定了,而是通過另外一個參數來設置:mapred.job.shuffle.input.buffer.percent(default 0.7)
, 這個參數意思是:shuffile 在 reduce 內存中的數據最多使用內存量為:0.7 × maxHeap of reduce task。如果該 reduce task 的最大 heap 使用量,通常通過mapred.child.java.opts
來設置,比如設置為 - Xmx1024m,reduce 會使用其 **heapsize 的 70%** 來在內存中緩存數據。如果 reduce 的 heap 由於業務原因調整的比較大,相應的緩存大小也會變大,這也是為什麼 reduce 用來做緩存的參數是一個百分比,而不是一個固定的值了
Merge 過程#
Copy 過來的數據會先放入內存緩衝區中,這裡的緩衝區大小要比 map 端的更為靈活,它基於 JVM 的heap size
設置,因為 Shuffle 階段 Reducer 不運行,所以應該把絕大部分的內存都給 Shuffle 用
有三種形式:
- 內存到內存
- 內存到磁碟
- 磁碟到磁碟
默認情況下第一種形式是不啟用的。當內存中的數據量到達一定閾值,就啟動內存到磁碟的 merge(之所以進行 merge 是因為 reduce 端在從多個 map 端 copy 數據的時候,並沒有進行 sort,只是把它們加載到內存,當達到閾值寫入磁碟時,需要進行 merge)。這種 merge 方式一直在運行,直到沒有 map 端的數據時才結束,然後才會啟動第三種磁碟到磁碟的 merge 方式生成最終的文件
MR 調優思路#
數據傾斜
:數據到 Reduce 進行匯總,如果有一個 ReduceTask 中的數據過大,會導致整個程序的效率很低- Map 和 Reduce 任務數量設置不合理
- Map 運行時間過長,導致 Reduce 等待過久
小文件過多
:因無論文件多大,都会分配元數據信息,如果過多就導致浪費資源、存儲空間- MR 中有大量不可分塊的超大文件,在 shuffle 階段會不斷溢寫
- 多個溢寫的小文件,需要多級 Merge