Saturday, March 21, 2015

Hadoop Shuffle and Sort Phase

I will try to explain some points about the shuffle and sort phase of Hadoop MapReduce.

Everything below was tested with Hadoop 1.0.3 on Ubuntu 12.04.

1. Map-Reduce Mechanics

When writing MapReduce code, we implement map and reduce functions to define the logic of the data flow. The map function gets a key-value pair as input and emits new key-value pairs. The key-value pairs emitted from map tasks are then given to reduce tasks. The reduce function gets a key and a set of values and emits a key-value pair, finishing the processing for that data.
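
To make this concrete, below is a minimal word-count sketch using the new (org.apache.hadoop.mapreduce) API. The class and variable names are my own illustration of the idea, not code from any particular source:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

      // Map: gets (byte offset, line) pairs, emits (word, 1) pairs.
      public static class TokenMapper
          extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
              word.set(token);
              context.write(word, ONE);
            }
          }
        }
      }

      // Reduce: gets a word and all its counts, emits (word, total).
      public static class SumReducer
          extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable v : values) {
            sum += v.get();
          }
          context.write(key, new IntWritable(sum));
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

The mapper emits a (word, 1) pair per word; the framework groups all pairs with the same word and hands them to the reducer as (word, [1, 1, ...]).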

This is an efficient abstraction that hides many details of the processing done in the background. However, understanding the mechanics of the underlying process is key to grasping the full capability of MapReduce, and also key to optimizing the performance of MapReduce jobs.

I will focus on the shuffle and sort phase, and I will also give an example to clarify some points.

2. Shuffle And Sort

The shuffle phase covers the transformation of map outputs and their transfer to the reducers. The sort phase covers the merging and sorting of map outputs on the reducers. The figure below, taken from "Hadoop: The Definitive Guide", outlines this process.

Map Side

When the map function starts to produce key-value pairs, these values are not written directly to disk but are stored in memory. While in memory, they are divided into partitions corresponding to the reducers they will be sent to. If the number of reducers is set to 3 (by Job.setNumReduceTasks(3)), there will be 3 partitions.

Data coming into these partitions is then sorted by key. If a combiner function is specified, it is run on the sorted data. When the memory limit reserved for this operation is reached, the data is spilled to disk. While a spill is in progress, newly produced data can still go to memory. At the end of the map task there will be several spill files, which are then merged and sorted to produce the single output file of the map task. The default partitioning logic is sketched below.
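
Which partition a pair goes to is decided by the job's Partitioner. The default behaviour is a hash of the key modulo the number of reducers; here is a sketch that mimics it (the class name is mine):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Mimics the default HashPartitioner: the partition number, and hence the
    // target reducer, is the key's hash modulo the reducer count.
    public class KeyHashPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Mask the sign bit so the result is never negative.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }

With Job.setNumReduceTasks(3), numReduceTasks is 3 and every pair lands in partition 0, 1, or 2.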

To summarize, on the map side:

  • Map output is written to local disk as a single file
  • The map output file contains partitions that are ready to be copied to the reducers
  • Partitions in the map output are sorted by key
  • If a combiner function is specified, the combiner is run on the map output
  • If map output compression is enabled, the map output is compressed

Note: Using a combiner function and map output compression both improve performance, since they reduce the size of the data written to disk and transferred over the network to the reducers. A configuration sketch for both follows below.
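
Both can be enabled in the driver. The property names below are the ones I believe Hadoop 1.x uses for map output compression; treat them as assumptions to verify against your version:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;

    public class ShuffleTuning {
      public static Job configure() throws Exception {
        Configuration conf = new Configuration();

        // Compress map output before it is spilled to disk and shipped to reducers.
        conf.setBoolean("mapred.compress.map.output", true);
        conf.setClass("mapred.map.output.compression.codec",
            GzipCodec.class, CompressionCodec.class);

        Job job = new Job(conf, "shuffle demo");
        // Run the reducer logic as a combiner on each map's sorted output.
        job.setCombinerClass(WordCount.SumReducer.class); // reducer from the sketch above
        return job;
      }
    }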

The term shuffle & sort somewhat implies that the reducer side performs the sort; however, the initial sort is performed on the map side. There are multiple configuration parameters that govern this map-side process, some of which are sketched below. For further reading, I recommend Hadoop: The Definitive Guide, Chapter 6.
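
For reference, a sketch of the map-side buffer parameters as I understand them in Hadoop 1.x; the values set here are illustrative, and the defaults noted in the comments are assumptions to verify against your version:

    import org.apache.hadoop.conf.Configuration;

    public class MapSideTuning {
      public static Configuration tune() {
        Configuration conf = new Configuration();

        // Size in MB of the in-memory buffer that collects map output
        // (default 100).
        conf.setInt("io.sort.mb", 200);

        // Fraction of that buffer that must fill before a background spill
        // to disk starts (default 0.80).
        conf.setFloat("io.sort.spill.percent", 0.80f);

        // Maximum number of spill files merged at once when producing the
        // final map output file (default 10).
        conf.setInt("io.sort.factor", 10);

        return conf;
      }
    }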


Reduce Side

As soon as some map tasks have finished, the reducers start to copy their outputs. These outputs are copied into the reducer's memory if they are small enough, and otherwise written to disk. The reducer collects its partition's data from multiple map outputs and merges them as they become available, maintaining the sort order while merging. When all partitions have been collected and merged, the reduce function is called on the data.

When the reduce function finishes processing the merged data, it writes its output to HDFS. If the replication factor is larger than 1, the first replica is stored on the reducer's machine, assuming it has a Datanode daemon running.
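
This copy/merge phase has its own knobs. A sketch of the Hadoop 1.x property names I believe apply (again, the values are illustrative and the defaults are assumptions to verify):

    import org.apache.hadoop.conf.Configuration;

    public class ReduceSideTuning {
      public static Configuration tune() {
        Configuration conf = new Configuration();

        // Number of threads a reducer uses to copy map outputs in parallel
        // (default 5).
        conf.setInt("mapred.reduce.parallel.copies", 10);

        // Fraction of the reducer's heap used to buffer copied map outputs
        // before they are merged to disk (default 0.70).
        conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);

        // io.sort.factor also bounds how many streams are merged at once on
        // the reduce side (default 10).
        conf.setInt("io.sort.factor", 10);

        return conf;
      }
    }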

To summarize, on the reduce side:

  • Partitions from multiple map outputs are copied to memory and local disk
  • Copied partitions are merged, maintaining the sort order.
    • This can be thought of as the second sort of the data, given that the first sort is performed on the map side.
  • If the map and reduce functions use the same field as the key, the output of the reduce function will also be sorted, because its input data is sorted.
    • However, reduce outputs are not sorted globally; each reduce output has a sort order limited to its own data.
  • If reduce output compression is enabled, the reduce output is compressed (see the sketch after this list)
  • Reducer output is written to HDFS
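
Enabling reduce output compression is a one-liner in the driver; a sketch using the new-API FileOutputFormat helpers:

    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class OutputCompression {
      public static void enable(Job job) {
        // Compress the files the reducers write to HDFS.
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
      }
    }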

3. Example

I will list some implications through an example:
We have a MapReduce job that takes a 100 MB file as input, and we set the number of reducers to 5. The HDFS block size and the input split size for the job are 64 MB.

Implications

On the map side:
  • We will have 2 map output files written to disk, since the 100 MB input is divided into two input splits (64 MB + 36 MB) and each split is processed by one map task.
  • The location on disk is determined by mapred.local.dir. If mapred.local.dir equals /app/hadoop, one map output file may be "/app/hadoop/mapred/local/taskTracker/isa/jobcache/job_201503211800_0022/attempt_201503211800_0022_m_000000_0/output/file.out"
  • Each output file can contain up to 5 partitions, which we can observe by opening the file.
  • Each partition in the output file will contain data sorted by key.
On the reduce side:
  • We will have 5 output files in HDFS
  • Each output file will have its own keys sorted
    • This shows that multiple map outputs are merged maintaining the sort order.
  • There is no global sort order across the whole output (although this can be achieved; see the sketch below)
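
For completeness, a rough sketch of how a global sort order could be achieved with the TotalOrderPartitioner and InputSampler that ship in Hadoop 1.x under the old (org.apache.hadoop.mapred) API. The sampler parameters, the partition file path, and the Text/NullWritable key-value types are illustrative assumptions:

    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.InputSampler;
    import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

    public class GlobalSort {
      public static void configure(JobConf conf) throws Exception {
        // Route keys by range instead of by hash, so that all keys in
        // reducer i sort before all keys in reducer i+1.
        conf.setPartitionerClass(TotalOrderPartitioner.class);

        // The partition file path must be set before it is written.
        Path partitionFile = new Path("/tmp/_partitions.lst"); // illustrative
        TotalOrderPartitioner.setPartitionFile(conf, partitionFile);

        // Sample the input keys to pick balanced partition boundaries.
        InputSampler.Sampler<Text, NullWritable> sampler =
            new InputSampler.RandomSampler<Text, NullWritable>(0.1, 10000, 10);
        InputSampler.writePartitionFile(conf, sampler);

        // Ship the partition file to every task via the distributed cache.
        DistributedCache.addCacheFile(partitionFile.toUri(), conf);
      }
    }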

Further Reading

Tom White, Hadoop: The Definitive Guide, Chapter 6