Tom White, Hadoop: The Definitive Guide, 4th Edition (2015)
In the default implementation, this will delete the job's temporary working space.

The operations are similar at the task level. The setupTask() method is called before the task is run, and the default implementation doesn't do anything, because temporary directories named for task outputs are created when the task outputs are written.

The commit phase for tasks is optional and may be disabled by returning false from needsTaskCommit(). This saves the framework from having to run the distributed commit protocol for the task, and neither commitTask() nor abortTask() is called. FileOutputCommitter will skip the commit phase when no output has been written by a task.

If a task succeeds, commitTask() is called, which in the default implementation moves the temporary task output directory (which has the task attempt ID in its name to avoid conflicts between task attempts) to the final output path, ${mapreduce.output.fileoutputformat.outputdir}.
Otherwise, the framework calls abortTask(), which deletes the temporary task output directory.

The framework ensures that in the event of multiple task attempts for a particular task, only one will be committed; the others will be aborted. This situation may arise because the first attempt failed for some reason, in which case it would be aborted and a later, successful attempt would be committed. It can also occur if two task attempts were running concurrently as speculative duplicates; in this instance, the one that finished first would be committed, and the other would be aborted.
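To make the order of these hooks concrete, here is a minimal sketch of an OutputCommitter subclass that does nothing except report which hook the framework invokes; the class name and messages are illustrative and not part of Hadoop.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    // A do-nothing committer that only reports which hook runs (a sketch).
    public class LoggingOutputCommitter extends OutputCommitter {

      @Override
      public void setupJob(JobContext jobContext) throws IOException {
        System.out.println("setupJob: create the job's temporary working space");
      }

      @Override
      public void setupTask(TaskAttemptContext taskContext) throws IOException {
        // Nothing to do: task-side temporary directories are created lazily
        // when the task writes its output.
      }

      @Override
      public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
        // Returning false disables the task commit phase entirely, so neither
        // commitTask() nor abortTask() is called for this attempt.
        return true;
      }

      @Override
      public void commitTask(TaskAttemptContext taskContext) throws IOException {
        System.out.println("commitTask: promote output of " + taskContext.getTaskAttemptID());
      }

      @Override
      public void abortTask(TaskAttemptContext taskContext) throws IOException {
        System.out.println("abortTask: discard output of " + taskContext.getTaskAttemptID());
      }
    }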
Task side-effect files

The usual way of writing output from map and reduce tasks is by using OutputCollector to collect key-value pairs. Some applications need more flexibility than a single key-value pair model, so these applications write output files directly from the map or reduce task to a distributed filesystem, such as HDFS. (There are other ways to produce multiple outputs, too, as described in "Multiple Outputs" on page 240.)

Care needs to be taken to ensure that multiple instances of the same task don't try to write to the same file.
As we saw in the previous section, the OutputCommitter protocol solves this problem. If applications write side files in their tasks' working directories, the side files for tasks that successfully complete will be promoted to the output directory automatically, whereas failed tasks will have their side files deleted.

A task may find its working directory by retrieving the value of the mapreduce.task.output.dir property from the job configuration. Alternatively, a MapReduce program using the Java API may call the getWorkOutputPath() static method on FileOutputFormat to get the Path object representing the working directory.
The framework creates the working directory before executing the task, so you don't need to create it.

To take a simple example, imagine a program for converting image files from one format to another. One way to do this is to have a map-only job, where each map is given a set of images to convert (perhaps using NLineInputFormat; see "NLineInputFormat" on page 234). If a map task writes the converted images into its working directory, they will be promoted to the output directory when the task successfully finishes.
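In the spirit of that example, the following sketch shows a map-only task writing a side file into its working directory; the class name, the file naming, and the stand-in "conversion" are assumptions for illustration, not code from this book.

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // A sketch of a mapper that writes a side file instead of emitting
    // key-value pairs; the file contents stand in for a converted image.
    public class SideFileMapper
        extends Mapper<LongWritable, Text, NullWritable, NullWritable> {

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // The working directory is unique to this task attempt, so concurrent
        // attempts cannot clobber each other's files.
        Path workDir = FileOutputFormat.getWorkOutputPath(context);
        Path sideFile = new Path(workDir, "converted-" + key.get());

        FileSystem fs = sideFile.getFileSystem(context.getConfiguration());
        try (FSDataOutputStream out = fs.create(sideFile)) {
          out.writeBytes(value.toString()); // stand-in for real conversion output
        }
        // Nothing is emitted through the normal key-value channel; on a
        // successful commit the side file is promoted to the output directory.
      }
    }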
Chapter 8. MapReduce Types and Formats

MapReduce has a simple model of data processing: inputs and outputs for the map and reduce functions are key-value pairs. This chapter looks at the MapReduce model in detail, and in particular at how data in various formats, from simple text to structured binary objects, can be used with this model.

MapReduce Types

The map and reduce functions in Hadoop MapReduce have the following general form:

    map: (K1, V1) → list(K2, V2)
    reduce: (K2, list(V2)) → list(K3, V3)

In general, the map input key and value types (K1 and V1) are different from the map output types (K2 and V2).
However, the reduce input must have the same types as the map output, although the reduce output types may be different again (K3 and V3). The Java API mirrors this general form:

    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
      public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
      }
      protected void map(KEYIN key, VALUEIN value,
          Context context) throws IOException, InterruptedException {
        // ...
      }
    }

    public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
      public class Context extends ReducerContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
      }
      protected void reduce(KEYIN key, Iterable<VALUEIN> values,
          Context context) throws IOException, InterruptedException {
        // ...
      }
    }

The context objects are used for emitting key-value pairs, and they are parameterized by the output types so that the signature of the write() method is:

    public void write(KEYOUT key, VALUEOUT value)
        throws IOException, InterruptedException

Since Mapper and Reducer are separate classes, the type parameters have different scopes, and the actual type argument of KEYIN (say) in the Mapper may be different from the type of the type parameter of the same name (KEYIN) in the Reducer. For instance, in the maximum temperature example from earlier chapters, KEYIN is replaced by LongWritable for the Mapper and by Text for the Reducer.
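As a sketch of how those type parameters are bound in practice, the following condensed classes mirror the shape of the maximum temperature example; the class names are made up here, and the record parsing from the earlier chapters is elided.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // K1 = LongWritable (file offset), V1 = Text (input line),
    // K2 = Text (year), V2 = IntWritable (temperature).
    class SketchMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // Parse the year and temperature out of the record (details elided),
        // then emit an intermediate pair:
        // context.write(new Text(year), new IntWritable(airTemperature));
      }
    }

    // K2 = Text, V2 = IntWritable, K3 = Text, V3 = IntWritable.
    class SketchReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;
        for (IntWritable value : values) {
          max = Math.max(max, value.get());
        }
        context.write(key, new IntWritable(max));
      }
    }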
Similarly, even though the map output types and the reduce input types must match, this is not enforced by the Java compiler.

The type parameters are named differently from the abstract types (KEYIN versus K1, and so on), but the form is the same.

If a combiner function is used, then it has the same form as the reduce function (and is an implementation of Reducer), except its output types are the intermediate key and value types (K2 and V2), so they can feed the reduce function:

    map: (K1, V1) → list(K2, V2)
    combiner: (K2, list(V2)) → list(K2, V2)
    reduce: (K2, list(V2)) → list(K3, V3)

Often the combiner and reduce functions are the same, in which case K3 is the same as K2, and V3 is the same as V2.
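To make that constraint concrete, here is a brief sketch of the job wiring, assuming the hypothetical SketchMapper and SketchReducer classes from the previous sketch: the reducer can double as the combiner only because its output types (Text, IntWritable) are the same as the intermediate types.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    // A sketch of wiring a combiner; input/output paths and job submission
    // are omitted.
    public class CombinerWiringSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "combiner wiring sketch");
        job.setMapperClass(SketchMapper.class);
        job.setCombinerClass(SketchReducer.class); // combiner: (K2, list(V2)) -> list(K2, V2)
        job.setReducerClass(SketchReducer.class);  // reduce:   (K2, list(V2)) -> list(K3, V3)
      }
    }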
The partition function operates on the intermediate key and value types (K2 and V2) and returns the partition index. In practice, the partition is determined solely by the key (the value is ignored):

    partition: (K2, V2) → integer

Or in Java:

    public abstract class Partitioner<KEY, VALUE> {
      public abstract int getPartition(KEY key, VALUE value, int numPartitions);
    }
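To show the contract in use, here is a sketch of a concrete partitioner over (Text, IntWritable) intermediate pairs; the class name and the first-character scheme are illustrative, and, like the default HashPartitioner, it ignores the value.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // A sketch of a partitioner that routes keys by their first character.
    public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (key.getLength() == 0) {
          return 0;
        }
        // Mask the sign bit so the result is always a valid partition index.
        return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
      }
    }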
MapReduce Signatures in the Old API

In the old API (see Appendix D), the signatures are very similar and actually name the type parameters K1, V1, and so on, although the constraints on the types are exactly the same in both the old and new APIs:

    public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
      void map(K1 key, V1 value,
          OutputCollector<K2, V2> output, Reporter reporter) throws IOException;
    }

    public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
      void reduce(K2 key, Iterator<V2> values,
          OutputCollector<K3, V3> output, Reporter reporter) throws IOException;
    }

    public interface Partitioner<K2, V2> extends JobConfigurable {
      int getPartition(K2 key, V2 value, int numPartitions);
    }

So much for the theory. How does this help you configure MapReduce jobs? Table 8-1 summarizes the configuration options for the new API (and Table 8-2 does the same for the old API). It is divided into the properties that determine the types and those that have to be compatible with the configured types.

Input types are set by the input format. So, for instance, a TextInputFormat generates keys of type LongWritable and values of type Text. The other types are set explicitly by calling the methods on the Job (or JobConf in the old API).
If not set explicitly, the intermediate types default to the (final) output types, which default to LongWritable and Text. So, if K2 and K3 are the same, you don't need to call setMapOutputKeyClass(), because it falls back to the type set by calling setOutputKeyClass(). Similarly, if V2 and V3 are the same, you only need to use setOutputValueClass().
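As a sketch of how these rules play out, consider a hypothetical job whose intermediate and final key types are both Text but whose value types differ (IntWritable versus LongWritable); only the settings that cannot be defaulted need to be spelled out.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    // A sketch of the type-related settings for a hypothetical job.
    public class TypeConfigurationSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "type configuration sketch");

        // K2 and K3 are both Text, so setting the final output key class is
        // enough; the intermediate key class falls back to it.
        job.setOutputKeyClass(Text.class);

        // V2 (IntWritable) differs from V3 (LongWritable), so both must be
        // set explicitly, or the framework would assume the intermediate
        // value type is LongWritable as well.
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputValueClass(LongWritable.class);
      }
    }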
It may seem strange that these methods for setting the intermediate and final output types exist at all. After all, why can't the types be determined from a combination of the mapper and the reducer? The answer has to do with a limitation in Java generics: type erasure means that the type information isn't always present at runtime, so Hadoop has to be given it explicitly. This also means that it's possible to configure a MapReduce job with incompatible types, because the configuration isn't checked at compile time.
The settings that have to be compatible with the MapReduce types are listed in the lower part of Table 8-1. Type conflicts are detected at runtime during job execution, and for this reason, it is wise to run a test job using a small amount of data to flush out and fix any type incompatibilities.

Table 8-1. Configuration of MapReduce types in the new API

    Property                                      Job setter method               Type parameters
    Properties for configuring types:
    mapreduce.job.inputformat.class               setInputFormatClass()           K1, V1 (input)
    mapreduce.map.output.key.class                setMapOutputKeyClass()          K2 (intermediate)
    mapreduce.map.output.value.class              setMapOutputValueClass()        V2 (intermediate)
    mapreduce.job.output.key.class                setOutputKeyClass()             K3 (output)
    mapreduce.job.output.value.class              setOutputValueClass()           V3 (output)
    Properties that must be consistent with the types:
    mapreduce.job.map.class                       setMapperClass()                K1, V1, K2, V2
    mapreduce.job.combine.class                   setCombinerClass()              K2, V2
    mapreduce.job.partitioner.class               setPartitionerClass()           K2, V2
    mapreduce.job.output.key.comparator.class     setSortComparatorClass()        K2
    mapreduce.job.output.group.comparator.class   setGroupingComparatorClass()    K2
    mapreduce.job.reduce.class                    setReducerClass()               K2, V2, K3, V3
    mapreduce.job.outputformat.class              setOutputFormatClass()          K3, V3
Table 8-2. Configuration of MapReduce types in the old API

    Property                               JobConf setter method                 Type parameters
    Properties for configuring types:
    mapred.input.format.class              setInputFormat()                      K1, V1 (input)
    mapred.mapoutput.key.class             setMapOutputKeyClass()                K2 (intermediate)
    mapred.mapoutput.value.class           setMapOutputValueClass()              V2 (intermediate)
    mapred.output.key.class                setOutputKeyClass()                   K3 (output)
    mapred.output.value.class              setOutputValueClass()                 V3 (output)
    Properties that must be consistent with the types:
    mapred.mapper.class                    setMapperClass()                      K1, V1, K2, V2
    mapred.map.runner.class                setMapRunnerClass()                   K1, V1, K2, V2
    mapred.combiner.class                  setCombinerClass()                    K2, V2
    mapred.partitioner.class               setPartitionerClass()                 K2, V2
    mapred.output.key.comparator.class     setOutputKeyComparatorClass()         K2
    mapred.output.value.groupfn.class      setOutputValueGroupingComparator()    K2
    mapred.reducer.class                   setReducerClass()                     K2, V2, K3, V3
    mapred.output.format.class             setOutputFormat()                     K3, V3
The Default MapReduce Job

What happens when you run MapReduce without setting a mapper or a reducer? Let's try it by running this minimal MapReduce program:

    public class MinimalMapReduce extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.printf("Usage: %s [generic options] <input> <output>\n",
              getClass().getSimpleName());
          ToolRunner.printGenericCommandUsage(System.err);
          return -1;
        }

        Job job = new Job(getConf());
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MinimalMapReduce(), args);
        System.exit(exitCode);
      }
    }

The only configuration that we set is an input path and an output path.
We run it over a subset of our weather data with the following:

    % hadoop MinimalMapReduce "input/ncdc/all/190{1,2}.gz" output

We do get some output: one file named part-r-00000 in the output directory. Here's what the first few lines look like (truncated to fit the page):

    0→0029029070999991901010106004+64333+023450FM-12+000599999V0202701N01591...
    0→0035029070999991902010106004+64333+023450FM-12+000599999V0201401N01181...
    135→0029029070999991901010113004+64333+023450FM-12+000599999V0202901N00821...
    141→0035029070999991902010113004+64333+023450FM-12+000599999V0201401N01181...
    270→0029029070999991901010120004+64333+023450FM-12+000599999V0209991C00001...
    282→0035029070999991902010120004+64333+023450FM-12+000599999V0201401N01391...

Each line is an integer followed by a tab character, followed by the original weather data record.
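This output can be explained by the defaults the framework falls back on when nothing is configured: the default TextInputFormat supplies the byte offset of each line as the key and the line itself as the value, the default Mapper and Reducer pass key-value pairs straight through, and the default TextOutputFormat writes each pair as key, tab, value. The following sketch spells out the main defaults explicitly; the class name is illustrative and the listing is not exhaustive.

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    // A sketch of the minimal job with its main defaults written out.
    public class MinimalMapReduceWithDefaultsSketch extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setInputFormatClass(TextInputFormat.class);   // keys: offsets, values: lines

        job.setMapperClass(Mapper.class);                 // identity mapper
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(Reducer.class);               // identity reducer

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class); // tab-separated text output

        return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MinimalMapReduceWithDefaultsSketch(), args));
      }
    }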