For lists of a single type of Writable, ArrayWritable is adequate, but to store different types of Writable in a single list, you can use GenericWritable to wrap the elements in an ArrayWritable. Alternatively, you could write a general ListWritable using the ideas from MapWritable.
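For instance, a minimal sketch of the GenericWritable approach might look like this (the subclass name MixedWritable and the choice of wrapped types are ours for illustration):

import org.apache.hadoop.io.*;

public class MixedWritable extends GenericWritable {

  // The closed set of types this wrapper is allowed to contain
  @SuppressWarnings("unchecked")
  private static Class<? extends Writable>[] TYPES =
      (Class<? extends Writable>[]) new Class<?>[] {
        IntWritable.class,
        Text.class
      };

  public MixedWritable() {
  }

  public MixedWritable(Writable value) {
    set(value); // set() and get() are provided by GenericWritable
  }

  @Override
  protected Class<? extends Writable>[] getTypes() {
    return TYPES;
  }
}

Elements wrapped this way can then be stored together in an ArrayWritable of MixedWritable.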
Implementing a Custom Writable

Hadoop comes with a useful set of Writable implementations that serve most purposes; however, on occasion, you may need to write your own custom implementation. With a custom Writable, you have full control over the binary representation and the sort order. Because Writables are at the heart of the MapReduce data path, tuning the binary representation can have a significant effect on performance. The stock Writable implementations that come with Hadoop are well tuned, but for more elaborate structures, it is often better to create a new Writable type rather than composing the stock types.

If you are considering writing a custom Writable, it may be worth trying another serialization framework, like Avro, that allows you to define custom types declaratively. See “Serialization Frameworks” on page 126 and Chapter 12.

To demonstrate how to create a custom Writable, we shall write an implementation that represents a pair of strings, called TextPair. The basic implementation is shown in Example 5-7.
Example 5-7. A Writable implementation that stores a pair of Text objects

import java.io.*;

import org.apache.hadoop.io.*;

public class TextPair implements WritableComparable<TextPair> {

  private Text first;
  private Text second;

  public TextPair() {
    set(new Text(), new Text());
  }

  public TextPair(String first, String second) {
    set(new Text(first), new Text(second));
  }

  public TextPair(Text first, Text second) {
    set(first, second);
  }

  public void set(Text first, Text second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public Text getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextPair) {
      TextPair tp = (TextPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first + "\t" + second;
  }

  @Override
  public int compareTo(TextPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }
}

The first part of the implementation is straightforward: there are two Text instance variables, first and second, and associated constructors, getters, and setters.
All Writable implementations must have a default constructor so that the MapReduce framework can instantiate them, then populate their fields by calling readFields(). Writable instances are mutable and often reused, so you should take care to avoid allocating objects in the write() or readFields() methods.

TextPair’s write() method serializes each Text object in turn to the output stream by delegating to the Text objects themselves. Similarly, readFields() deserializes the bytes from the input stream by delegating to each Text object. The DataOutput and DataInput interfaces have a rich set of methods for serializing and deserializing Java primitives, so, in general, you have complete control over the wire format of your Writable object.
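To make the mechanics concrete, here is a small round-trip demo of our own devising that drives write() and readFields() directly:

import java.io.*;

public class TextPairRoundTrip {

  public static void main(String[] args) throws IOException {
    TextPair tp = new TextPair("one", "two");

    // Serialize: write() emits the two Text fields back to back
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    tp.write(new DataOutputStream(out));

    // Deserialize: reuse an existing instance, as the framework does
    TextPair copy = new TextPair();
    copy.readFields(
        new DataInputStream(new ByteArrayInputStream(out.toByteArray())));

    System.out.println(copy); // prints "one<TAB>two"
  }
}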
Just as you would for any value object you write in Java, you should override the hashCode(), equals(), and toString() methods from java.lang.Object. The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce) to choose a reduce partition, so you should make sure that you write a good hash function that mixes well to ensure reduce partitions are of a similar size.
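For context, choosing the partition essentially comes down to a modulo of the key’s hash code; HashPartitioner (in the new MapReduce API) amounts to the following:

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
    // Mask off the sign bit so the result is non-negative, then pick a
    // partition by taking the hash modulo the number of reducers
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

A poorly mixed hashCode() therefore translates directly into skewed partitions.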
If you plan to use your custom Writable with TextOutputFormat, you must implement its toString() method. TextOutputFormat calls toString() on keys and values for their output representation. For TextPair, we write the underlying Text objects as strings separated by a tab character.

TextPair is an implementation of WritableComparable, so it provides an implementation of the compareTo() method that imposes the ordering you would expect: it sorts by the first string followed by the second. Notice that, apart from the number of Text objects it can store, TextPair differs from TextArrayWritable (which we discussed in the previous section), since TextArrayWritable is only a Writable, not a WritableComparable.

Implementing a RawComparator for speed

The code for TextPair in Example 5-7 will work as it stands; however, there is a further optimization we can make. As explained in “WritableComparable and comparators” on page 112, when TextPair is being used as a key in MapReduce, it will have to be deserialized into an object for the compareTo() method to be invoked. What if it were possible to compare two TextPair objects just by looking at their serialized representations?

It turns out that we can do this because TextPair is the concatenation of two Text objects, and the binary representation of a Text object is a variable-length integer containing the number of bytes in the UTF-8 representation of the string, followed by the UTF-8 bytes themselves.
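The layout is easy to see with a small demo of our own:

import java.io.*;
import java.util.Arrays;

import org.apache.hadoop.io.Text;

public class TextWireFormat {

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    new Text("hadoop").write(new DataOutputStream(out));

    // A vint length (6, which fits in a single byte) followed by the
    // six UTF-8 bytes of "hadoop":
    // [6, 104, 97, 100, 111, 111, 112]
    System.out.println(Arrays.toString(out.toByteArray()));
  }
}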
The trick is to read the initial length so we know how long the first Text object’s byte representation is; then we can delegate to Text’s RawComparator and invoke it with the appropriate offsets for the first or second string. Example 5-8 gives the details (note that this code is nested in the TextPair class).

Example 5-8. A RawComparator for comparing TextPair byte representations

public static class Comparator extends WritableComparator {

  private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

  public Comparator() {
    super(TextPair.class);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {

    try {
      int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
      int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
      int cmp = TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
      if (cmp != 0) {
        return cmp;
      }
      return TEXT_COMPARATOR.compare(b1, s1 + firstL1, l1 - firstL1,
                                     b2, s2 + firstL2, l2 - firstL2);
    } catch (IOException e) {
      throw new IllegalArgumentException(e);
    }
  }
}

static {
  WritableComparator.define(TextPair.class, new Comparator());
}

We actually subclass WritableComparator rather than implementing RawComparator directly, since it provides some convenience methods and default implementations.
The subtle part of this code is calculating firstL1 and firstL2, the lengths of the first Text field in each byte stream. Each is made up of the length of the variable-length integer (returned by decodeVIntSize() on WritableUtils) and the value it is encoding (returned by readVInt()).

The static block registers the raw comparator so that whenever MapReduce sees the TextPair class, it knows to use the raw comparator as its default comparator.
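To see the registration in action, here is a short demo of ours (the serialize() helper mirrors the one used earlier in this chapter) that compares two pairs purely in their serialized form:

import java.io.*;

import org.apache.hadoop.io.*;

public class RawCompareDemo {

  // Capture a Writable's binary representation in a byte array
  public static byte[] serialize(Writable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut);
    dataOut.close();
    return out.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] b1 = serialize(new TextPair("a", "b"));
    byte[] b2 = serialize(new TextPair("a", "c"));

    // get() returns the comparator registered by TextPair's static block
    WritableComparator comparator = WritableComparator.get(TextPair.class);

    // Negative result: ("a", "b") sorts before ("a", "c"), and no
    // TextPair object was deserialized to find that out
    System.out.println(comparator.compare(b1, 0, b1.length, b2, 0, b2.length));
  }
}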
Custom comparators

As you can see with TextPair, writing raw comparators takes some care because you have to deal with details at the byte level. It is worth looking at some of the implementations of Writable in the org.apache.hadoop.io package for further ideas if you need to write your own. The utility methods on WritableUtils are very handy, too.

Custom comparators should also be written to be RawComparators, if possible. These are comparators that implement a different sort order from the natural sort order defined by the default comparator. Example 5-9 shows a comparator for TextPair, called FirstComparator, that considers only the first string of the pair. Note that we override the compare() method that takes objects so both compare() methods have the same semantics.

We will make use of this comparator in Chapter 9, when we look at joins and secondary sorting in MapReduce (see “Joins” on page 268).
Example 5-9. A custom RawComparator for comparing the first field of TextPair byte representations

public static class FirstComparator extends WritableComparator {

  private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

  public FirstComparator() {
    super(TextPair.class);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {

    try {
      int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
      int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
      return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
    } catch (IOException e) {
      throw new IllegalArgumentException(e);
    }
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    if (a instanceof TextPair && b instanceof TextPair) {
      return ((TextPair) a).first.compareTo(((TextPair) b).first);
    }
    return super.compare(a, b);
  }
}
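As a preview of that use, here is a sketch (the driver class is ours, and the mapper, reducer, and I/O setup are omitted) of how the two comparators might be wired into a secondary-sort job:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SecondarySortDriver {

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "secondary sort");
    job.setMapOutputKeyClass(TextPair.class);

    // Sort keys on both fields of the pair...
    job.setSortComparatorClass(TextPair.Comparator.class);

    // ...but group values into a single reduce() call by the first field
    job.setGroupingComparatorClass(TextPair.FirstComparator.class);

    // Mapper, reducer, and input/output configuration omitted
  }
}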
Serialization Frameworks

Although most MapReduce programs use Writable key and value types, this isn’t mandated by the MapReduce API. In fact, any type can be used; the only requirement is a mechanism that translates to and from a binary representation of each type.

To support this, Hadoop has an API for pluggable serialization frameworks. A serialization framework is represented by an implementation of Serialization (in the org.apache.hadoop.io.serializer package). WritableSerialization, for example, is the implementation of Serialization for Writable types.

A Serialization defines a mapping from types to Serializer instances (for turning an object into a byte stream) and Deserializer instances (for turning a byte stream into an object).

Set the io.serializations property to a comma-separated list of class names in order to register Serialization implementations. Its default value includes org.apache.hadoop.io.serializer.WritableSerialization and the Avro Specific and Reflect serializations (see “Avro Data Types and Schemas” on page 346), which means that only Writable or Avro objects can be serialized or deserialized out of the box.

Hadoop includes a class called JavaSerialization that uses Java Object Serialization. Although it makes it convenient to be able to use standard Java types such as Integer or String in MapReduce programs, Java Object Serialization is not as efficient as Writables, so it’s not worth making this trade-off (see the following sidebar).

Why Not Use Java Object Serialization?

Java comes with its own serialization mechanism, called Java Object Serialization (often referred to simply as “Java Serialization”), that is tightly integrated with the language, so it’s natural to ask why this wasn’t used in Hadoop.