Tom White - Hadoop: The Definitive Guide, 4th edition, 2015
Luckily, we don't have to write the code to do this ourselves, as Hadoop comes with a selection of samplers. The InputSampler class defines a nested Sampler interface whose implementations return a sample of keys given an InputFormat and Job:

public interface Sampler<K, V> {
  K[] getSample(InputFormat<K, V> inf, Job job)
      throws IOException, InterruptedException;
}

3. A better answer is to use Pig ("Sorting Data" on page 465), Hive ("Sorting and Aggregating" on page 503), Crunch, or Spark, all of which can sort with a single command.

Figure 9-1. Temperature distribution for the weather dataset

This interface usually is not called directly by clients.
Instead, the writePartitionFile() static method on InputSampler is used, which creates a sequence file to store the keys that define the partitions:

public static <K, V> void writePartitionFile(Job job, Sampler<K, V> sampler)
    throws IOException, ClassNotFoundException, InterruptedException

The sequence file is used by TotalOrderPartitioner to create partitions for the sort job. Example 9-5 puts it all together.

Example 9-5. A MapReduce program for sorting a SequenceFile with IntWritable keys using the TotalOrderPartitioner to globally sort the data

public class SortByTemperatureUsingTotalOrderPartitioner extends Configured
    implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job,
        CompressionType.BLOCK);

    job.setPartitionerClass(TotalOrderPartitioner.class);

    InputSampler.Sampler<IntWritable, Text> sampler =
        new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);

    InputSampler.writePartitionFile(job, sampler);

    // Add to DistributedCache
    Configuration conf = job.getConfiguration();
    String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
    URI partitionUri = new URI(partitionFile);
    job.addCacheFile(partitionUri);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(
        new SortByTemperatureUsingTotalOrderPartitioner(), args);
    System.exit(exitCode);
  }
}

We use a RandomSampler, which chooses keys with a uniform probability (here, 0.1). There are also parameters for the maximum number of samples to take and the maximum number of splits to sample (here, 10,000 and 10, respectively; these settings are the defaults when InputSampler is run as an application), and the sampler stops when the first of these limits is met.
Samplers run on the client, making it important to limit the number of splits that are downloaded so the sampler runs quickly. In practice, the time taken to run the sampler is a small fraction of the overall job time.

The InputSampler writes a partition file that we need to share with the tasks running on the cluster by adding it to the distributed cache (see "Distributed Cache" on page 274).

On one run, the sampler chose –5.6°C, 13.9°C, and 22.0°C as partition boundaries (for four partitions), which translates into more even partition sizes than the earlier choice:

Temperature range      < –5.6°C  [–5.6°C, 13.9°C)  [13.9°C, 22.0°C)  >= 22.0°C
Proportion of records  29%       24%               23%               24%

Your input data determines the best sampler to use.
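To see how boundaries like those above are put to work, the key-to-partition mapping that TotalOrderPartitioner performs can be modeled as a binary search over the sampled split points. The sketch below is a simplified plain-Java illustration, not Hadoop's actual implementation (which, for BinaryComparable keys, can build a trie for faster lookups); the class and method names are hypothetical, and temperatures are in tenths of a degree, as in the NCDC records.

```java
import java.util.Arrays;

// Simplified model of total-order partitioning: N - 1 sorted, distinct
// split points divide the key space into N partitions.
public class TotalOrderSketch {

    static int partitionFor(int key, int[] splitPoints) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // Found at i: the key equals a boundary, which acts as the inclusive
        // lower bound of the upper partition, so it belongs to partition i + 1.
        // Not found: binarySearch returns -(insertionPoint) - 1, and the
        // insertion point is exactly the partition index.
        return pos >= 0 ? pos + 1 : -pos - 1;
    }

    public static void main(String[] args) {
        // Boundaries from the text: -5.6°C, 13.9°C, 22.0°C (in tenths).
        int[] splitPoints = { -56, 139, 220 };
        System.out.println(partitionFor(-60, splitPoints)); // 0 (< -5.6°C)
        System.out.println(partitionFor(0, splitPoints));   // 1
        System.out.println(partitionFor(139, splitPoints)); // 2 (on a boundary)
        System.out.println(partitionFor(250, splitPoints)); // 3 (>= 22.0°C)
    }
}
```

Because every mapper searches the same split points, keys are routed consistently, which is what makes the concatenated reducer outputs globally sorted.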
For example, SplitSampler, which samples only the first n records in a split, is not so good for sorted data,4 because it doesn't select keys from throughout the split.

On the other hand, IntervalSampler chooses keys at regular intervals through the split and makes a better choice for sorted data. RandomSampler is a good general-purpose sampler. If none of these suits your application (and remember that the point of sampling is to produce partitions that are approximately equal in size), you can write your own implementation of the Sampler interface.

One of the nice properties of InputSampler and TotalOrderPartitioner is that you are free to choose the number of partitions (that is, the number of reducers). However, TotalOrderPartitioner will work only if the partition boundaries are distinct. One problem with choosing a high number is that you may get collisions if you have a small key space.

Here's how we run it:

% hadoop jar hadoop-examples.jar SortByTemperatureUsingTotalOrderPartitioner \
  -D mapreduce.job.reduces=30 input/ncdc/all-seq output-totalsort

The program produces 30 output partitions, each of which is internally sorted; in addition, for these partitions, all the keys in partition i are less than the keys in partition i + 1.

Secondary Sort

The MapReduce framework sorts the records by key before they reach the reducers.
For any particular key, however, the values are not sorted. The order in which the values appear is not even stable from one run to the next, because they come from different map tasks, which may finish at different times from run to run. Generally speaking, most MapReduce programs are written so as not to depend on the order in which the values appear to the reduce function. However, it is possible to impose an order on the values by sorting and grouping the keys in a particular way.

To illustrate the idea, consider the MapReduce program for calculating the maximum temperature for each year. If we arranged for the values (temperatures) to be sorted in descending order, we wouldn't have to iterate through them to find the maximum; instead, we could take the first for each year and ignore the rest.
(This approach isn't the most efficient way to solve this particular problem, but it illustrates how secondary sort works in general.)

4. In some applications, it's common for some of the input to already be sorted, or at least partially sorted. For example, the weather dataset is ordered by time, which may introduce certain biases, making the RandomSampler a safer choice.

To achieve this, we change our keys to be composite: a combination of year and temperature.
We want the sort order for keys to be by year (ascending) and then by temperature (descending):

1900 35°C
1900 34°C
1900 34°C
...
1901 36°C
1901 35°C

If all we did was change the key, this wouldn't help, because then records for the same year would have different keys and therefore would not (in general) go to the same reducer. For example, (1900, 35°C) and (1900, 34°C) could go to different reducers.
By setting a partitioner to partition by the year part of the key, we can guarantee that records for the same year go to the same reducer. This still isn't enough to achieve our goal, however. A partitioner ensures only that one reducer receives all the records for a year; it doesn't change the fact that the reducer groups by key within the partition.

The final piece of the puzzle is the setting to control the grouping. If we group values in the reducer by the year part of the key, we will see all the records for the same year in one reduce group.
And because they are sorted by temperature in descending order, the first is the maximum temperature.

To summarize, there is a recipe here to get the effect of sorting by value:

• Make the key a composite of the natural key and the natural value.
• The sort comparator should order by the composite key (i.e., the natural key and natural value).
• The partitioner and grouping comparator for the composite key should consider only the natural key for partitioning and grouping.

Java code

Putting this all together results in the code in Example 9-6. This program uses the plain text input again.

Example 9-6.
Application to find the maximum temperature by sorting temperatures in the key

public class MaxTemperatureUsingSecondarySort
    extends Configured implements Tool {

  static class MaxTemperatureMapper
      extends Mapper<LongWritable, Text, IntPair, NullWritable> {

    private NcdcRecordParser parser = new NcdcRecordParser();

    @Override
    protected void map(LongWritable key, Text value,
        Context context) throws IOException, InterruptedException {

      parser.parse(value);
      if (parser.isValidTemperature()) {
        context.write(new IntPair(parser.getYearInt(),
            parser.getAirTemperature()), NullWritable.get());
      }
    }
  }

  static class MaxTemperatureReducer
      extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {

    @Override
    protected void reduce(IntPair key, Iterable<NullWritable> values,
        Context context) throws IOException, InterruptedException {

      context.write(key, NullWritable.get());
    }
  }

  public static class FirstPartitioner
      extends Partitioner<IntPair, NullWritable> {

    @Override
    public int getPartition(IntPair key, NullWritable value, int numPartitions) {
      // multiply by 127 to perform some mixing
      return Math.abs(key.getFirst() * 127) % numPartitions;
    }
  }

  public static class KeyComparator extends WritableComparator {
    protected KeyComparator() {
      super(IntPair.class, true);
    }
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      IntPair ip1 = (IntPair) w1;
      IntPair ip2 = (IntPair) w2;
      int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
      if (cmp != 0) {
        return cmp;
      }
      return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); // reverse
    }
  }

  public static class GroupComparator extends WritableComparator {
    protected GroupComparator() {
      super(IntPair.class, true);
    }
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      IntPair ip1 = (IntPair) w1;
      IntPair ip2 = (IntPair) w2;
      return IntPair.compare(ip1.getFirst(), ip2.getFirst());
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setPartitionerClass(FirstPartitioner.class);
    job.setSortComparatorClass(KeyComparator.class);
    job.setGroupingComparatorClass(GroupComparator.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(IntPair.class);
    job.setOutputValueClass(NullWritable.class);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
    System.exit(exitCode);
  }
}

In the mapper, we create a key representing the year and temperature, using an IntPair Writable implementation. (IntPair is like the TextPair class we developed in "Implementing a Custom Writable" on page 121.) We don't need to carry any information in the value, because we can get the first (maximum) temperature in the reducer from the key, so we use a NullWritable. The reducer emits the first key, which, due to the secondary sorting, is an IntPair for the year and its maximum temperature.