Hadoop 系列 - MapReduce#
MapReduce 的主要思想:自动将一个大的计算程序拆分成 Map(映射)和 Reduce(化简), 分冶思想
流程概括#
input --> map --> shuffle -->reduce --->output
Map 端流程#
- Map: 自定义 Map 函数处理数据
- Partition: 为了将 map 的结果发送到相应的 reduce 端。总的 partition 的数目等于 reducer 的数量。具体实现为:对 key 进行 hash 后,再以 reduce task 数量取模,然后到指定的 job 上(默认HashPartitioner,可以通过
job.setPartitionerClass(MyPartition.class)
自定义) - Sort: 先按
<key,value,partition>
中的 partition 分区号排序,然后再按 key 排序,这个就是sort 操作,最后溢出的小文件是分区的,且同一个分区内是保证 key 有序的 - Combine: 提前进行统计,进行局部合并,减少 Map 端到 Reduce 端的数据传输消耗。要求开发者必须在程序中设置了 combine(程序中通过
job.setCombinerClass(myCombine.class)
自定义 combine 操作). 在两个阶段可能会发生: (1) map 输出数据根据分区排序完成后,在写入文件之前会执行一次 combine 操作,当然前提是作业中设置了这个操作 (2) 如果 map 输出比较大,溢出文件个数大于 3(此值可以通过属性min.num.spills.for.combine
配置)时,在 merge 的过程中还会执行 combine 操作. - Merge: 当 map 很大时,每次溢写会产生一个 spill_file,这样会有多个 spill_file, 这个时候就需要进行 merge 合并操作,最终一个 MapTask 只输出一个文件。也即是,待 Map Task 任务的所有数据都处理完后,会对任务产生的所有中间数据文件做一次合并操作,以确保一个 Map Task 最终只生成一个中间数据文件
Reduce 端流程#
- Copy: Reducer 节点向各个 mapper 节点下载属于自己分区的数据,相同 Partition 的数据会落到同一节点
- Merge将不同 Mapper 节点上拉取的数据进行合并成一个文件
- Reduce: 对数据进行统计
提交 MR 作业到 Yarn 流程#
1. 产生RunJar进程,客户端向RM申请执行一个job
2. RM返回job相关的资源提交的路径staging-dir和本job产生的job ID
3. 客户端会将job相关资源提交到相应的共享文件系统的路径下(/yarn-staging/jobID)
4. 客户端向RM汇报提交结果
5. RM将job加入任务队列
6. NM通过与RM的心跳连接,从RM的任务队列中获取新的任务
7. RM 向 NM 分配运行资源容器container
8. RM在容器中启用 MRAppMaster 进程
9. 由创建的 MRAppmaster 负责分配在哪些NodeManager上运行map(即yarnchild进程)和reduce任务
10. 运行map和reduce任务的NM从共享文件系统中获取job相关资源,包括jar文件,配置文件等。然后运行map和reduce任务
11. job执行完成后, MRAppMaster向RM注销自己,释放资源
Shuffle 机制#
从 Map 输出到 Reduce 输入的整个过程可以广义的称为 Shuffle。Shuffle 横跨 Map 端和 Reduce 端,在 Map 端包括 Partition 分区和 Spill 分割过程,在 Reduce 端包括 copy 复制和 Merge 过程。
环形内存缓冲区#
在 map 方法 () 中,最后一步通过OutputCollector.collect(key,value)
或context.write(key,value)
输出中间处理结果,在相关的collect(key,value)
方法中,会调用Partitioner.getPartition(K2 key, V2 value, int numPartitions)
方法获得输出的 key/value 对应的分区号 (分区号可以认为对应着一个要执行 Reduce Task 的节点),然后将<key,value,partition>
暂时保存在内存中的MapOutputBuffer内部的环形数据缓冲区
,该缓冲区的默认大小是100MB,可以通过参数io.sort.mb
来调整其大小
MapOutputBuffer 内部存数的数据采用了两个索引结构,涉及三个环形内存缓冲区:
-
kvoffsets缓冲区
:也叫偏移量索引数组,用于保存 key/value 信息在位置索引 kvindices 中的偏移量。当kvoffsets
的使用率超过mapreduce.map.sort.spill.percent (默认为80%)
后,便会触发一次 SpillThread 线程的 “溢写” 操作,也就是开始一次 Spill 阶段的操作。 -
kvindices缓冲区
:也叫位置索引数组,用于保存 key/value 在数据缓冲区 kvbuffer 中的起始位置。 -
kvbuffer即数据缓冲区
:用于保存实际的 key/value 的值。默认情况下该缓冲区最多可以使用io.sort.mb
的 95%,当kvbuffer 使用率超过 mapreduce.map.sort.spill.percent (默认为80%)
后,便会出发一次 SpillThread 线程的 “溢写” 操作,也就是开始一次 Spill 阶段的操作
Spill 溢写#
当 MapOutputBuffer 内部的kvoffsets缓冲区
和kvbuffer即数据缓冲区
达到阈值时触发进行 spill 操作。把内存缓冲区中的数据写入到本地磁盘,在写入本地磁盘时先按照 partition、再按照 key 进行排序
- 这个 spill 操作是由另外的单独线程来操作,不影响往缓冲区写 map 结果的线程
- 在将数据写入磁盘之前,先要对要写入磁盘的数据进行一次排序操作,先按 <key,value,partition> 中的 partition 分区号排序,然后再按 key 排序,这个就是 sort 操作,最后溢出的小文件是分区的,且同一个分区内是保证 key 有序的
Copy 阶段#
默认情况下,当整个 MapReduce 作业的所有已执行完成的 Map Task 任务数超过 Map Task 总数的 5% 后,JobTracker 便会开始调度执行 Reduce Task 任务。然后 Reduce Task 任务默认启动mapred.reduce.parallel.copies(默认为5)
个 MapOutputCopier 线程到已完成的 Map Task 任务节点上分别 copy 一份属于自己的数据。 这些 copy 的数据会首先保存的内存缓冲区中,当内存缓冲区的使用率达到一定阀值后,则写到磁盘上
补充
这个内存缓冲区大小的控制就不像 map 端的内存缓冲区那样通过io.sort.mb
来设定了,而是通过另外一个参数来设置:mapred.job.shuffle.input.buffer.percent(default 0.7)
, 这个参数意思是:shuffile 在 reduce 内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。如果该 reduce task 的最大 heap 使用量,通常通过mapred.child.java.opts
来设置,比如设置为 - Xmx1024m,reduce 会使用其 **heapsize 的 70%** 来在内存中缓存数据。如果 reduce 的 heap 由于业务原因调整的比较大,相应的缓存大小也会变大,这也是为什么 reduce 用来做缓存的参数是一个百分比,而不是一个固定的值了
Merge 过程#
Copy 过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比 map 端的更为灵活,它基于 JVM 的heap size
设置,因为 Shuffle 阶段 Reducer 不运行,所以应该把绝大部分的内存都给 Shuffle 用
有三种形式:
- 内存到内存
- 内存到磁盘
- 磁盘到磁盘
默认情况下第一种形式是不启用的。当内存中的数据量到达一定阈值,就启动内存到磁盘的 merge(之所以进行 merge 是因为 reduce 端在从多个 map 端 copy 数据的时候,并没有进行 sort,只是把它们加载到内存,当达到阈值写入磁盘时,需要进行 merge)。这种 merge 方式一直在运行,直到没有 map 端的数据时才结束,然后才会启动第三种磁盘到磁盘的 merge 方式生成最终的文件
MR 调优思路#
数据倾斜
:数据到 Reduce 进行汇总,如果有一个 ReduceTask 中的数据过大,会导致整个程序的效率很低- Map 和 Reduce 任务数量设置不合理
- Map 运行时间过长,导致 Reduce 等待过久
小文件过多
:因无论文件多大,都会分配元数据信息,如果过多就导致浪费资源、存储空间- MR 中有大量不可分块的超大文件,在 shuffle 阶段会不断溢写
- 多个溢写的小文件,需要多级 Merge