It defines two methods. First, the format is careful to specify that input files should never be split, by overriding isSplitable() to return false. Second, we implement createRecordReader() to return a custom implementation of RecordReader, which appears in Example 8-3.

Example 8-3. The RecordReader used by WholeFileInputFormat for reading a whole file as a record

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {

  private FileSplit fileSplit;
  private Configuration conf;
  private BytesWritable value = new BytesWritable();
  private boolean processed = false;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.fileSplit = (FileSplit) split;
    this.conf = context.getConfiguration();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!processed) {
      byte[] contents = new byte[(int) fileSplit.getLength()];
      Path file = fileSplit.getPath();
      FileSystem fs = file.getFileSystem(conf);
      FSDataInputStream in = null;
      try {
        in = fs.open(file);
        IOUtils.readFully(in, contents, 0, contents.length);
        value.set(contents, 0, contents.length);
      } finally {
        IOUtils.closeStream(in);
      }
      processed = true;
      return true;
    }
    return false;
  }

  @Override
  public NullWritable getCurrentKey() throws IOException, InterruptedException {
    return NullWritable.get();
  }

  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return value;
  }

  @Override
  public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
  }

  @Override
  public void close() throws IOException {
    // do nothing
  }
}
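For reference, the WholeFileInputFormat class that the description above refers to is not included in this excerpt; a minimal sketch consistent with that description, assuming the new-API FileInputFormat base class, might look like this:

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat
    extends FileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false; // never split an input file, however large
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    // The framework calls initialize() on the reader before it is used
    return new WholeFileRecordReader();
  }
}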
WholeFileRecordReader is responsible for taking a FileSplit and converting it into a single record, with a null key and a value containing the bytes of the file. Because there is only a single record, WholeFileRecordReader has either processed it or not, so it maintains a boolean called processed. If the file has not been processed when the nextKeyValue() method is called, then we open the file, create a byte array whose length is the length of the file, and use the Hadoop IOUtils class to slurp the file into the byte array. Then we set the array on the reader's BytesWritable value instance and return true to signal that a record has been read.

The other methods are straightforward bookkeeping methods for accessing the current key and value types and getting the progress of the reader, and a close() method, which is invoked by the MapReduce framework when the reader is done.

To demonstrate how WholeFileInputFormat can be used, consider a MapReduce job for packaging small files into sequence files, where the key is the original filename and the value is the content of the file.
The listing is in Example 8-4.

Example 8-4. A MapReduce program for packaging a collection of small files as a single SequenceFile

public class SmallFilesToSequenceFileConverter extends Configured
    implements Tool {

  static class SequenceFileMapper
      extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    private Text filenameKey;

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      InputSplit split = context.getInputSplit();
      Path path = ((FileSplit) split).getPath();
      filenameKey = new Text(path.toString());
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
        throws IOException, InterruptedException {
      context.write(filenameKey, value);
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }

    job.setInputFormatClass(WholeFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(BytesWritable.class);

    job.setMapperClass(SequenceFileMapper.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(), args);
    System.exit(exitCode);
  }
}

Because the input format is a WholeFileInputFormat, the mapper only has to find the filename for the input file split.
It does this by casting the InputSplit from the context to a FileSplit, which has a method to retrieve the file path. The path is stored in a Text object for the key. The reducer is the identity (not explicitly set), and the output format is a SequenceFileOutputFormat.

Here's a run on a few small files. We've chosen to use two reducers, so we get two output sequence files:

% hadoop jar hadoop-examples.jar SmallFilesToSequenceFileConverter \
    -conf conf/hadoop-localhost.xml -D mapreduce.job.reduces=2 \
    input/smallfiles output

Two part files are created, each of which is a sequence file.
We can inspect these with the -text option to the filesystem shell:

% hadoop fs -conf conf/hadoop-localhost.xml -text output/part-r-00000
hdfs://localhost/user/tom/input/smallfiles/a   61 61 61 61 61 61 61 61 61 61
hdfs://localhost/user/tom/input/smallfiles/c   63 63 63 63 63 63 63 63 63 63
hdfs://localhost/user/tom/input/smallfiles/e
% hadoop fs -conf conf/hadoop-localhost.xml -text output/part-r-00001
hdfs://localhost/user/tom/input/smallfiles/b   62 62 62 62 62 62 62 62 62 62
hdfs://localhost/user/tom/input/smallfiles/d   64 64 64 64 64 64 64 64 64 64
hdfs://localhost/user/tom/input/smallfiles/f   66 66 66 66 66 66 66 66 66 66

The input files were named a, b, c, d, e, and f, and each contained 10 characters of the corresponding letter (so, for example, a contained 10 "a" characters), except e, which was empty.
We can see this in the textual rendering of the sequence files, which prints the filename followed by the hex representation of the file.

There's at least one way we could improve this program. As mentioned earlier, having one mapper per file is inefficient, so subclassing CombineFileInputFormat instead of FileInputFormat would be a better approach.
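A sketch of that alternative, using the new-API CombineFileInputFormat and CombineFileRecordReader classes, might look like the following (the class names WholeFileCombineInputFormat and CombineWholeFileRecordReader are hypothetical, not from the book):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class WholeFileCombineInputFormat
    extends CombineFileInputFormat<NullWritable, BytesWritable> {

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false; // still read each file as a single record
  }

  @Override
  public RecordReader<NullWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    // CombineFileRecordReader runs one delegate reader per file in the combined split
    return new CombineFileRecordReader<NullWritable, BytesWritable>(
        (CombineFileSplit) split, context, CombineWholeFileRecordReader.class);
  }

  // Like WholeFileRecordReader in Example 8-3, but with the constructor signature
  // that CombineFileRecordReader requires: (CombineFileSplit, TaskAttemptContext, Integer)
  public static class CombineWholeFileRecordReader
      extends RecordReader<NullWritable, BytesWritable> {

    private final CombineFileSplit split;
    private final Configuration conf;
    private final int index; // which file within the combined split
    private final BytesWritable value = new BytesWritable();
    private boolean processed = false;

    public CombineWholeFileRecordReader(CombineFileSplit split,
        TaskAttemptContext context, Integer index) {
      this.split = split;
      this.conf = context.getConfiguration();
      this.index = index;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) { }

    @Override
    public boolean nextKeyValue() throws IOException {
      if (!processed) {
        byte[] contents = new byte[(int) split.getLength(index)];
        Path file = split.getPath(index);
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        try {
          in = fs.open(file);
          IOUtils.readFully(in, contents, 0, contents.length);
          value.set(contents, 0, contents.length);
        } finally {
          IOUtils.closeStream(in);
        }
        processed = true;
        return true;
      }
      return false;
    }

    @Override
    public NullWritable getCurrentKey() { return NullWritable.get(); }

    @Override
    public BytesWritable getCurrentValue() { return value; }

    @Override
    public float getProgress() { return processed ? 1.0f : 0.0f; }

    @Override
    public void close() { }
  }
}

Note that with a combined input the mapper's split is a CombineFileSplit rather than a FileSplit, so SequenceFileMapper's setup() would also need a different way to obtain the filename.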
Text Input

Hadoop excels at processing unstructured text. In this section, we discuss the different InputFormats that Hadoop provides to process text.

TextInputFormat

TextInputFormat is the default InputFormat. Each record is a line of input. The key, a LongWritable, is the byte offset within the file of the beginning of the line. The value is the contents of the line, excluding any line terminators (e.g., newline or carriage return), and is packaged as a Text object. So, a file containing the following text:

On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

is divided into one split of four records. The records are interpreted as the following key-value pairs:

(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)
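To make the types concrete, a mapper consuming TextInputFormat records receives the byte offset as its LongWritable key and the line as its Text value; a minimal sketch (not from the book; the class name OffsetAndLineMapper is made up) would be:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OffsetAndLineMapper
    extends Mapper<LongWritable, Text, LongWritable, Text> {

  @Override
  protected void map(LongWritable offset, Text line, Context context)
      throws IOException, InterruptedException {
    // offset is the position of the line's first byte within the file, not a line number
    context.write(offset, line);
  }
}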
Clearly, the keys are not line numbers. This would be impossible to implement in general, in that a file is broken into splits at byte, not line, boundaries. Splits are processed independently. Line numbers are really a sequential notion: you have to keep a count of lines as you consume them, so knowing the line number within a split would be possible, but not within the file.

However, the offset within the file of each line is known by each split independently of the other splits, since each split knows the size of the preceding splits and just adds this onto the offsets within the split to produce a global file offset. The offset is usually sufficient for applications that need a unique identifier for each line.
Combined with the file's name, it is unique within the filesystem. Of course, if all the lines are a fixed width, calculating the line number is simply a matter of dividing the offset by the width.
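For instance (a trivial sketch; the 16-byte record width is a made-up value):

// Hypothetical fixed-width input: every line, including its terminator, is exactly 16 bytes
final int RECORD_WIDTH = 16;
long offset = 48;                         // the key supplied by TextInputFormat
long lineNumber = offset / RECORD_WIDTH;  // 48 / 16 = 3, i.e., the fourth line (zero-based)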
The Relationship Between Input Splits and HDFS Blocks

The logical records that FileInputFormats define usually do not fit neatly into HDFS blocks. For example, a TextInputFormat's logical records are lines, which will cross HDFS block boundaries more often than not. This has no bearing on the functioning of your program (lines are not missed or broken, for example), but it's worth knowing about because it does mean that data-local maps (that is, maps that are running on the same host as their input data) will perform some remote reads.
The slight overhead this causes is not normally significant.

Figure 8-3 shows an example. A single file is broken into lines, and the line boundaries do not correspond with the HDFS block boundaries. Splits honor logical record boundaries (in this case, lines), so we see that the first split contains line 5, even though it spans the first and second block. The second split starts at line 6.

Figure 8-3. Logical records and HDFS blocks for TextInputFormat

Controlling the maximum line length.
If you are using one of the text input formats discussed here, you can set a maximum expected line length to safeguard against corrupted files. Corruption in a file can manifest itself as a very long line, which can cause out-of-memory errors and then task failure. By setting mapreduce.input.linerecordreader.line.maxlength to a value in bytes that fits in memory (and is comfortably greater than the length of lines in your input data), you ensure that the record reader will skip the (long) corrupt lines without the task failing.
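For example, the property can be set on the job's configuration like this (a sketch; the 1 MB limit is an arbitrary illustrative value):

Configuration conf = job.getConfiguration(); // e.g., the Job built in a Tool's run() method
// Skip any "line" longer than 1 MB instead of letting it exhaust the task's memory
conf.setInt("mapreduce.input.linerecordreader.line.maxlength", 1024 * 1024);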
KeyValueTextInputFormat

TextInputFormat's keys, being simply the offsets within the file, are not normally very useful. It is common for each line in a file to be a key-value pair, separated by a delimiter such as a tab character. For example, this is the kind of output produced by TextOutputFormat, Hadoop's default OutputFormat. To interpret such files correctly, KeyValueTextInputFormat is appropriate.

You can specify the separator via the mapreduce.input.keyvaluelinerecordreader.key.value.separator property.
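For example, to read files whose key and value are separated by a comma rather than the default tab, a job could be configured roughly as follows (a sketch; the comma is just an illustration, and KeyValueTextInputFormat lives in org.apache.hadoop.mapreduce.lib.input):

Configuration conf = job.getConfiguration();
// Treat everything before the first comma as the key, and the rest of the line as the value
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
job.setInputFormatClass(KeyValueTextInputFormat.class);

With this input format, the mapper's input key and input value are both of type Text.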