Admittedly, it’s not a very useful program, but understanding how it produces its output does provide some insight into the defaults that Hadoop uses when running MapReduce jobs. Example 8-1 shows a program that has exactly the same effect as MinimalMapReduce, but explicitly sets the job settings to their defaults.

Example 8-1. A minimal MapReduce driver, with the defaults explicitly set

public class MinimalMapReduceWithDefaults extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }

    job.setInputFormatClass(TextInputFormat.class);

    job.setMapperClass(Mapper.class);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);

    job.setPartitionerClass(HashPartitioner.class);

    job.setNumReduceTasks(1);
    job.setReducerClass(Reducer.class);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    job.setOutputFormatClass(TextOutputFormat.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
    System.exit(exitCode);
  }
}

We’ve simplified the first few lines of the run() method by extracting the logic for printing usage and setting the input and output paths into a helper method.
Almost all MapReduce drivers take these two arguments (input and output), so reducing the boilerplate code here is a good thing. Here are the relevant methods in the JobBuilder class for reference:

public static Job parseInputAndOutput(Tool tool, Configuration conf,
    String[] args) throws IOException {
  if (args.length != 2) {
    printUsage(tool, "<input> <output>");
    return null;
  }
  Job job = new Job(conf);
  job.setJarByClass(tool.getClass());
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  return job;
}

public static void printUsage(Tool tool, String extraArgsUsage) {
  System.err.printf("Usage: %s [genericOptions] %s\n\n",
      tool.getClass().getSimpleName(), extraArgsUsage);
  GenericOptionsParser.printGenericCommandUsage(System.err);
}

Going back to MinimalMapReduceWithDefaults in Example 8-1, although there are many other default job settings, the ones set explicitly in the example are those most central to running a job. Let’s go through them in turn.

The default input format is TextInputFormat, which produces keys of type LongWritable (the offset of the beginning of the line in the file) and values of type Text (the line of text).
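For instance, given a hypothetical two-line input file containing:

line one
line two

TextInputFormat would produce the records (0, line one) and (9, line two), since the first line plus its newline occupies bytes 0 through 8; the keys are byte offsets into the file, not line numbers.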
This explains where the integers in the final output come from: they are the line offsets.

The default mapper is just the Mapper class, which writes the input key and value unchanged to the output:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  protected void map(KEYIN key, VALUEIN value,
      Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
}

Mapper is a generic type, which allows it to work with any key or value types.
In this case, the map input and output key is of type LongWritable, and the map input and output value is of type Text.

The default partitioner is HashPartitioner, which hashes a record’s key to determine which partition the record belongs in. Each partition is processed by a reduce task, so the number of partitions is equal to the number of reduce tasks for the job:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value,
      int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

The key’s hash code is turned into a nonnegative integer by bitwise ANDing it with the largest integer value. It is then reduced modulo the number of partitions to find the index of the partition that the record belongs in.

By default, there is a single reducer, and therefore a single partition; the action of the partitioner is irrelevant in this case since everything goes into one partition. However, it is important to understand the behavior of HashPartitioner when you have more than one reduce task. Assuming the key’s hash function is a good one, the records will be allocated evenly across reduce tasks, with all records that share the same key being processed by the same reduce task.

You may have noticed that we didn’t set the number of map tasks. The reason for this is that the number is equal to the number of splits that the input is turned into, which is driven by the size of the input and the file’s block size (if the file is in HDFS). The options for controlling split size are discussed in “FileInputFormat input splits” on page 224.

Choosing the Number of Reducers

The single reducer default is something of a gotcha for new users to Hadoop. Almost all real-world jobs should set this to a larger number; otherwise, the job will be very slow since all the intermediate data flows through a single reduce task.

Choosing the number of reducers for a job is more of an art than a science.
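To make the partitioning arithmetic concrete, here is a small sketch (the hash value and the number of reduce tasks are made up for illustration, not taken from the book):

// Hypothetical walk-through of HashPartitioner's arithmetic
int numReduceTasks = 5;
int hash = -1755043420;                       // hashCode() may well be negative
int nonNegative = hash & Integer.MAX_VALUE;   // clear the sign bit: 392440228
int partition = nonNegative % numReduceTasks; // 392440228 % 5 = 3

Every record with this key therefore lands in partition 3, and so is processed by the same reduce task.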
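As a rough sketch of applying that rule of thumb in a driver like Example 8-1 (the output size estimate and block size here are assumptions for illustration, not values from the book):

// Size each reducer to write roughly one HDFS block of output
long expectedReduceOutputBytes = 20L * 1024 * 1024 * 1024;  // e.g. ~20 GB of expected output
long blockSize = 128L * 1024 * 1024;                        // e.g. a 128 MB HDFS block size
int numReducers = (int) Math.max(1, expectedReduceOutputBytes / blockSize);
job.setNumReduceTasks(numReducers);                         // 160 reducers in this example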
Increasing the number of reducers makes the reduce phase shorter, since you get more parallelism. However, if you take this too far, you can have lots of small files, which is suboptimal. One rule of thumb is to aim for reducers that each run for five minutes or so, and which produce at least one HDFS block’s worth of output.

The default reducer is Reducer, again a generic type, which simply writes all its input to its output:

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  protected void reduce(KEYIN key, Iterable<VALUEIN> values,
      Context context) throws IOException, InterruptedException {
    for (VALUEIN value : values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }
}

For this job, the output key is LongWritable and the output value is Text. In fact, all the keys for this MapReduce program are LongWritable and all the values are Text, since these are the input keys and values, and the map and reduce functions are both identity functions, which by definition preserve type.
Most MapReduce programs, however, don’t use the same key or value types throughout, so you need to configure the job to declare the types you are using, as described in the previous section.

Records are sorted by the MapReduce system before being presented to the reducer. In this case, the keys are sorted numerically, which has the effect of interleaving the lines from the input files into one combined output file.

The default output format is TextOutputFormat, which writes out records, one per line, by converting keys and values to strings and separating them with a tab character. This is why the output is tab-separated: it is a feature of TextOutputFormat.
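If a tab is not what you want, the delimiter can be overridden in the driver; here is a minimal sketch, assuming the mapreduce.output.textoutputformat.separator property used by the new-API TextOutputFormat:

// Write key,value rather than key<TAB>value (set before the Job is created)
Configuration conf = getConf();
conf.set("mapreduce.output.textoutputformat.separator", ",");
Job job = new Job(conf);
job.setOutputFormatClass(TextOutputFormat.class);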
The default Streaming job

In Streaming, the default job is similar, but not identical, to the Java equivalent. The basic form is:

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input input/ncdc/sample.txt \
  -output output \
  -mapper /bin/cat

When we specify a non-Java mapper and the default text mode is in effect (-io text), Streaming does something special. It doesn’t pass the key to the mapper process; it just passes the value. (For other input formats, the same effect can be achieved by setting stream.map.input.ignoreKey to true.) This is actually very useful because the key is just the line offset in the file and the value is the line, which is all most applications are interested in. The overall effect of this job is to perform a sort of the input.

With more of the defaults spelled out, the command looks like this (notice that Streaming uses the old MapReduce API classes):

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input input/ncdc/sample.txt \
  -output output \
  -inputformat org.apache.hadoop.mapred.TextInputFormat \
  -mapper /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.HashPartitioner \
  -numReduceTasks 1 \
  -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
  -outputformat org.apache.hadoop.mapred.TextOutputFormat \
  -io text

The -mapper and -reducer arguments take a command or a Java class.
A combiner may optionally be specified using the -combiner argument.

Keys and values in Streaming

A Streaming application can control the separator that is used when a key-value pair is turned into a series of bytes and sent to the map or reduce process over standard input. The default is a tab character, but it is useful to be able to change it in the case that the keys or values themselves contain tab characters.

Similarly, when the map or reduce writes out key-value pairs, they may be separated by a configurable separator.
Furthermore, the key from the output can be composed of more than the first field: it can be made up of the first n fields (defined by stream.num.map.output.key.fields or stream.num.reduce.output.key.fields), with the value being the remaining fields. For example, if the output from a Streaming process was a,b,c (with a comma as the separator), and n was 2, the key would be parsed as a,b and the value as c.

Separators may be configured independently for maps and reduces. The properties are listed in Table 8-3 and shown in a diagram of the data flow path in Figure 8-1.

These settings do not have any bearing on the input and output formats. For example, if stream.reduce.output.field.separator were set to be a colon, say, and the reduce stream process wrote the line a:b to standard out, the Streaming reducer would know to extract the key as a and the value as b.
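As a sketch of how these properties might be passed on the command line (the input path and the mapper and reducer scripts are hypothetical placeholders):

% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -D stream.map.output.field.separator=, \
  -D stream.num.map.output.key.fields=2 \
  -input input/sample.txt \
  -output output \
  -mapper my_mapper.py \
  -reducer my_reducer.py

With these settings, a mapper that emits a,b,c would have a,b treated as the key and c as the value when the map output is partitioned and sorted.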