Tom White - Hadoop: The Definitive Guide, 4th edition - 2015
We then use the MultipleOutputs instance in the reduce() method to write to the output, in place of the context. The write() method takes the key and value, as well as a name. We use the station identifier for the name, so the overall effect is to produce output files with the naming scheme station_identifier-r-nnnnn.

In one run, the first few output files were named as follows:

    output/010010-99999-r-00027
    output/010050-99999-r-00013
    output/010100-99999-r-00015
    output/010280-99999-r-00014
    output/010550-99999-r-00000
    output/010980-99999-r-00011
    output/011060-99999-r-00025
    output/012030-99999-r-00029
    output/012350-99999-r-00018
    output/012620-99999-r-00004

The base path specified in the write() method of MultipleOutputs is interpreted relative to the output directory, and because it may contain file path separator characters (/), it's possible to create subdirectories of arbitrary depth.
For example, the following modification partitions the data by station and year so that each year's data is contained in a directory named by the station ID (such as 029070-99999/1901/part-r-00000):

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text value : values) {
        parser.parse(value);
        String basePath = String.format("%s/%s/part",
            parser.getStationId(), parser.getYear());
        multipleOutputs.write(NullWritable.get(), value, basePath);
      }
    }

MultipleOutputs delegates to the mapper's OutputFormat. In this example it's a TextOutputFormat, but more complex setups are possible. For example, you can create named outputs, each with its own OutputFormat and key and value types (which may differ from the output types of the mapper or reducer).
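To see the effect of the basePath argument in isolation, here is a minimal, self-contained sketch (plain Java, outside the MapReduce framework; the station IDs and years are made-up sample values): each distinct station/year base path collects its own group of records, just as each base path becomes its own output file tree in the job.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BasePathDemo {
    // Mirrors the reducer's String.format("%s/%s/part", stationId, year)
    static String basePath(String stationId, String year) {
        return String.format("%s/%s/part", stationId, year);
    }

    public static void main(String[] args) {
        // Hypothetical (station, year) pairs standing in for parsed records
        String[][] records = {
            {"029070-99999", "1901"},
            {"029070-99999", "1902"},
            {"029500-99999", "1901"},
        };
        // Each distinct base path corresponds to one output location
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String[] r : records) {
            counts.merge(basePath(r[0], r[1]), 1, Integer::sum);
        }
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue() + " record(s)");
        }
    }
}
```

In the real job, the framework appends the partition suffix to each base path, so the files appear as, for example, 029070-99999/1901/part-r-00000.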
Furthermore, the mapper or reducer (or both) may write to multiple output files for each record processed. Consult the Java documentation for more information.

Lazy Output

FileOutputFormat subclasses will create output (part-r-nnnnn) files, even if they are empty. Some applications prefer that empty files not be created, which is where LazyOutputFormat helps. It is a wrapper output format that ensures that the output file is created only when the first record is emitted for a given partition. To use it, call its setOutputFormatClass() method with the JobConf and the underlying output format. Streaming supports a -lazyOutput option to enable LazyOutputFormat.

Database Output

The output formats for writing to relational databases and to HBase are mentioned in "Database Input (and Output)" on page 238.

CHAPTER 9
MapReduce Features

This chapter looks at some of the more advanced features of MapReduce, including counters and sorting and joining datasets.

Counters

There are often things that you would like to know about the data you are analyzing but that are peripheral to the analysis you are performing.
For example, if you were counting invalid records and discovered that the proportion of invalid records in the whole dataset was very high, you might be prompted to check why so many records were being marked as invalid; perhaps there is a bug in the part of the program that detects invalid records. Or if the data was of poor quality and genuinely did have very many invalid records, after discovering this you might decide to increase the size of the dataset so that the number of good records was large enough for meaningful analysis.

Counters are a useful channel for gathering statistics about the job: for quality control or for application-level statistics. They are also useful for problem diagnosis. If you are tempted to put a log message into your map or reduce task, it is often better to see whether you can use a counter instead to record that a particular condition occurred. In addition to counter values being much easier to retrieve than log output for large distributed jobs, you get a record of the number of times that condition occurred, which is more work to obtain from a set of logfiles.

Built-in Counters

Hadoop maintains some built-in counters for every job, and these report various metrics.
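To make the count-instead-of-log idea concrete, here is a minimal sketch in plain Java, outside the MapReduce framework; the Quality enum and the sample records are made up for illustration. Inside a real task you would instead increment a counter on the task context (with a call along the lines of context.getCounter(...).increment(1)) and read the totals after the job finishes.

```java
import java.util.EnumMap;
import java.util.Map;

public class CounterSketch {
    // Hypothetical counter names for record-quality statistics
    enum Quality { VALID, MALFORMED }

    public static void main(String[] args) {
        // Made-up sample records: a temperature field that should be a signed integer
        String[] records = {"+0023", "-0011", "abc", "+9999x"};
        Map<Quality, Long> counters = new EnumMap<>(Quality.class);
        for (String r : records) {
            Quality q = r.matches("[+-]?\\d+") ? Quality.VALID : Quality.MALFORMED;
            // Count the condition rather than logging each occurrence
            counters.merge(q, 1L, Long::sum);
        }
        // The aggregated counts are what you inspect afterward
        System.out.println(counters);
    }
}
```

The point of the pattern is that the result is a small set of totals per condition, rather than one log line per record scattered across the tasks of a distributed job.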
For example, there are counters for the number of bytes and records processed, which allow you to confirm that the expected amount of input was consumed and the expected amount of output was produced.

Counters are divided into groups, and there are several groups for the built-in counters, listed in Table 9-1.

Table 9-1. Built-in counter groups

    Group                      Name/Enum                                                        Reference
    MapReduce task counters    org.apache.hadoop.mapreduce.TaskCounter                          Table 9-2
    Filesystem counters        org.apache.hadoop.mapreduce.FileSystemCounter                    Table 9-3
    FileInputFormat counters   org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter     Table 9-4
    FileOutputFormat counters  org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter   Table 9-5
    Job counters               org.apache.hadoop.mapreduce.JobCounter                           Table 9-6

Each group either contains task counters (which are updated as a task progresses) or job counters (which are updated as a job progresses).
We look at both types in the following sections.

Task counters

Task counters gather information about tasks over the course of their execution, and the results are aggregated over all the tasks in a job. The MAP_INPUT_RECORDS counter, for example, counts the input records read by each map task and aggregates over all map tasks in a job, so that the final figure is the total number of input records for the whole job.

Task counters are maintained by each task attempt, and periodically sent to the application master so they can be globally aggregated. (This is described in "Progress and Status Updates" on page 190.) Task counters are sent in full every time, rather than sending the counts since the last transmission, since this guards against errors due to lost messages. Furthermore, during a job run, counters may go down if a task fails. Counter values are definitive only once a job has successfully completed.
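A tiny sketch of why sending full values tolerates lost messages (hypothetical heartbeat values, not Hadoop code): the receiver simply keeps the latest full count it has seen, so a dropped update is corrected by the next one, with no delta arithmetic to go wrong.

```java
public class FullCountReports {
    public static void main(String[] args) {
        // Hypothetical full counts reported by one task attempt at successive heartbeats
        long[] reports = {10, 25, 40};
        long masterView = 0;
        for (int i = 0; i < reports.length; i++) {
            if (i == 1) continue;      // simulate the second report being lost in transit
            masterView = reports[i];   // a full value simply replaces the previous one
        }
        // The final report alone recovers the true count
        System.out.println(masterView);
    }
}
```

Had the reports been deltas (10, 15, 15), the lost message would have left the total permanently short by 15.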
However, some counters provide useful diagnostic information as a task is progressing, and it can be useful to monitor them with the web UI. For example, PHYSICAL_MEMORY_BYTES, VIRTUAL_MEMORY_BYTES, and COMMITTED_HEAP_BYTES provide an indication of how memory usage varies over the course of a particular task attempt.

The built-in task counters include those in the MapReduce task counters group (Table 9-2) and those in the file-related counters groups (Tables 9-3, 9-4, and 9-5).

Table 9-2. Built-in MapReduce task counters

Map input records (MAP_INPUT_RECORDS)
    The number of input records consumed by all the maps in the job. Incremented every time a record is read from a RecordReader and passed to the map's map() method by the framework.

Split raw bytes (SPLIT_RAW_BYTES)
    The number of bytes of input-split objects read by maps. These objects represent the split metadata (that is, the offset and length within a file) rather than the split data itself, so the total size should be small.

Map output records (MAP_OUTPUT_RECORDS)
    The number of map output records produced by all the maps in the job. Incremented every time the collect() method is called on a map's OutputCollector.

Map output bytes (MAP_OUTPUT_BYTES)
    The number of bytes of uncompressed output produced by all the maps in the job. Incremented every time the collect() method is called on a map's OutputCollector.

Map output materialized bytes (MAP_OUTPUT_MATERIALIZED_BYTES)
    The number of bytes of map output actually written to disk. If map output compression is enabled, this is reflected in the counter value.

Combine input records (COMBINE_INPUT_RECORDS)
    The number of input records consumed by all the combiners (if any) in the job. Incremented every time a value is read from the combiner's iterator over values. Note that this count is the number of values consumed by the combiner, not the number of distinct key groups (which would not be a useful metric, since there is not necessarily one group per key for a combiner; see "Combiner Functions" on page 34, and also "Shuffle and Sort" on page 197).

Combine output records (COMBINE_OUTPUT_RECORDS)
    The number of output records produced by all the combiners (if any) in the job. Incremented every time the collect() method is called on a combiner's OutputCollector.

Reduce input groups (REDUCE_INPUT_GROUPS)
    The number of distinct key groups consumed by all the reducers in the job. Incremented every time the reducer's reduce() method is called by the framework.

Reduce input records (REDUCE_INPUT_RECORDS)
    The number of input records consumed by all the reducers in the job. Incremented every time a value is read from the reducer's iterator over values. If reducers consume all of their inputs, this count should be the same as the count for map output records.

Reduce output records (REDUCE_OUTPUT_RECORDS)
    The number of reduce output records produced by all the reduces in the job. Incremented every time the collect() method is called on a reducer's OutputCollector.

Reduce shuffle bytes (REDUCE_SHUFFLE_BYTES)
    The number of bytes of map output copied by the shuffle to reducers.

Spilled records (SPILLED_RECORDS)
    The number of records spilled to disk in all map and reduce tasks in the job.

CPU milliseconds (CPU_MILLISECONDS)
    The cumulative CPU time for a task in milliseconds, as reported by /proc/cpuinfo.

Physical memory bytes (PHYSICAL_MEMORY_BYTES)
    The physical memory being used by a task in bytes, as reported by /proc/meminfo.

Virtual memory bytes (VIRTUAL_MEMORY_BYTES)
    The virtual memory being used by a task in bytes, as reported by /proc/meminfo.

Committed heap bytes (COMMITTED_HEAP_BYTES)
    The total amount of memory available in the JVM in bytes, as reported by Runtime.getRuntime().totalMemory().

GC time milliseconds (GC_TIME_MILLIS)
    The elapsed time for garbage collection in tasks in milliseconds, as reported by GarbageCollectorMXBean.getCollectionTime().

Shuffled maps (SHUFFLED_MAPS)
    The number of map output files transferred to reducers by the shuffle (see "Shuffle and Sort" on page 197).

Failed shuffle (FAILED_SHUFFLE)
    The number of map output copy failures during the shuffle.

Merged map outputs (MERGED_MAP_OUTPUTS)
    The number of map outputs that have been merged on the reduce side of the shuffle.

Table 9-3.