Application to calculate the proportion of records with missing temperature fields

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.*;

public class MissingTemperatureFields extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 1) {
      JobBuilder.printUsage(this, "<job ID>");
      return -1;
    }
    String jobID = args[0];
    Cluster cluster = new Cluster(getConf());
    Job job = cluster.getJob(JobID.forName(jobID));
    if (job == null) {
      System.err.printf("No job with ID %s found.\n", jobID);
      return -1;
    }
    if (!job.isComplete()) {
      System.err.printf("Job %s is not complete.\n", jobID);
      return -1;
    }

    Counters counters = job.getCounters();
    long missing = counters.findCounter(
        MaxTemperatureWithCounters.Temperature.MISSING).getValue();
    long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

    System.out.printf("Records with missing temperature fields: %.2f%%\n",
        100.0 * missing / total);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MissingTemperatureFields(), args);
    System.exit(exitCode);
  }
}

First we retrieve a Job object from a Cluster by calling the getJob() method with the job ID. We check whether there is actually a job with the given ID by checking if it is null. There may not be, either because the ID was incorrectly specified or because the job is no longer in the job history.

After confirming that the job has completed, we call the Job's getCounters() method, which returns a Counters object encapsulating all the counters for the job. The Counters class provides various methods for finding the names and values of counters. We use the findCounter() method, which takes an enum to find the number of records that had a missing temperature field and also the total number of records processed (from a built-in counter).

Finally, we print the proportion of records that had a missing temperature field. Here's what we get for the whole weather dataset:

% hadoop jar hadoop-examples.jar MissingTemperatureFields job_1410450250506_0007
Records with missing temperature fields: 5.47%
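As an aside, the Counters object can also be iterated over to list everything a job recorded, which is handy when you don't know counter names in advance. The fragment below is a minimal sketch, not part of the book's example code, of what could be added to run() after the getCounters() call; it assumes the iteration behavior of the Hadoop 2 org.apache.hadoop.mapreduce Counters and CounterGroup classes (both are covered by the wildcard import above).

// Sketch only: dump every counter group and every counter in it.
// Counters is Iterable<CounterGroup> and CounterGroup is Iterable<Counter>
// in the Hadoop 2 mapreduce API.
for (CounterGroup group : counters) {
  System.out.printf("%s\n", group.getDisplayName());
  for (Counter counter : group) {
    System.out.printf("\t%s = %d\n", counter.getDisplayName(), counter.getValue());
  }
}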
User-Defined Streaming Counters

A Streaming MapReduce program can increment counters by sending a specially formatted line to the standard error stream, which is co-opted as a control channel in this case. The line must have the following format:

reporter:counter:group,counter,amount

This snippet in Python shows how to increment the "Missing" counter in the "Temperature" group by 1:

sys.stderr.write("reporter:counter:Temperature,Missing,1\n")

In a similar way, a status message may be sent with a line formatted like this:

reporter:status:message

Sorting
The ability to sort data is at the heart of MapReduce. Even if your application isn't concerned with sorting per se, it may be able to use the sorting stage that MapReduce provides to organize its data. In this section, we examine different ways of sorting datasets and how you can control the sort order in MapReduce. Sorting Avro data is covered separately, in "Sorting Using Avro MapReduce" on page 363.

Preparation

We are going to sort the weather dataset by temperature. Storing temperatures as Text objects doesn't work for sorting purposes, because signed integers don't sort lexicographically.1 Instead, we are going to store the data using sequence files whose IntWritable keys represent the temperatures (and sort correctly) and whose Text values are the lines of data.
The MapReduce job in Example 9-3 is a map-only job that also filters the input to remove records that don't have a valid temperature reading. Each map creates a single block-compressed sequence file as output. It is invoked with the following command:

% hadoop jar hadoop-examples.jar SortDataPreprocessor input/ncdc/all \
  input/ncdc/all-seq

Example 9-3. A MapReduce program for transforming the weather data into SequenceFile format

public class SortDataPreprocessor extends Configured implements Tool {

  static class CleanerMapper
      extends Mapper<LongWritable, Text, IntWritable, Text> {

    private NcdcRecordParser parser = new NcdcRecordParser();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      parser.parse(value);
      if (parser.isValidTemperature()) {
        context.write(new IntWritable(parser.getAirTemperature()), value);
      }
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }

    job.setMapperClass(CleanerMapper.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job,
        CompressionType.BLOCK);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new SortDataPreprocessor(), args);
    System.exit(exitCode);
  }
}

1. One commonly used workaround for this problem, particularly in text-based Streaming applications, is to add an offset to eliminate all negative numbers and to left pad with zeros so all numbers are the same number of characters (for example, with an offset of 1000 and a width of four digits, -11 becomes 0989 and 22 becomes 1022, which sort correctly as strings). However, see "Streaming" on page 266 for another approach.

Partial Sort
In "The Default MapReduce Job" on page 214, we saw that, by default, MapReduce will sort input records by their keys. Example 9-4 is a variation for sorting sequence files with IntWritable keys.

Example 9-4. A MapReduce program for sorting a SequenceFile with IntWritable keys using the default HashPartitioner

public class SortByTemperatureUsingHashPartitioner extends Configured
    implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job,
        CompressionType.BLOCK);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new SortByTemperatureUsingHashPartitioner(),
        args);
    System.exit(exitCode);
  }
}

Controlling Sort Order

The sort order for keys is controlled by a RawComparator, which is found as follows:
1. If the property mapreduce.job.output.key.comparator.class is set, either explicitly or by calling setSortComparatorClass() on Job, then an instance of that class is used. (In the old API, the equivalent method is setOutputKeyComparatorClass() on JobConf.)

2. Otherwise, keys must be a subclass of WritableComparable, and the registered comparator for the key class is used.

3. If there is no registered comparator, then a RawComparator is used. The RawComparator deserializes the byte streams being compared into objects and delegates to the WritableComparable's compareTo() method.

These rules reinforce the importance of registering optimized versions of RawComparators for your own custom Writable classes (which is covered in "Implementing a RawComparator for speed" on page 123), and also show that it's straightforward to override the sort order by setting your own comparator (we do this in "Secondary Sort" on page 262).
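To make rule 1 concrete, here is a minimal sketch, not taken from the book's example code, of a comparator that reverses the natural IntWritable order; the class name is ours. Registering it on Example 9-4's job would make each output file run from warmest to coldest instead of coldest to warmest.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch only: invert the natural IntWritable ordering. Constructing the
// superclass with createInstances = true lets it deserialize the keys for us,
// so all we change is the sign of the comparison.
public class DescendingIntComparator extends WritableComparator {

  protected DescendingIntComparator() {
    super(IntWritable.class, true);
  }

  @Override
  @SuppressWarnings("rawtypes")
  public int compare(WritableComparable a, WritableComparable b) {
    return -super.compare(a, b);
  }
}

// Wired up in the driver, per rule 1:
// job.setSortComparatorClass(DescendingIntComparator.class);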
Suppose we run the program in Example 9-4 using 30 reducers:2

% hadoop jar hadoop-examples.jar SortByTemperatureUsingHashPartitioner \
  -D mapreduce.job.reduces=30 input/ncdc/all-seq output-hashsort

This command produces 30 output files, each of which is sorted. However, there is no easy way to combine the files (by concatenation, for example, in the case of plain-text files) to produce a globally sorted file.

For many applications, this doesn't matter. For example, having a partially sorted set of files is fine when you want to do lookups by key. The SortByTemperatureToMapFile and LookupRecordsByTemperature classes in this book's example code explore this idea. By using a map file instead of a sequence file, it's possible to first find the relevant partition that a key belongs in (using the partitioner), then to do an efficient lookup of the record within the map file partition.

2. See "Sorting and merging SequenceFiles" on page 132 for how to do the same thing using the sort program example that comes with Hadoop.
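The lookup side of that idea might look something like the following sketch. It is not the book's LookupRecordsByTemperature class, just a minimal illustration: the output directory path and the assumption that keys are the tenths-of-a-degree integers written by the preprocessing job are ours, and error handling is omitted. It uses MapFileOutputFormat's helper methods together with the same HashPartitioner that the sorting job used.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class TemperatureLookupSketch {
  public static void main(String[] args) throws Exception {
    Path dir = new Path(args[0]);  // directory of map file output
    IntWritable key = new IntWritable(Integer.parseInt(args[1]));  // e.g. 100 for 10.0 degrees C

    Configuration conf = new Configuration();
    // One MapFile.Reader per partition of the sorted output
    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(dir, conf);
    // The partitioner the job used tells us which partition holds the key
    Partitioner<IntWritable, Text> partitioner =
        new HashPartitioner<IntWritable, Text>();
    Text value = new Text();
    // getEntry() picks the right reader and performs the lookup within it
    Writable entry = MapFileOutputFormat.getEntry(readers, partitioner, key, value);
    System.out.println(entry == null ? "Key not found" : value);
  }
}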
Total Sort

How can you produce a globally sorted file using Hadoop? The naive answer is to use a single partition.3 But this is incredibly inefficient for large files, because one machine has to process all of the output, so you are throwing away the benefits of the parallel architecture that MapReduce provides. Instead, it is possible to produce a set of sorted files that, if concatenated, would form a globally sorted file.

The secret to doing this is to use a partitioner that respects the total order of the output. For example, if we had four partitions, we could put keys for temperatures less than –10°C in the first partition, those between –10°C and 0°C in the second, those between 0°C and 10°C in the third, and those over 10°C in the fourth.

Although this approach works, you have to choose your partition sizes carefully to ensure that they are fairly even, so job times aren't dominated by a single reducer. For the partitioning scheme just described, the relative sizes of the partitions are as follows:

Temperature range       < -10°C   [-10°C, 0°C)   [0°C, 10°C)   >= 10°C
Proportion of records   11%       13%            17%           59%

These partitions are not very even.
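Before looking at how to even them out, here is a minimal sketch, not from the book's example code, of the kind of fixed-range partitioner just described. The class name is ours, and it assumes the job runs with four reducers and that, as elsewhere in the weather examples, temperatures are stored as IntWritable values in tenths of a degree Celsius.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch only: map fixed temperature ranges to partitions 0-3 so that the
// partition order matches the key order. Assumes four reducers and keys in
// tenths of a degree Celsius.
public class FixedRangeTemperaturePartitioner
    extends Partitioner<IntWritable, Text> {

  @Override
  public int getPartition(IntWritable key, Text value, int numPartitions) {
    int temp = key.get();
    if (temp < -100) {        // < -10 degrees C
      return 0;
    } else if (temp < 0) {    // [-10, 0) degrees C
      return 1;
    } else if (temp < 100) {  // [0, 10) degrees C
      return 2;
    } else {                  // >= 10 degrees C
      return 3;
    }
  }
}

// Wired into a job with job.setPartitionerClass(FixedRangeTemperaturePartitioner.class)
// and mapreduce.job.reduces set to 4.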
To construct more even partitions, we need to have a better understanding of the temperature distribution for the whole dataset. It's fairly easy to write a MapReduce job to count the number of records that fall into a collection of temperature buckets. For example, Figure 9-1 shows the distribution for buckets of size 1°C, where each point on the plot corresponds to one bucket.

Although we could use this information to construct a very even set of partitions, the fact that we needed to run a job that used the entire dataset to construct them is not ideal. It's possible to get a fairly even set of partitions by sampling the key space. The idea behind sampling is that you look at a small subset of the keys to approximate the key distribution, which is then used to construct partitions.
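Hadoop ships an InputSampler class that can do this sampling and write a partition file for TotalOrderPartitioner to consume. The fragment below is only a sketch of that approach, assuming a driver whose Job is configured like Example 9-4 (SequenceFileInputFormat with IntWritable keys and Text values); the sampler parameters are illustrative, not tuned values.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

// Sketch only: to be called from a driver's run() method on a configured Job.
static void configureTotalOrder(Job job) throws Exception {
  job.setPartitionerClass(TotalOrderPartitioner.class);

  // Sample keys with probability 0.1, up to 10,000 samples, from at most 10 splits
  InputSampler.Sampler<IntWritable, Text> sampler =
      new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);

  // Writes the chosen partition boundaries to the partition file that
  // TotalOrderPartitioner reads at runtime
  InputSampler.writePartitionFile(job, sampler);

  // (A full driver would typically also ship the partition file to tasks,
  // for example via job.addCacheFile().)
}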