Running the combiner function makes for a more compact map output, so there is less data to write to local disk and to transfer to the reducer.

Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record, there could be several spill files. Before the task is finished, the spill files are merged into a single partitioned and sorted output file. The configuration property mapreduce.task.io.sort.factor controls the maximum number of streams to merge at once; the default is 10.

If there are at least three spill files (set by the mapreduce.map.combine.minspills property), the combiner is run again before the output file is written. Recall that combiners may be run repeatedly over the input without affecting the final result. If there are only one or two spills, the potential reduction in map output size is not worth the overhead in invoking the combiner, so it is not run again for this map output.
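Both properties can be set per job in the driver. The following is a minimal sketch; the class name and the values chosen are illustrative only, not recommendations:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class MapSideSpillTuning {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Merge up to 20 spill streams at once instead of the default 10
            conf.setInt("mapreduce.task.io.sort.factor", 20);
            // Rerun the combiner before writing the output file only if there
            // are at least this many spill files (3 is the default)
            conf.setInt("mapreduce.map.combine.minspills", 3);
            Job job = Job.getInstance(conf, "map-side spill tuning sketch");
            // ... set the mapper, combiner, reducer, and input/output paths as usual
        }
    }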
It is often a good idea to compress the map output as it is written to disk, because doing so makes it faster to write to disk, saves disk space, and reduces the amount of data to transfer to the reducer. By default, the output is not compressed, but it is easy to enable this by setting mapreduce.map.output.compress to true. The compression library to use is specified by mapreduce.map.output.compress.codec; see “Compression” on page 100 for more on compression formats.
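For example, enabling map output compression in the driver might look like the following sketch. Snappy is just one possible codec choice (an assumption here, not something this section prescribes) and requires the native library to be installed:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapreduce.Job;

    public class CompressedMapOutput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Compress map output as it is written to local disk and shuffled
            conf.setBoolean("mapreduce.map.output.compress", true);
            // Use Snappy for the map output (assumes the native library is available)
            conf.setClass("mapreduce.map.output.compress.codec",
                          SnappyCodec.class, CompressionCodec.class);
            Job job = Job.getInstance(conf, "compressed map output sketch");
            // ... configure the mapper, reducer, and paths as usual
        }
    }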
The output file’s partitions are made available to the reducers over HTTP. The maximum number of worker threads used to serve the file partitions is controlled by the mapreduce.shuffle.max.threads property; this setting is per node manager, not per map task. The default of 0 sets the maximum number of threads to twice the number of processors on the machine.

The Reduce Side

Let’s turn now to the reduce part of the process. The map output file is sitting on the local disk of the machine that ran the map task (note that although map outputs always get written to local disk, reduce outputs may not be), but now it is needed by the machine that is about to run the reduce task for the partition. Moreover, the reduce task needs the map output for its particular partition from several map tasks across the cluster. The map tasks may finish at different times, so the reduce task starts copying their outputs as soon as each completes. This is known as the copy phase of the reduce task. The reduce task has a small number of copier threads so that it can fetch map outputs in parallel. The default is five threads, but this number can be changed by setting the mapreduce.reduce.shuffle.parallelcopies property.
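A per-job override of the copier thread count might look like the following minimal sketch (the value of 10 is illustrative only); mapreduce.shuffle.max.threads, by contrast, is a node manager setting and cannot be changed per job:

    import org.apache.hadoop.conf.Configuration;

    public class CopierThreadTuning {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Fetch map outputs with 10 copier threads instead of the default 5
            conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);
            // ... pass conf to Job.getInstance() as in the earlier sketches
        }
    }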
How do reducers know which machines to fetch map output from? As map tasks complete successfully, they notify their application master using the heartbeat mechanism. Therefore, for a given job, the application master knows the mapping between map outputs and hosts. A thread in the reducer periodically asks the master for map output hosts until it has retrieved them all.

Hosts do not delete map outputs from disk as soon as the first reducer has retrieved them, as the reducer may subsequently fail. Instead, they wait until they are told to delete them by the application master, which is after the job has completed.

Map outputs are copied to the reduce task JVM’s memory if they are small enough (the buffer’s size is controlled by mapreduce.reduce.shuffle.input.buffer.percent, which specifies the proportion of the heap to use for this purpose); otherwise, they are copied to disk. When the in-memory buffer reaches a threshold size (controlled by mapreduce.reduce.shuffle.merge.percent) or reaches a threshold number of map outputs (mapreduce.reduce.merge.inmem.threshold), it is merged and spilled to disk. If a combiner is specified, it will be run during the merge to reduce the amount of data written to disk.
As the copies accumulate on disk, a background thread merges them into larger, sorted files. This saves some time merging later on. Note that any map outputs that were compressed (by the map task) have to be decompressed in memory in order to perform a merge on them.
When all the map outputs have been copied, the reduce task moves into the sort phase (which should properly be called the merge phase, as the sorting was carried out on the map side), which merges the map outputs, maintaining their sort ordering. This is done in rounds. For example, if there were 50 map outputs and the merge factor was 10 (the default, controlled by the mapreduce.task.io.sort.factor property, just like in the map’s merge), there would be five rounds. Each round would merge 10 files into 1, so at the end there would be 5 intermediate files.

Rather than have a final round that merges these five files into a single sorted file, the merge saves a trip to disk by directly feeding the reduce function in what is the last phase: the reduce phase. This final merge can come from a mixture of in-memory and on-disk segments.
The number of files merged in each round is actually more subtle than this example suggests. The goal is to merge the minimum number of files to get to the merge factor for the final round. So if there were 40 files, the merge would not merge 10 files in each of the four rounds to get 4 files. Instead, the first round would merge only 4 files, and the subsequent three rounds would merge the full 10 files. The 4 merged files and the 6 (as yet unmerged) files make a total of 10 files for the final round. The process is illustrated in Figure 7-5.

Note that this does not change the number of rounds; it’s just an optimization to minimize the amount of data that is written to disk, since the final round always merges directly into the reduce.
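The arithmetic is easy to simulate. The following is a minimal sketch of the idea just described; it reproduces the 40-file example but is an illustration of the planning rule, not Hadoop's actual merge code:

    // Merge just enough files in the first round so that every later intermediate
    // round can merge a full `factor` files and exactly `factor` segments remain
    // for the final, reduce-feeding round.
    public class MergePlan {

        static int firstRoundSize(int numFiles, int factor) {
            if (numFiles <= factor || factor <= 1) {
                return numFiles;                    // a single, final round suffices
            }
            int mod = (numFiles - 1) % (factor - 1);
            return mod == 0 ? factor : mod + 1;     // e.g. 40 files, factor 10 -> 4
        }

        public static void main(String[] args) {
            int remaining = 40;                     // number of map output segments
            int factor = 10;                        // mapreduce.task.io.sort.factor
            int round = 1;
            while (remaining > factor) {
                int merge = (round == 1) ? firstRoundSize(remaining, factor) : factor;
                remaining = remaining - merge + 1;  // k files collapse into 1 segment
                System.out.printf("round %d: merged %d files, %d segments left%n",
                                  round++, merge, remaining);
            }
            System.out.printf("final round: %d segments fed directly to the reduce%n",
                              remaining);
        }
    }

Running this prints a first round that merges 4 files, three rounds that merge 10, and a final round fed with exactly 10 segments, matching the description above.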
Figure 7-5. Efficiently merging 40 file segments with a merge factor of 10

During the reduce phase, the reduce function is invoked for each key in the sorted output. The output of this phase is written directly to the output filesystem, typically HDFS. In the case of HDFS, because the node manager is also running a datanode, the first block replica will be written to the local disk.

Configuration Tuning

We are now in a better position to understand how to tune the shuffle to improve MapReduce performance. The relevant settings, which can be used on a per-job basis (except where noted), are summarized in Tables 7-1 and 7-2, along with the defaults, which are good for general-purpose jobs.
The general principle is to give the shuffle as much memory as possible. However, there is a trade-off, in that you need to make sure that your map and reduce functions get enough memory to operate. This is why it is best to write your map and reduce functions to use as little memory as possible; certainly, they should not use an unbounded amount of memory (avoid accumulating values in a map, for example).

The amount of memory given to the JVMs in which the map and reduce tasks run is set by the mapred.child.java.opts property. You should try to make this as large as possible for the amount of memory on your task nodes; the discussion in “Memory settings in YARN and MapReduce” on page 301 goes through the constraints to consider.
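As a sketch, the property can be set per job as follows; the heap size shown is purely illustrative and must fit within the container sizes discussed in that section:

    import org.apache.hadoop.conf.Configuration;

    public class TaskHeapTuning {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Give each task JVM a 1,024 MB heap (illustrative value only)
            conf.set("mapred.child.java.opts", "-Xmx1024m");
            // ... pass conf to Job.getInstance() as in the earlier sketches
        }
    }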
On the map side, the best performance can be obtained by avoiding multiple spills to disk; one is optimal. If you can estimate the size of your map outputs, you can set the mapreduce.task.io.sort.* properties appropriately to minimize the number of spills. In particular, you should increase mapreduce.task.io.sort.mb if you can. There is a MapReduce counter (SPILLED_RECORDS; see “Counters” on page 247) that counts the total number of records that were spilled to disk over the course of a job, which can be useful for tuning. Note that the counter includes both map- and reduce-side spills.
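A driver can read this counter once the job finishes. The following sketch (the class and method names are made up for illustration) compares it against the map output record count; because SPILLED_RECORDS covers both map- and reduce-side spills, this is only a rough signal, but a value well above the map output count suggests records are being spilled more than once:

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCounter;

    public class SpillReport {
        // Print the spill counters for a job that has already completed
        static void printSpillCounters(Job job) throws Exception {
            Counters counters = job.getCounters();
            long spilled = counters.findCounter(TaskCounter.SPILLED_RECORDS).getValue();
            long mapOutput = counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
            System.out.printf("Spilled records: %d (map output records: %d)%n",
                              spilled, mapOutput);
        }
    }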
On the reduce side, the best performance is obtained when the intermediate data can reside entirely in memory. This does not happen by default, since for the general case all the memory is reserved for the reduce function. But if your reduce function has light memory requirements, setting mapreduce.reduce.merge.inmem.threshold to 0 and mapreduce.reduce.input.buffer.percent to 1.0 (or a lower value; see Table 7-2) may bring a performance boost.

In April 2008, Hadoop won the general-purpose terabyte sort benchmark (as discussed in “A Brief History of Apache Hadoop” on page 12), and one of the optimizations used was keeping the intermediate data in memory on the reduce side.

More generally, Hadoop uses a buffer size of 4 KB by default, which is low, so you should increase this across the cluster (by setting io.file.buffer.size; see also “Other Hadoop Properties” on page 307).
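A minimal sketch of the reduce-side advice above, assuming a reduce function with light memory requirements (the values mirror the suggestion in the text; io.file.buffer.size, by contrast, is a cluster-wide setting rather than a per-job one):

    import org.apache.hadoop.conf.Configuration;

    public class ReduceSideMemoryTuning {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Never force a merge-and-spill based on the number of retained map outputs...
            conf.setInt("mapreduce.reduce.merge.inmem.threshold", 0);
            // ...and let retained map outputs occupy the whole heap during the reduce
            conf.setFloat("mapreduce.reduce.input.buffer.percent", 1.0f);
            // ... pass conf to Job.getInstance() as in the earlier sketches
        }
    }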
Table 7-1. Map-side tuning properties

Property name                      Type   Default value  Description
mapreduce.task.io.sort.mb          int    100            The size, in megabytes, of the memory buffer to
                                                         use while sorting map output.
mapreduce.map.sort.spill.percent   float  0.80           The threshold usage proportion for both the map
                                                         output memory buffer and the record boundaries
                                                         index to start the process of spilling to disk.
mapreduce.task.io.sort.factor      int    10             The maximum number of streams to merge at once
                                                         when sorting files.