Hadoop Series - MapReduce#
The main idea of MapReduce is divide and conquer: a large computation is automatically split into a Map (mapping) phase and a Reduce (reduction) phase.
Process Overview#
input --> map --> shuffle --> reduce --> output
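The pipeline above can be sketched in memory with a word count. This is a minimal illustration in plain Java, not Hadoop code: the class `MiniWordCount` and its methods are hypothetical names, and the "shuffle" is reduced to grouping values by key in a sorted map.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class MiniWordCount {
    // Map: emit a (word, 1) pair for every word in one input line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Shuffle: group all values by key; TreeMap keeps the keys sorted.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce: sum the values collected for one key.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // input --> map --> shuffle --> reduce --> output
    static Map<String, Integer> run(List<String> input) {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : input) {
            mapped.addAll(map(line));
        }
        Map<String, Integer> output = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(mapped).entrySet()) {
            output.put(group.getKey(), reduce(group.getValue()));
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a b a", "b c")));
    }
}
```

In a real job the map and reduce steps run as separate tasks on different nodes, and the shuffle moves data between them over the network.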
Map Side Process#
- Map: A custom Map function processes the input data.
- Partition: Decides which reduce task receives each map output record. The total number of partitions equals the number of reducers. By default (HashPartitioner), the key is hashed and the result is taken modulo the number of reduce tasks, which selects the target reduce task. A custom partitioner can be set through `job.setPartitionerClass(MyPartition.class)`.
- Sort: Records of the form `<key,value,partition>` are sorted first by partition number and then by key. As a result, each small spill file is partitioned, and keys are ordered within the same partition.
- Combine: Aggregates map output locally in advance to reduce the data transferred from the Map side to the Reduce side. It runs only if the developer sets it in the program through `job.setCombinerClass(myCombine.class)`. It may occur in two stages: (1) after the map output has been sorted by partition and before it is written to a spill file; (2) during the merge, if the map output is large and the number of spill files reaches the configured minimum (3 by default, configurable through the property `min.num.spills.for.combine`).
- Merge: When the map output is large, each spill produces a spill_file, so there may be several of them. After the Map Task has processed all of its data, it merges all intermediate spill files so that it ultimately outputs only one intermediate data file.
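The default hash-then-modulus rule from the Partition step can be sketched in plain Java. The class and method names below are hypothetical and the sketch has no Hadoop dependency; it only mirrors the arithmetic, masking the sign bit so that a negative hash code never yields a negative partition number.

```java
class HashPartitionSketch {
    // Hash the key, clear the sign bit, then take the remainder by the
    // number of reduce tasks. The result is always in [0, numReduceTasks).
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 3; // assumed number of reduce tasks for the demo
        for (String key : new String[] {"apple", "banana", "cherry"}) {
            System.out.println(key + " -> partition " + partitionFor(key, reducers));
        }
    }
}
```

Because the partition depends only on the key, every record with the same key reaches the same reduce task, no matter which map task produced it.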
Reduce Side Process#
- Copy: The Reducer node downloads the data belonging to its partition from each mapper node; data from the same partition will land on the same node.
- Merge: The Reducer merges the data pulled from different Mapper nodes into one file.
- Reduce: Apply the custom Reduce function to the merged data, for example to compute aggregate statistics.
Submitting MR Jobs to Yarn Process#
- The client starts a RunJar process and asks the RM (ResourceManager) to run a job.
- RM returns the job ID it generated for this job and the staging directory (staging-dir) to which the job's resources should be submitted.
- The client submits the job-related resources to the corresponding shared file system path (/yarn-staging/jobID).
- The client reports the submission result to RM.
- RM adds the job to the task queue.
- NM obtains new tasks from RM's task queue through heartbeat connection with RM.
- RM allocates a container on that NM as the running resource.
- RM has the MRAppMaster process started in that container.
- The MRAppMaster decides which NodeManagers run the map and reduce tasks (each task runs in a YarnChild process).
- The NM running the map and reduce tasks retrieves job-related resources from the shared file system, including jar files, configuration files, etc., and then runs the map and reduce tasks.
- After the job execution is completed, MRAppMaster unregisters itself from RM and releases resources.
Shuffle Mechanism#
The entire process from Map output to Reduce input can be broadly referred to as Shuffle. Shuffle spans both sides: it includes the Partition and Spill steps on the Map side and the Copy and Merge steps on the Reduce side.
Circular Memory Buffer#
As its last step, the map() method outputs intermediate results through `OutputCollector.collect(key,value)` or `context.write(key,value)`. The collect(key,value) method calls `Partitioner.getPartition(K2 key, V2 value, int numPartitions)` to obtain the partition number of the output key/value pair (the partition number can be thought of as identifying the node where a Reduce Task will run). The resulting `<key,value,partition>` record is then held temporarily in the MapOutputBuffer's in-memory circular data buffer, which has a default size of 100MB and can be adjusted via the parameter `io.sort.mb`.
The data stored in the MapOutputBuffer uses two index structures, involving three circular memory buffers:
- `kvoffsets` buffer: also known as the offset index array, used to save the offset of key/value information in the position index kvindices. When the usage of `kvoffsets` exceeds `mapreduce.map.sort.spill.percent` (default 80%), the SpillThread is triggered and a Spill phase begins.
- `kvindices` buffer: also known as the position index array, used to save the starting position of each key/value in the data buffer kvbuffer.
- `kvbuffer`, the data buffer: used to store the actual key/value data. By default, this buffer can use up to 95% of `io.sort.mb`. When the usage of `kvbuffer` exceeds `mapreduce.map.sort.spill.percent` (default 80%), the SpillThread is triggered and a Spill phase begins.
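To make the default figures concrete, the following sketch works through the arithmetic for the data buffer: 95% of a 100MB sort buffer, and the 80% fill level at which a spill starts. The class and method names are hypothetical, and the percentages are passed in as plain integers rather than read from a Hadoop configuration.

```java
class SortBufferMath {
    // Bytes available to the key/value data buffer, given the total sort
    // buffer size in MB and the share reserved for data, in percent.
    static long dataBufferBytes(long sortMb, int dataPercent) {
        return sortMb * 1024L * 1024L * dataPercent / 100;
    }

    // Fill level, in bytes, above which a spill would be triggered.
    static long spillThresholdBytes(long bufferBytes, int spillPercent) {
        return bufferBytes * spillPercent / 100;
    }

    public static void main(String[] args) {
        long dataBuffer = dataBufferBytes(100, 95);            // defaults from the text
        long threshold = spillThresholdBytes(dataBuffer, 80);  // default spill percent
        System.out.println("data buffer:     " + dataBuffer + " bytes");
        System.out.println("spill threshold: " + threshold + " bytes");
    }
}
```

With the defaults, the data buffer holds 99,614,720 bytes (95MB) and a spill begins once more than 79,691,776 bytes (76MB) are in use.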
Spill#
When the kvoffsets buffer or the kvbuffer data buffer in the MapOutputBuffer reaches its threshold, a spill operation is triggered. It writes the data in the memory buffer to the local disk, sorted first by partition and then by key.
- The spill is performed by a separate thread, so it does not block the thread that writes map results into the buffer.
- Before the data is written to disk, it is sorted first by the partition number in `<key,value,partition>` and then by key. The resulting small spill files are therefore partitioned, with keys ordered within the same partition.
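The two-level ordering applied before a spill can be sketched with a chained comparator. This is an illustrative, in-memory version only: `SpillSortSketch` and its `Record` class are hypothetical names standing in for the `<key,value,partition>` records held in the buffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SpillSortSketch {
    // One buffered record: the <key,value,partition> triple.
    static final class Record {
        final String key;
        final int value;
        final int partition;

        Record(String key, int value, int partition) {
            this.key = key;
            this.value = value;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return "<" + key + "," + value + "," + partition + ">";
        }
    }

    // Returns a copy ordered by partition number first, then by key.
    static List<Record> sortForSpill(List<Record> buffered) {
        List<Record> sorted = new ArrayList<>(buffered);
        sorted.sort(Comparator.comparingInt((Record r) -> r.partition)
                .thenComparing(r -> r.key));
        return sorted;
    }

    public static void main(String[] args) {
        List<Record> buffered = Arrays.asList(
                new Record("pear", 1, 1),
                new Record("apple", 1, 1),
                new Record("zebra", 1, 0));
        System.out.println(sortForSpill(buffered));
    }
}
```

Here "zebra" comes first because it belongs to partition 0, while "apple" and "pear" follow in key order inside partition 1.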
Copy Phase#
By default, once the number of completed Map Tasks exceeds 5% of the total number of Map Tasks in the job, the JobTracker starts scheduling Reduce Tasks. Each Reduce Task then starts `mapred.reduce.parallel.copies` (default 5) MapOutputCopier threads, which copy its share of the data from the nodes of the completed Map Tasks. The copied data is first held in a memory buffer and is written to disk when the buffer usage reaches a certain threshold.
Supplement
Unlike the map-side buffer, the size of this memory buffer is not set through `io.sort.mb` but through another parameter: `mapred.job.shuffle.input.buffer.percent` (default 0.7). It means that the shuffle on the reduce side can use at most 0.7 × the maximum heap of the reduce task. The maximum heap of a reduce task is typically set through `mapred.child.java.opts`; with -Xmx1024m, for example, the reduce task uses 70% of its heap to cache data in memory. If the reduce task's heap is enlarged for business reasons, the cache grows with it, which is why this parameter is a percentage rather than a fixed value.
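The -Xmx1024m example works out as follows. This is a small arithmetic sketch with hypothetical names; the percentage is passed as an integer (70 for the default 0.7) so the calculation stays in exact integer math.

```java
class ShuffleBufferMath {
    // Upper bound on the in-memory shuffle buffer: heap size times the
    // buffer percentage, rounded down to whole bytes.
    static long shuffleBufferBytes(long maxHeapBytes, int bufferPercent) {
        return maxHeapBytes * bufferPercent / 100;
    }

    public static void main(String[] args) {
        long heap1g = 1024L * 1024L * 1024L; // the -Xmx1024m example
        long heap2g = 2L * heap1g;           // a larger heap, same percentage
        System.out.println("1024 MB heap -> " + shuffleBufferBytes(heap1g, 70) + " bytes");
        System.out.println("2048 MB heap -> " + shuffleBufferBytes(heap2g, 70) + " bytes");
    }
}
```

Doubling the heap doubles the buffer without touching the parameter, which is the benefit of expressing it as a percentage.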
Merge Process#
The copied data is first placed in the memory buffer, whose size is more flexible than that of the map-side buffer because it is based on the JVM's heap size settings. Since the Reduce function does not run during the Shuffle phase, most of the memory can be allocated to Shuffle.
The merge takes three forms:
- Memory to memory
- Memory to disk
- Disk to disk
By default, the first form is not enabled. When the amount of data in memory reaches a certain threshold, memory to disk merge is initiated (the reason for merging is that the reduce side does not sort the data when copying from multiple map sides; it simply loads them into memory, and when the threshold is reached for writing to disk, a merge is needed). This merge method continues to run until there is no data from the map side, at which point the third disk to disk merge method is initiated to generate the final file.
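Merging several runs into one ordered output is commonly done as a k-way merge. The sketch below assumes each input run is already sorted by key, as the map side guarantees within a partition; it is a plain-Java illustration with hypothetical names, not Hadoop's own merger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class KWayMergeSketch {
    // Merges runs that are each sorted by key into one sorted list.
    static List<String> merge(List<List<String>> sortedRuns) {
        // Heap entries are {runIndex, positionInRun}, ordered by the key
        // they currently point at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
                Comparator.comparing((int[] e) -> sortedRuns.get(e[0]).get(e[1])));
        for (int i = 0; i < sortedRuns.size(); i++) {
            if (!sortedRuns.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] entry = heap.poll();
            List<String> run = sortedRuns.get(entry[0]);
            merged.add(run.get(entry[1]));
            // Advance within the same run, if it has more keys.
            if (entry[1] + 1 < run.size()) {
                heap.add(new int[] {entry[0], entry[1] + 1});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> runs = Arrays.asList(
                Arrays.asList("a", "d"),
                Arrays.asList("b", "c"),
                Arrays.asList("e"));
        System.out.println(merge(runs));
    }
}
```

The heap holds at most one entry per run, so memory use depends on the number of runs rather than on the total amount of data being merged.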
MR Tuning Ideas#
- Data skew: Data is aggregated at the Reduce side; if one ReduceTask receives too much data, it can significantly lower the efficiency of the entire program.
- Improper settings for the number of Map and Reduce tasks.
- Map running time is too long, causing Reduce to wait too long.
- Too many small files: Metadata is allocated for every file regardless of its size, so too many small files waste resources and storage space.
- Many large files that cannot be split, causing continuous spills during the shuffle phase.
- Multiple small spill files that require multi-level merging.
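Data skew can be made visible by counting how many records each reduce task would receive. The sketch below is a plain-Java illustration with hypothetical names; it assumes hash-then-modulus partitioning and a made-up key list in which one key ("hot") dominates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SkewCheckSketch {
    // Counts the records that hash partitioning would send to each reduce task.
    static int[] recordsPerPartition(List<String> keys, int numReduceTasks) {
        int[] counts = new int[numReduceTasks];
        for (String key : keys) {
            counts[(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical input: one key appears eight times, four keys once each.
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            keys.add("hot");
        }
        keys.addAll(Arrays.asList("a", "b", "c", "d"));
        System.out.println(Arrays.toString(recordsPerPartition(keys, 3)));
    }
}
```

All records for the dominant key land in a single partition, so that reduce task receives far more data than the others and determines the running time of the whole job.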