As explained in Chapter 3, for each HDFS block of the reduce output, the first replica is stored on the local node, with other replicas being stored on off-rack nodes for reliability. Thus, writing the reduce output does consume network bandwidth, but only as much as a normal HDFS write pipeline consumes.

The whole data flow with a single reduce task is illustrated in Figure 2-3.
The dotted boxes indicate nodes, the dotted arrows show data transfers on a node, and the solid arrows show data transfers between nodes.

Figure 2-3. MapReduce data flow with a single reduce task

The number of reduce tasks is not governed by the size of the input, but instead is specified independently. In "The Default MapReduce Job" on page 214, you will see how to choose the number of reduce tasks for a given job.

When there are multiple reducers, the map tasks partition their output, each creating one partition for each reduce task. There can be many keys (and their associated values) in each partition, but the records for any given key are all in a single partition. The partitioning can be controlled by a user-defined partitioning function, but normally the default partitioner, which buckets keys using a hash function, works very well.
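As a concrete illustration (not an example from the book), a user-defined partitioner for the temperature job might look like the following sketch. The class name is made up, and the body simply reproduces the hash-bucketing behavior just described, which is essentially what the default partitioner does:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative partitioner: buckets map output keys by hash code, so all
// records for a given key land in the same partition (and reduce task).
public class YearPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // Mask off the sign bit so the result is non-negative, then take the
    // remainder modulo the number of partitions (one per reduce task).
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

A job would opt in to it with job.setPartitionerClass(YearPartitioner.class); in practice you only write your own partitioner when you need placement behavior the default hash partitioner doesn't give you.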
The data flow for the general case of multiple reduce tasks is illustrated in Figure 2-4. This diagram makes it clear why the data flow between map and reduce tasks is colloquially known as "the shuffle," as each reduce task is fed by many map tasks. The shuffle is more complicated than this diagram suggests, and tuning it can have a big impact on job execution time, as you will see in "Shuffle and Sort" on page 197.

Figure 2-4. MapReduce data flow with multiple reduce tasks

Finally, it's also possible to have zero reduce tasks. This can be appropriate when you don't need the shuffle because the processing can be carried out entirely in parallel (a few examples are discussed in "NLineInputFormat" on page 234). In this case, the only off-node data transfer is when the map tasks write to HDFS (see Figure 2-5).
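The number of reduce tasks, including the map-only case, is chosen on the Job rather than inferred from the input. The driver below is a sketch only (the class name is made up, and it reuses MaxTemperatureMapper from the earlier Java example); setting zero reduce tasks removes the shuffle entirely:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative map-only variant of the maximum temperature application.
public class MaxTemperatureMapOnly {

  public static void main(String[] args) throws Exception {
    Job job = new Job();
    job.setJarByClass(MaxTemperatureMapOnly.class);
    job.setJobName("Max temperature (map only)");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    // Zero reduce tasks: there is no shuffle, and each map task writes its
    // output straight to HDFS.
    job.setNumReduceTasks(0);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}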
Combiner Functions

Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. Hadoop allows the user to specify a combiner function to be run on the map output, and the combiner function's output forms the input to the reduce function. Because the combiner function is an optimization, Hadoop does not provide a guarantee of how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer.
Figure 2-5. MapReduce data flow with no reduce tasks

The contract for the combiner function constrains the type of function that may be used. This is best illustrated with an example. Suppose that for the maximum temperature example, readings for the year 1950 were processed by two maps (because they were in different splits). Imagine the first map produced the output:

(1950, 0)
(1950, 20)
(1950, 10)

and the second produced:

(1950, 25)
(1950, 15)

The reduce function would be called with a list of all the values:

(1950, [0, 20, 10, 25, 15])

with output:

(1950, 25)

since 25 is the maximum value in the list. We could use a combiner function that, just like the reduce function, finds the maximum temperature for each map output. The reduce function would then be called with:

(1950, [20, 25])

and would produce the same output as before. More succinctly, we may express the function calls on the temperature values in this case as follows:

max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25

Not all functions possess this property.1 For example, if we were calculating mean temperatures, we couldn't use the mean as our combiner function, because:

mean(0, 20, 10, 25, 15) = 14

but:

mean(mean(0, 20, 10), mean(25, 15)) = mean(10, 20) = 15
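A mean-temperature job could still benefit from a combiner, but only if what flows between map and reduce is itself a partial aggregate. The following sketch is not part of the book's running example: it assumes a hypothetical mapper that emits each reading as a Text value of the form "sum,count" (a single reading of 20 degrees becomes "20,1"), so that values can be merged any number of times without changing the final answer:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner for a mean-temperature job. Input and output values
// are Text of the form "sum,count", so partial results can be merged again
// later (zero, one, or many times) without affecting the final mean.
public class MeanTemperatureCombiner extends Reducer<Text, Text, Text, Text> {

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    long count = 0;
    for (Text value : values) {
      String[] parts = value.toString().split(",");
      sum += Long.parseLong(parts[0]);
      count += Long.parseLong(parts[1]);
    }
    // Emit a partial aggregate in the same "sum,count" form it consumed.
    context.write(key, new Text(sum + "," + count));
  }
}

The reducer would do the same merging but finish by dividing the total sum by the total count. A combiner's output types must match the map output types, which is why it emits another "sum,count" pair rather than the mean itself.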
The combiner function doesn't replace the reduce function. (How could it? The reduce function is still needed to process records with the same key from different maps.) But it can help cut down the amount of data shuffled between the mappers and the reducers, and for this reason alone it is always worth considering whether you can use a combiner function in your MapReduce job.

Specifying a combiner function

Going back to the Java MapReduce program, the combiner function is defined using the Reducer class, and for this application, it is the same implementation as the reduce function in MaxTemperatureReducer. The only change we need to make is to set the combiner class on the Job (see Example 2-6).
Example 2-6. Application to find the maximum temperature, using a combiner function for efficiency

public class MaxTemperatureWithCombiner {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperatureWithCombiner <input path> " +
          "<output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperatureWithCombiner.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

1. Functions with this property are called commutative and associative. They are also sometimes referred to as distributive, such as in Jim Gray et al.'s "Data Cube: A Relational Aggregation Operator Generalizing Group-By, Cross-Tab, and Sub-Totals," February 1995.

Running a Distributed MapReduce Job

The same program will run, without alteration, on a full dataset. This is the point of MapReduce: it scales to the size of your data and the size of your hardware. Here's one data point: on a 10-node EC2 cluster running High-CPU Extra Large instances, the program took six minutes to run.2

We'll go through the mechanics of running programs on a cluster in Chapter 6.

2. This is a factor of seven faster than the serial run on one machine using awk. The main reason it wasn't proportionately faster is because the input data wasn't evenly partitioned. For convenience, the input files were gzipped by year, resulting in large files for later years in the dataset, when the number of weather records was much higher.

Hadoop Streaming

Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java.
Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any language that can read standard input and write to standard output to write your MapReduce program.3

3. Hadoop Pipes is an alternative to Streaming for C++ programmers. It uses sockets to communicate with the process running the C++ map or reduce function.

Streaming is naturally suited for text processing. Map input data is passed over standard input to your map function, which processes it line by line and writes lines to standard output. A map output key-value pair is written as a single tab-delimited line.
Input to the reduce function is in the same format, a tab-separated key-value pair, passed over standard input. The reduce function reads lines from standard input, which the framework guarantees are sorted by key, and writes its results to standard output.

Let's illustrate this by rewriting our MapReduce program for finding maximum temperatures by year in Streaming.

Ruby

The map function can be expressed in Ruby as shown in Example 2-7.
Example 2-7. Map function for maximum temperature in Ruby

#!/usr/bin/env ruby
STDIN.each_line do |line|
  val = line
  year, temp, q = val[15,4], val[87,5], val[92,1]
  puts "#{year}\t#{temp}" if (temp != "+9999" && q =~ /[01459]/)
end

The program iterates over lines from standard input by executing a block for each line from STDIN (a global constant of type IO). The block pulls out the relevant fields from each input line and, if the temperature is valid, writes the year and the temperature separated by a tab character, \t, to standard output (using puts).
It's worth drawing out a design difference between Streaming and the Java MapReduce API. The Java API is geared toward processing your map function one record at a time. The framework calls the map() method on your Mapper for each record in the input, whereas with Streaming the map program can decide how to process the input: for example, it could easily read and process multiple lines at a time since it's in control of the reading. The user's Java map implementation is "pushed" records, but it's still possible to consider multiple lines at a time by accumulating previous lines in an instance variable in the Mapper.4 In this case, you need to implement the cleanup() method so that you know when the last record has been read, so you can finish processing the last group of lines.
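As a sketch of that technique (the class name, group size, and output are made up for illustration), a Mapper can buffer the records it is pushed in an instance variable and process them in groups, with cleanup() handling the final, possibly incomplete group:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative mapper that processes its input in groups of lines rather
// than one record at a time, by buffering lines in an instance variable.
public class MultiLineMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int GROUP_SIZE = 3; // illustrative group size
  private final List<String> buffer = new ArrayList<String>();

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    buffer.add(value.toString());
    if (buffer.size() == GROUP_SIZE) {
      processGroup(context);
    }
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // cleanup() runs once, after the last record has been pushed to map(),
    // so any partially filled group is processed here.
    if (!buffer.isEmpty()) {
      processGroup(context);
    }
  }

  private void processGroup(Context context)
      throws IOException, InterruptedException {
    // Placeholder for whatever multi-line processing the job needs; here we
    // just emit the number of lines in the group under a fixed key.
    context.write(new Text("lines"), new IntWritable(buffer.size()));
    buffer.clear();
  }
}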
The user’s Java map implementa‐tion is “pushed” records, but it’s still possible to consider multiple linesat a time by accumulating previous lines in an instance variable in theMapper.4 In this case, you need to implement the cleanup() methodso that you know when the last record has been read, so you can finishprocessing the last group of lines.Because the script just operates on standard input and output, it’s trivial to test the scriptwithout using Hadoop, simply by using Unix pipes:% cat input/ncdc/sample.txt | ch02-mr-intro/src/main/ruby/max_temperature_map.rb1950+00001950+00221950-00111949+01111949+0078The reduce function shown in Example 2-8 is a little more complex.Example 2-8. Reduce function for maximum temperature in Ruby#!/usr/bin/env rubylast_key, max_val = nil, -1000000STDIN.each_line do |line|key, val = line.split("\t")4.