Each codec knows its default filename extension, thus permitting CompressionCodecFactory to search through the registered codecs to find a match for the given extension (if any).

Table 5-3. Compression codec properties

io.compression.codecs
    Type: Comma-separated Class names. Default value: (empty).
    Description: A list of additional CompressionCodec classes for compression/decompression.
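As a minimal sketch of how this lookup is typically used (not one of the book's numbered examples; the command-line argument and the commented-out codec class name are illustrative), you can extend the codec list via io.compression.codecs and then ask CompressionCodecFactory for the codec that matches a file's extension:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookup {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Optionally register extra codecs as a comma-separated list of class
    // names; the class name below is purely illustrative.
    // conf.set("io.compression.codecs", "com.example.MyCodec");

    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    Path path = new Path(args[0]); // e.g. a file ending in .gz or .bz2
    CompressionCodec codec = factory.getCodec(path); // matched by extension
    if (codec == null) {
      System.out.println("No codec found for " + path);
    } else {
      System.out.println("Codec for " + path + ": " + codec.getClass().getName());
    }
  }
}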
Native libraries

For performance, it is preferable to use a native library for compression and decompression. For example, in one test, using the native gzip libraries reduced decompression times by up to 50% and compression times by around 10% (compared to the built-in Java implementation). Table 5-4 shows the availability of Java and native implementations for each compression format. All formats have native implementations, but not all have a Java implementation (LZO, for example).

Table 5-4. Compression library implementations

Compression format   Java implementation?   Native implementation?
DEFLATE              Yes                    Yes
gzip                 Yes                    Yes
bzip2                Yes                    Yes
LZO                  No                     Yes
LZ4                  No                     Yes
Snappy               No                     Yes

The Apache Hadoop binary tarball comes with prebuilt native compression binaries for 64-bit Linux, called libhadoop.so. For other platforms, you will need to compile the libraries yourself, following the BUILDING.txt instructions at the top level of the source tree.

The native libraries are picked up using the Java system property java.library.path. The hadoop script in the etc/hadoop directory sets this property for you, but if you don't use this script, you will need to set the property in your application.

By default, Hadoop looks for native libraries for the platform it is running on, and loads them automatically if they are found. This means you don't have to change any configuration settings to use the native libraries. In some circumstances, however, you may wish to disable use of native libraries, such as when you are debugging a compression-related problem. You can do this by setting the property io.native.lib.available to false, which ensures that the built-in Java equivalents will be used (if they are available).
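As a small sketch of these two knobs (not an example from the book), you can report whether the native hadoop library was found on java.library.path and force the built-in Java implementations via io.native.lib.available:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.NativeCodeLoader;

public class NativeLibCheck {
  public static void main(String[] args) {
    // Report whether libhadoop.so was loaded from java.library.path.
    System.out.println("Native hadoop library loaded: "
        + NativeCodeLoader.isNativeCodeLoaded());

    // Force the built-in Java codec implementations, for example while
    // debugging a compression-related problem.
    Configuration conf = new Configuration();
    conf.setBoolean("io.native.lib.available", false);
  }
}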
CodecPool

If you are using a native library and you are doing a lot of compression or decompression in your application, consider using CodecPool, which allows you to reuse compressors and decompressors, thereby amortizing the cost of creating these objects.

The code in Example 5-3 shows the API, although in this program, which creates only a single Compressor, there is really no need to use a pool.
Example 5-3. A program to compress data read from standard input and write it to standard output using a pooled compressor

public class PooledStreamCompressor {

  public static void main(String[] args) throws Exception {
    String codecClassname = args[0];
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)
        ReflectionUtils.newInstance(codecClass, conf);
    Compressor compressor = null;
    try {
      compressor = CodecPool.getCompressor(codec);
      CompressionOutputStream out =
          codec.createOutputStream(System.out, compressor);
      IOUtils.copyBytes(System.in, out, 4096, false);
      out.finish();
    } finally {
      CodecPool.returnCompressor(compressor);
    }
  }
}

We retrieve a Compressor instance from the pool for a given CompressionCodec, which we use in the codec's overloaded createOutputStream() method.
By using a finally block, we ensure that the compressor is returned to the pool even if there is an IOException while copying the bytes between the streams.
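The same pooling pattern applies on the read side. Here is a minimal sketch (not one of the book's numbered examples) that decompresses standard input using a Decompressor borrowed from the pool; as in Example 5-3, the codec class name is assumed to be passed as the first command-line argument:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.ReflectionUtils;

public class PooledStreamDecompressor {
  public static void main(String[] args) throws Exception {
    String codecClassname = args[0];
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)
        ReflectionUtils.newInstance(codecClass, conf);
    Decompressor decompressor = null;
    try {
      // Borrow a Decompressor from the pool rather than creating a new one.
      decompressor = CodecPool.getDecompressor(codec);
      CompressionInputStream in =
          codec.createInputStream(System.in, decompressor);
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      // Return it so later calls can reuse it.
      CodecPool.returnDecompressor(decompressor);
    }
  }
}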
Compression and Input Splits

When considering how to compress data that will be processed by MapReduce, it is important to understand whether the compression format supports splitting. Consider an uncompressed file stored in HDFS whose size is 1 GB. With an HDFS block size of 128 MB, the file will be stored as eight blocks, and a MapReduce job using this file as input will create eight input splits, each processed independently as input to a separate map task.

Imagine now that the file is a gzip-compressed file whose compressed size is 1 GB. As before, HDFS will store the file as eight blocks. However, creating a split for each block won't work, because it is impossible to start reading at an arbitrary point in the gzip stream, and therefore impossible for a map task to read its split independently of the others. The gzip format uses DEFLATE to store the compressed data, and DEFLATE stores data as a series of compressed blocks.
The problem is that the start of each block is not distinguished in any way that would allow a reader positioned at an arbitrary point in the stream to advance to the beginning of the next block, thereby synchronizing itself with the stream. For this reason, gzip does not support splitting.

In this case, MapReduce will do the right thing and not try to split the gzipped file, since it knows that the input is gzip-compressed (by looking at the filename extension) and that gzip does not support splitting.
This will work, but at the expense of locality: a single map will process the eight HDFS blocks, most of which will not be local to the map. Also, with fewer maps, the job is less granular and so may take longer to run.

If the file in our hypothetical example were an LZO file, we would have the same problem because the underlying compression format does not provide a way for a reader to synchronize itself with the stream. However, it is possible to preprocess LZO files using an indexer tool that comes with the Hadoop LZO libraries, which you can obtain from the Google and GitHub sites listed in "Codecs" on page 101.
The tool builds an index of split points, effectively making the files splittable when the appropriate MapReduce input format is used.

A bzip2 file, on the other hand, does provide a synchronization marker between blocks (a 48-bit approximation of pi), so it does support splitting. (Table 5-1 lists whether each compression format supports splitting.)
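If you want to check programmatically whether Hadoop will treat a particular compressed file as splittable, the test used by the text-based input formats is whether the file's codec (if any) implements the SplittableCompressionCodec interface. A small sketch, not from the book:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittabilityCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]); // e.g. file.txt, file.gz, or file.bz2
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
    // Uncompressed files are splittable; compressed files are splittable
    // only if the codec supports it (bzip2 does, gzip does not).
    boolean splittable =
        codec == null || codec instanceof SplittableCompressionCodec;
    System.out.println(path + " splittable: " + splittable);
  }
}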
Which Compression Format Should I Use?

Hadoop applications process large datasets, so you should strive to take advantage of compression. Which compression format you use depends on such considerations as file size, format, and the tools you are using for processing. Here are some suggestions, arranged roughly in order of most to least effective:

• Use a container file format such as sequence files (see the section on page 127), Avro datafiles (see the section on page 352), ORCFiles (see the section on page 136), or Parquet files (see the section on page 370), all of which support both compression and splitting. A fast compressor such as LZO, LZ4, or Snappy is generally a good choice.

• Use a compression format that supports splitting, such as bzip2 (although bzip2 is fairly slow), or one that can be indexed to support splitting, such as LZO.

• Split the file into chunks in the application, and compress each chunk separately using any supported compression format (it doesn't matter whether it is splittable). In this case, you should choose the chunk size so that the compressed chunks are approximately the size of an HDFS block.

• Store the files uncompressed.

For large files, you should not use a compression format that does not support splitting on the whole file, because you lose locality and make MapReduce applications very inefficient.

Using Compression in MapReduce

As described in "Inferring CompressionCodecs using CompressionCodecFactory" on page 102, if your input files are compressed, they will be decompressed automatically as they are read by MapReduce, using the filename extension to determine which codec to use.

In order to compress the output of a MapReduce job, in the job configuration, set the mapreduce.output.fileoutputformat.compress property to true and set the mapreduce.output.fileoutputformat.compress.codec property to the classname of the compression codec you want to use.
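For instance, here is a minimal sketch of setting those two properties directly on the job's configuration (the helper class name is illustrative, and gzip is just one possible choice of codec):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;

public class CompressedOutputConfig {
  public static Job newCompressedOutputJob() throws Exception {
    Configuration conf = new Configuration();
    // Turn on job output compression and pick a codec; any registered
    // CompressionCodec class can be used here.
    conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
    conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
        GzipCodec.class, CompressionCodec.class);
    return Job.getInstance(conf);
  }
}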
Alternatively, you can use the static convenience methods on FileOutputFormat to set these properties, as shown in Example 5-4.

Example 5-4. Application to run the maximum temperature job producing compressed output

public class MaxTemperatureWithCompression {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperatureWithCompression <input path> " +
          "<output path>");
      System.exit(-1);
    }

    Job job = new Job();
    job.setJarByClass(MaxTemperature.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

We run the program over compressed input (which doesn't have to use the same compression format as the output, although it does in this example) as follows:

% hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output

Each part of the final output is compressed; in this case, there is a single part:

% gunzip -c output/part-r-00000.gz
1949	111
1950	22

If you are emitting sequence files for your output, you can set the mapreduce.output.fileoutputformat.compress.type property to control the type of compression to use.
The default is RECORD, which compresses individual records. Changing this to BLOCK, which compresses groups of records, is recommended because it compresses better (see "The SequenceFile format" on page 133).

There is also a static convenience method on SequenceFileOutputFormat called setOutputCompressionType() to set this property.

The configuration properties to set compression for MapReduce job outputs are summarized in Table 5-5. If your MapReduce driver uses the Tool interface (described in "GenericOptionsParser, Tool, and ToolRunner" on page 148), you can pass any of these properties to the program on the command line, which may be more convenient than modifying your program to hardcode the compression properties.

Table 5-5. MapReduce compression properties

mapreduce.output.fileoutputformat.compress
    Type: boolean. Default value: false.
    Description: Whether to compress outputs.

mapreduce.output.fileoutputformat.compress.codec
    Type: Classname. Default value: org.apache.hadoop.io.compress.DefaultCodec.
    Description: The compression codec to use for outputs.

mapreduce.output.fileoutputformat.compress.type
    Type: String. Default value: RECORD.
    Description: The type of compression to use for sequence file outputs: NONE, RECORD, or BLOCK.

Compressing map output

Even if your MapReduce application reads and writes uncompressed data, it may benefit from compressing the intermediate output of the map phase.
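As a minimal sketch of what turning this on looks like in the job configuration (the helper class name is illustrative, and Snappy is just one reasonable choice of fast codec; it requires the native libraries discussed earlier):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class CompressedMapOutputConfig {
  public static Job newJobWithCompressedMapOutput() throws Exception {
    Configuration conf = new Configuration();
    // Compress the intermediate map output that is spilled to disk and
    // shuffled across the network to the reducers.
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);
    return Job.getInstance(conf);
  }
}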