Aliases allow you to use different names in the schema used to read the Avro data than in the schema originally used to write the data. For example, the following reader's schema can be used to read StringPair data with the new field names first and second instead of left and right (which are what it was written with):

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings with aliased field names.",
  "fields": [
    {"name": "first", "type": "string", "aliases": ["left"]},
    {"name": "second", "type": "string", "aliases": ["right"]}
  ]
}

Note that the aliases are used to translate (at read time) the writer's schema into the reader's, but the alias names are not available to the reader. In this example, the reader cannot use the field names left and right, because they have already been translated to first and second.
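To make the translation concrete, here is a minimal sketch of reading with an aliased reader's schema using the Generic mapping. The serialization boilerplate follows the in-memory serialization pattern from earlier in the chapter, and the inlined schema strings are abbreviated versions of the ones above (the doc attributes are dropped):

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AliasDemo {
  public static void main(String[] args) throws Exception {
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"StringPair\", \"fields\": [" +
        "{\"name\": \"left\", \"type\": \"string\"}," +
        "{\"name\": \"right\", \"type\": \"string\"}]}");
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"StringPair\", \"fields\": [" +
        "{\"name\": \"first\", \"type\": \"string\", \"aliases\": [\"left\"]}," +
        "{\"name\": \"second\", \"type\": \"string\", \"aliases\": [\"right\"]}]}");

    // Write a record using the original (writer's) schema
    GenericRecord pair = new GenericData.Record(writerSchema);
    pair.put("left", "L");
    pair.put("right", "R");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(pair, encoder);
    encoder.flush();

    // Read it back, passing both schemas so Avro can resolve the aliases
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<GenericRecord>(writerSchema, readerSchema);
    Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord result = reader.read(null, decoder);
    System.out.println(result.get("first"));  // L
    System.out.println(result.get("second")); // R
    System.out.println(result.get("left"));   // null: the old name is not visible
  }
}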
Sort Order

Avro defines a sort order for objects. For most Avro types, the order is the natural one you would expect—for example, numeric types are ordered by ascending numeric value. Others are a little more subtle. For instance, enums are compared by the order in which the symbols are defined and not by the values of the symbol strings.

All types except record have preordained rules for their sort order, as described in the Avro specification, that cannot be overridden by the user.
For records, however, you can control the sort order by specifying the order attribute for a field. It takes one of three values: ascending (the default), descending (to reverse the order), or ignore (so the field is skipped for comparison purposes).

For example, the following schema (SortedStringPair.avsc) defines an ordering of StringPair records by the right field in descending order. The left field is ignored for the purposes of ordering, but it is still present in the projection:

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings, sorted by right field descending.",
  "fields": [
    {"name": "left", "type": "string", "order": "ignore"},
    {"name": "right", "type": "string", "order": "descending"}
  ]
}
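As a quick check of what these order attributes do, the following sketch compares two in-memory StringPair records with GenericData's compare() method, once under the original schema and once under the SortedStringPair.avsc schema (both schemas inlined here so the example is self-contained):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class SortOrderDemo {
  public static void main(String[] args) throws Exception {
    Schema plain = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"StringPair\", \"fields\": [" +
        "{\"name\": \"left\", \"type\": \"string\"}," +
        "{\"name\": \"right\", \"type\": \"string\"}]}");
    Schema sorted = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"StringPair\", \"fields\": [" +
        "{\"name\": \"left\", \"type\": \"string\", \"order\": \"ignore\"}," +
        "{\"name\": \"right\", \"type\": \"string\", \"order\": \"descending\"}]}");

    GenericRecord a = new GenericData.Record(plain);
    a.put("left", "a");
    a.put("right", "1");
    GenericRecord b = new GenericData.Record(plain);
    b.put("left", "b");
    b.put("right", "2");

    // Under the plain schema, a sorts before b (left "a" < "b"): negative
    System.out.println(GenericData.get().compare(a, b, plain));
    // Under SortedStringPair, left is ignored and right is descending,
    // so a (right "1") now sorts after b (right "2"): positive
    System.out.println(GenericData.get().compare(a, b, sorted));
  }
}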
The record's fields are compared pairwise in the document order of the reader's schema. Thus, by specifying an appropriate reader's schema, you can impose an arbitrary ordering on data records. This schema (SwitchedStringPair.avsc) defines a sort order by the right field, then the left:

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings, sorted by right then left.",
  "fields": [
    {"name": "right", "type": "string"},
    {"name": "left", "type": "string"}
  ]
}

Avro implements efficient binary comparisons. That is to say, Avro does not have to deserialize binary data into objects to perform the comparison, because it can instead work directly on the byte streams.⁵ In the case of the original StringPair schema (with no order attributes), for example, Avro implements the binary comparison as follows.
The first field, left, is a UTF-8-encoded string, for which Avro can compare the bytes lexicographically. If they differ, the order is determined, and Avro can stop the comparison there. Otherwise, if the two byte sequences are the same, it compares the second two (right) fields, again lexicographically at the byte level because the field is another UTF-8 string.

Notice that this description of a comparison function has exactly the same logic as the binary comparator we wrote for Writables in “Implementing a RawComparator for speed” on page 123.
The great thing is that Avro provides the comparator for us, so we don't have to write and maintain this code. It's also easy to change the sort order just by changing the reader's schema. For the SortedStringPair.avsc and SwitchedStringPair.avsc schemas, the comparison function Avro uses is essentially the same as the one just described. The differences are which fields are considered, the order in which they are considered, and whether the sort order is ascending or descending.
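This binary comparison can be exercised directly via the static compare() method on BinaryData. Here is a minimal sketch (schema inlined; the serialization helper follows the in-memory serialization pattern from earlier in the chapter):

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryData;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class BinaryCompareDemo {

  // Serialize a record to Avro binary with the given schema
  static byte[] toBytes(GenericRecord datum, Schema schema) throws Exception {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(datum, encoder);
    encoder.flush();
    return out.toByteArray();
  }

  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"StringPair\", \"fields\": [" +
        "{\"name\": \"left\", \"type\": \"string\"}," +
        "{\"name\": \"right\", \"type\": \"string\"}]}");

    GenericRecord a = new GenericData.Record(schema);
    a.put("left", "a");
    a.put("right", "1");
    GenericRecord b = new GenericData.Record(schema);
    b.put("left", "b");
    b.put("right", "2");

    // Compare the serialized bytes directly; nothing is deserialized.
    // left "a" < "b" lexicographically, so the result is negative.
    System.out.println(BinaryData.compare(
        toBytes(a, schema), 0, toBytes(b, schema), 0, schema));
  }
}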
Later in the chapter, we'll use Avro's sorting logic in conjunction with MapReduce to sort Avro datafiles in parallel.

5. A useful consequence of this property is that you can compute an Avro datum's hash code from either the object or the binary representation (the latter by using the static hashCode() method on BinaryData) and get the same result in both cases.

Avro MapReduce

Avro provides a number of classes for making it easy to run MapReduce programs on Avro data. We'll use the new MapReduce API classes from the org.apache.avro.mapreduce package, but you can find (old-style) MapReduce classes in the org.apache.avro.mapred package.

Let's rework the MapReduce program for finding the maximum temperature for each year in the weather dataset, this time using the Avro MapReduce API.
We will represent weather records using the following schema:

{
  "type": "record",
  "name": "WeatherRecord",
  "doc": "A weather reading.",
  "fields": [
    {"name": "year", "type": "int"},
    {"name": "temperature", "type": "int"},
    {"name": "stationId", "type": "string"}
  ]
}

The program in Example 12-2 reads text input (in the format we saw in earlier chapters) and writes Avro datafiles containing weather records as output.

Example 12-2. MapReduce program to find the maximum temperature, creating Avro output

public class AvroGenericMaxTemperature
    extends Configured implements Tool {

  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"WeatherRecord\"," +
      "  \"doc\": \"A weather reading.\"," +
      "  \"fields\": [" +
      "    {\"name\": \"year\", \"type\": \"int\"}," +
      "    {\"name\": \"temperature\", \"type\": \"int\"}," +
      "    {\"name\": \"stationId\", \"type\": \"string\"}" +
      "  ]" +
      "}");

  public static class MaxTemperatureMapper
      extends Mapper<LongWritable, Text, AvroKey<Integer>,
          AvroValue<GenericRecord>> {
    private NcdcRecordParser parser = new NcdcRecordParser();
    private GenericRecord record = new GenericData.Record(SCHEMA);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      parser.parse(value.toString());
      if (parser.isValidTemperature()) {
        record.put("year", parser.getYearInt());
        record.put("temperature", parser.getAirTemperature());
        record.put("stationId", parser.getStationId());
        context.write(new AvroKey<Integer>(parser.getYearInt()),
            new AvroValue<GenericRecord>(record));
      }
    }
  }

  public static class MaxTemperatureReducer
      extends Reducer<AvroKey<Integer>, AvroValue<GenericRecord>,
          AvroKey<GenericRecord>, NullWritable> {

    @Override
    protected void reduce(AvroKey<Integer> key,
        Iterable<AvroValue<GenericRecord>> values, Context context)
        throws IOException, InterruptedException {
      GenericRecord max = null;
      for (AvroValue<GenericRecord> value : values) {
        GenericRecord record = value.datum();
        if (max == null ||
            (Integer) record.get("temperature") >
                (Integer) max.get("temperature")) {
          max = newWeatherRecord(record);
        }
      }
      context.write(new AvroKey(max), NullWritable.get());
    }

    // Copy the record, since the framework reuses the instance passed in
    private GenericRecord newWeatherRecord(GenericRecord value) {
      GenericRecord record = new GenericData.Record(SCHEMA);
      record.put("year", value.get("year"));
      record.put("temperature", value.get("temperature"));
      record.put("stationId", value.get("stationId"));
      return record;
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    Job job = new Job(getConf(), "Max temperature");
    job.setJarByClass(getClass());
    job.getConfiguration().setBoolean(
        Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.INT));
    AvroJob.setMapOutputValueSchema(job, SCHEMA);
    AvroJob.setOutputKeySchema(job, SCHEMA);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new AvroGenericMaxTemperature(), args);
    System.exit(exitCode);
  }
}
This program uses the Generic Avro mapping. This frees us from generating code to represent records, at the expense of type safety (field names are referred to by string value, such as "temperature"). The schema for weather records is inlined in the code for convenience (and read into the SCHEMA constant), although in practice it might be more maintainable to read the schema from a local file in the driver code and pass it to the mapper and reducer via the Hadoop job configuration.
(Techniques for achieving this are discussed in “Side Data Distribution” on page 273.)
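One simple way to do this, sketched below, is to put the schema's JSON representation into the job configuration under a property name of our own choosing; the "weather.record.schema" key here is purely illustrative, not a standard Hadoop or Avro property:

import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;

public class SchemaSideData {

  // An illustrative property name of our own; not defined by Hadoop or Avro
  static final String SCHEMA_PROP = "weather.record.schema";

  // Driver side: parse the schema from a local file and store its JSON
  // form in the job configuration before submitting the job
  public static void setSchema(Configuration conf, File schemaFile)
      throws IOException {
    conf.set(SCHEMA_PROP, new Schema.Parser().parse(schemaFile).toString());
  }

  // Task side (for example, from Mapper.setup()): parse it back
  public static Schema getSchema(Configuration conf) {
    return new Schema.Parser().parse(conf.get(SCHEMA_PROP));
  }
}

For anything much bigger than a schema string, the distributed cache mechanisms described in that section are a better fit than the configuration.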
There are a couple of differences from the regular Hadoop MapReduce API. The first is the use of wrappers around Avro Java types. For this MapReduce program, the key is the year (an integer), and the value is the weather record, which is represented by Avro's GenericRecord. This translates to AvroKey<Integer> for the key type and AvroValue<GenericRecord> for the value type in the map output (and reduce input).

The MaxTemperatureReducer iterates through the records for each key (year) and finds the one with the maximum temperature. It is necessary to make a copy of the record with the highest temperature found so far, since the iterator reuses the instance for reasons of efficiency (and only the fields are updated).
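Example 12-2 makes that copy by hand in its newWeatherRecord() method; a sketch of an alternative (our substitution, not the book's code) is to use Avro's GenericData.deepCopy() utility, which performs the same field-by-field clone generically:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class RecordCopy {

  // Clone a datum according to its schema. A copy is needed because the
  // MapReduce framework reuses the record instance between loop iterations.
  public static GenericRecord copy(Schema schema, GenericRecord datum) {
    return GenericData.get().deepCopy(schema, datum);
  }
}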
The second major difference from regular MapReduce is the use of AvroJob for configuring the job. AvroJob is a convenience class for specifying the Avro schemas for the input, map output, and final output data. In this program, no input schema is set, because we are reading from a text file. The map output key schema is an Avro int and the value schema is the weather record schema. The final output key schema is the weather record schema, and the output format is AvroKeyOutputFormat, which writes keys to Avro datafiles and ignores the values (which are NullWritable).

The following commands show how to run the program on a small sample dataset:

% export HADOOP_CLASSPATH=avro-examples.jar
% export HADOOP_USER_CLASSPATH_FIRST=true # override version of Avro in Hadoop
% hadoop jar avro-examples.jar AvroGenericMaxTemperature \
    input/ncdc/sample.txt output

On completion we can look at the output using the Avro tools JAR to render the Avro datafile as JSON, one record per line:

% java -jar $AVRO_HOME/avro-tools-*.jar tojson output/part-r-00000.avro
{"year":1949,"temperature":111,"stationId":"012650-99999"}
{"year":1950,"temperature":22,"stationId":"011990-99999"}

In this example we read a text file and created an Avro datafile, but other combinations are possible, which is useful for converting between Avro formats and other formats.