Tom White - Hadoop: The Definitive Guide, 4th edition (2015)
The map output is written to disk and transferred across the network to the reducer nodes, so by using a fast compressor such as LZO, LZ4, or Snappy, you can get performance gains simply because the volume of data to transfer is reduced. The configuration properties to enable compression for map outputs and to set the compression format are shown in Table 5-6.

Table 5-6. Map output compression properties

Property name: mapreduce.map.output.compress
Type: boolean
Default value: false
Description: Whether to compress map outputs

Property name: mapreduce.map.output.compress.codec
Type: Class
Default value: org.apache.hadoop.io.compress.DefaultCodec
Description: The compression codec to use for map outputs

Here are the lines to add to enable gzip map output compression in your job (using the new API):

Configuration conf = new Configuration();
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class,
    CompressionCodec.class);
Job job = new Job(conf);

In the old API (see Appendix D), there are convenience methods on the JobConf object for doing the same thing:

conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);

Serialization

Serialization is the process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage.
Deserialization is the reverse process of turning a byte stream back into a series of structured objects.

Serialization is used in two quite distinct areas of distributed data processing: for interprocess communication and for persistent storage.

In Hadoop, interprocess communication between nodes in the system is implemented using remote procedure calls (RPCs). The RPC protocol uses serialization to render the message into a binary stream to be sent to the remote node, which then deserializes the binary stream into the original message.
In general, it is desirable that an RPC serialization format is:

Compact
    A compact format makes the best use of network bandwidth, which is the most scarce resource in a data center.

Fast
    Interprocess communication forms the backbone for a distributed system, so it is essential that there is as little performance overhead as possible for the serialization and deserialization process.

Extensible
    Protocols change over time to meet new requirements, so it should be straightforward to evolve the protocol in a controlled manner for clients and servers. For example, it should be possible to add a new argument to a method call and have the new servers accept messages in the old format (without the new argument) from old clients.

Interoperable
    For some systems, it is desirable to be able to support clients that are written in different languages to the server, so the format needs to be designed to make this possible.

On the face of it, the data format chosen for persistent storage would have different requirements from a serialization framework.
After all, the lifespan of an RPC is less than a second, whereas persistent data may be read years after it was written. But it turns out that the four desirable properties of an RPC's serialization format are also crucial for a persistent storage format. We want the storage format to be compact (to make efficient use of storage space), fast (so the overhead in reading or writing terabytes of data is minimal), extensible (so we can transparently read data written in an older format), and interoperable (so we can read or write persistent data using different languages).

Hadoop uses its own serialization format, Writables, which is certainly compact and fast, but not so easy to extend or use from languages other than Java. Because Writables are central to Hadoop (most MapReduce programs use them for their key and value types), we look at them in some depth in the next three sections, before looking at some of the other serialization frameworks supported in Hadoop.
Avro (a serialization system that was designed to overcome some of the limitations of Writables) is covered in Chapter 12.

The Writable Interface

The Writable interface defines two methods—one for writing its state to a DataOutput binary stream and one for reading its state from a DataInput binary stream:

package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

public interface Writable {
  void write(DataOutput out) throws IOException;
  void readFields(DataInput in) throws IOException;
}

Let's look at a particular Writable to see what we can do with it.
We will use IntWritable, a wrapper for a Java int. We can create one and set its value using the set() method:

IntWritable writable = new IntWritable();
writable.set(163);

Equivalently, we can use the constructor that takes the integer value:

IntWritable writable = new IntWritable(163);

To examine the serialized form of the IntWritable, we write a small helper method that wraps a java.io.ByteArrayOutputStream in a java.io.DataOutputStream (an implementation of java.io.DataOutput) to capture the bytes in the serialized stream:

public static byte[] serialize(Writable writable) throws IOException {
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  DataOutputStream dataOut = new DataOutputStream(out);
  writable.write(dataOut);
  dataOut.close();
  return out.toByteArray();
}

An integer is written using four bytes (as we see using JUnit 4 assertions):

byte[] bytes = serialize(writable);
assertThat(bytes.length, is(4));

The bytes are written in big-endian order (so the most significant byte is written to the stream first, which is dictated by the java.io.DataOutput interface), and we can see their hexadecimal representation by using a method on Hadoop's StringUtils:

assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

Let's try deserialization.
Again, we create a helper method to read a Writable object from a byte array:

public static byte[] deserialize(Writable writable, byte[] bytes)
    throws IOException {
  ByteArrayInputStream in = new ByteArrayInputStream(bytes);
  DataInputStream dataIn = new DataInputStream(in);
  writable.readFields(dataIn);
  dataIn.close();
  return bytes;
}

We construct a new, value-less IntWritable, and then call deserialize() to read from the output data that we just wrote. Then we check that its value, retrieved using the get() method, is the original value, 163:

IntWritable newWritable = new IntWritable();
deserialize(newWritable, bytes);
assertThat(newWritable.get(), is(163));

WritableComparable and comparators

IntWritable implements the WritableComparable interface, which is just a subinterface of the Writable and java.lang.Comparable interfaces:

package org.apache.hadoop.io;

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

Comparison of types is crucial for MapReduce, where there is a sorting phase during which keys are compared with one another.
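To see how these methods fit together in a user-defined type, here is a minimal sketch of a custom WritableComparable. IntPairWritable is a hypothetical class invented purely for illustration (it is not part of the Hadoop library); it serializes two ints and orders pairs by the first value, then the second:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Hypothetical example type: a pair of ints, ordered by first then second.
public class IntPairWritable implements WritableComparable<IntPairWritable> {

  private int first;
  private int second;

  // A no-argument constructor lets the framework instantiate the type and
  // then populate it by calling readFields().
  public IntPairWritable() {
  }

  public IntPairWritable(int first, int second) {
    this.first = first;
    this.second = second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(first);
    out.writeInt(second);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first = in.readInt();
    second = in.readInt();
  }

  @Override
  public int compareTo(IntPairWritable other) {
    int cmp = Integer.compare(first, other.first);
    return cmp != 0 ? cmp : Integer.compare(second, other.second);
  }
}

If a type like this were used as a MapReduce key, its compareTo() method would determine the sort order of the keys, unless a faster, byte-level comparator is registered for it, which is what the next interface makes possible.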
One optimization that Hadoop provides is the RawComparator extension of Java's Comparator:

package org.apache.hadoop.io;

import java.util.Comparator;

public interface RawComparator<T> extends Comparator<T> {

  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

}

This interface permits implementors to compare records read from a stream without deserializing them into objects, thereby avoiding any overhead of object creation.
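As a rough illustration (a hypothetical sketch, not code taken from Hadoop itself), a raw comparator for records that are serialized as single 4-byte big-endian integers could decode and compare them straight from the supplied byte arrays:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;

// Hypothetical raw comparator for records that are 4-byte big-endian integers.
public class RawIntComparator implements RawComparator<IntWritable> {

  // Decode a 4-byte big-endian int starting at the given offset.
  private static int decodeInt(byte[] b, int offset) {
    return ((b[offset] & 0xff) << 24)
        | ((b[offset + 1] & 0xff) << 16)
        | ((b[offset + 2] & 0xff) << 8)
        | (b[offset + 3] & 0xff);
  }

  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    // Work directly on the serialized bytes; no IntWritable objects are created.
    return Integer.compare(decodeInt(b1, s1), decodeInt(b2, s2));
  }

  @Override
  public int compare(IntWritable w1, IntWritable w2) {
    // Object-based comparison, required because RawComparator extends Comparator.
    return w1.compareTo(w2);
  }
}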
The comparator for IntWritables that ships with Hadoop works in just this way: it implements the raw compare() method by reading an integer from each of the byte arrays b1 and b2 and comparing them directly from the given start positions (s1 and s2) and lengths (l1 and l2).

WritableComparator is a general-purpose implementation of RawComparator for WritableComparable classes. It provides two main functions.
First, it provides a default implementation of the raw compare() method that deserializes the objects to be compared from the stream and invokes the object compare() method. Second, it acts as a factory for RawComparator instances (that Writable implementations have registered). For example, to obtain a comparator for IntWritable, we just use:

RawComparator<IntWritable> comparator =
    WritableComparator.get(IntWritable.class);

The comparator can be used to compare two IntWritable objects:

IntWritable w1 = new IntWritable(163);
IntWritable w2 = new IntWritable(67);
assertThat(comparator.compare(w1, w2), greaterThan(0));

or their serialized representations:

byte[] b1 = serialize(w1);
byte[] b2 = serialize(w2);
assertThat(comparator.compare(b1, 0, b1.length, b2, 0, b2.length),
    greaterThan(0));

Writable Classes

Hadoop comes with a large selection of Writable classes, which are available in the org.apache.hadoop.io package.
They form the class hierarchy shown in Figure 5-1.

Writable wrappers for Java primitives

There are Writable wrappers for all the Java primitive types (see Table 5-7) except char (which can be stored in an IntWritable). All have a get() and set() method for retrieving and storing the wrapped value.

Table 5-7. Writable wrapper classes for Java primitives

Java primitive   Writable implementation   Serialized size (bytes)
boolean          BooleanWritable           1
byte             ByteWritable              1
short            ShortWritable             2
int              IntWritable               4
                 VIntWritable              1–5
float            FloatWritable             4
long             LongWritable              8
                 VLongWritable             1–9
double           DoubleWritable            8

When it comes to encoding integers, there is a choice between the fixed-length formats (IntWritable and LongWritable) and the variable-length formats (VIntWritable and VLongWritable).
The variable-length formats use only a single byte to encode the value if it is small enough (between –112 and 127, inclusive); otherwise, they use the first byte to indicate whether the value is positive or negative, and how many bytes follow. For example, 163 requires two bytes:

byte[] data = serialize(new VIntWritable(163));
assertThat(StringUtils.byteToHexString(data), is("8fa3"));

Figure 5-1. Writable class hierarchy

How do you choose between a fixed-length and a variable-length encoding? Fixed-length encodings are good when the distribution of values is fairly uniform across the whole value space, such as when using a (well-designed) hash function.
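One way to get a feel for the space difference is to measure it. The following sketch reuses the serialize() helper from earlier in the chapter on a skewed sample of values (mostly small, with a couple of large ones); the sample itself is invented for illustration:

long[] sample = {1, 3, 7, 12, 100, 250, 1000000, 3000000000L};
int fixedTotal = 0;
int variableTotal = 0;
for (long value : sample) {
  fixedTotal += serialize(new LongWritable(value)).length;     // always 8 bytes
  variableTotal += serialize(new VLongWritable(value)).length; // 1 to 9 bytes each
}
System.out.printf("fixed: %d bytes, variable: %d bytes%n", fixedTotal, variableTotal);

For a skewed sample like this, the variable-length total comes out well below the 64 bytes that the fixed-length encoding always occupies.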
Most numeric variables tend to have nonuniform distributions, though, and on average, the variable-length encoding will save space. Another advantage of variable-length encodings is that you can switch from VIntWritable to VLongWritable, because their encodings are actually the same. So, by choosing a variable-length representation, you have room to grow without committing to an 8-byte long representation from the beginning.

Text

Text is a Writable for UTF-8 sequences. It can be thought of as the Writable equivalent of java.lang.String.

The Text class uses an int (with a variable-length encoding) to store the number of bytes in the string encoding, so the maximum value is 2 GB.
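You can see that length prefix in the serialized form by reusing the serialize() helper from earlier in the chapter; the bytes shown below are what the encoding rules above imply, namely a one-byte varint length (6) followed by the UTF-8 bytes of the string:

byte[] textBytes = serialize(new Text("hadoop"));
assertThat(textBytes.length, is(7));
assertThat(StringUtils.byteToHexString(textBytes), is("066861646f6f70"));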
Furthermore, Text uses standard UTF-8, which makes it potentially easier to interoperate with other tools that understand UTF-8.

Indexing. Because of its emphasis on using standard UTF-8, there are some differences between Text and the Java String class. Indexing for the Text class is in terms of position in the encoded byte sequence, not the Unicode character in the string or the Java char code unit (as it is for String). For ASCII strings, these three concepts of index position coincide. Here is an example to demonstrate the use of the charAt() method:

Text t = new Text("hadoop");
assertThat(t.getLength(), is(6));
assertThat(t.getBytes().length, is(6));
assertThat(t.charAt(2), is((int) 'd'));
assertThat("Out of bounds", t.charAt(100), is(-1));

Notice that charAt() returns an int representing a Unicode code point, unlike the String variant that returns a char. Text also has a find() method, which is analogous to String's indexOf():

Text t = new Text("hadoop");
assertThat("Find a substring", t.find("do"), is(2));
assertThat("Finds first 'o'", t.find("o"), is(3));
assertThat("Finds 'o' from position 4 or later", t.find("o", 4), is(4));
assertThat("No match", t.find("pig"), is(-1));

Unicode.