Tom White, Hadoop: The Definitive Guide, 4th edition, 2015 — excerpt (page 9 of the PDF, covering book pages 24-27)
At the bottom of the diagram is a Unix pipeline, which mimics the whole MapReduce flow and which we will see again later in this chapter when we look at Hadoop Streaming.

Figure 2-1. MapReduce logical data flow

Java MapReduce

Having run through how the MapReduce program works, the next step is to express it in code. We need three things: a map function, a reduce function, and some code to run the job. The map function is represented by the Mapper class, which declares an abstract map() method.
Example 2-3 shows the implementation of our map function.

Example 2-3. Mapper for the maximum temperature example

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static final int MISSING = 9999;

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {

        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
          airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
          airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
          context.write(new Text(year), new IntWritable(airTemperature));
        }
      }
    }

The Mapper class is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function.
For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer). Rather than using built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package. Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).

The map() method is passed a key and a value. We convert the Text value containing the line of input into a Java String, then use its substring() method to extract the columns we are interested in.

The map() method also provides an instance of Context to write the output to.
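The column extraction can be tried out in isolation as plain Java, with no Hadoop dependency. The sketch below builds a synthetic record rather than a real NCDC line (the padding and the -1.1°C reading are illustrative), but the parsing logic is the same as in the mapper:

```java
// Sketch of the mapper's fixed-width column extraction, outside Hadoop.
// The column positions (year at 15-19, temperature at 87-92, quality at 92)
// follow the NCDC format used in the text; the record itself is synthetic.
public class LineParseSketch {
  public static void main(String[] args) {
    StringBuilder sb = new StringBuilder("0".repeat(105));
    sb.replace(15, 19, "1950");   // year columns
    sb.replace(87, 92, "-0011");  // temperature: -1.1°C, in tenths of a degree
    sb.setCharAt(92, '1');        // quality code: 1 = passed quality checks
    String line = sb.toString();

    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);

    System.out.println(year + "\t" + airTemperature + "\t" + quality);
  }
}
```

Running this prints the (year, temperature, quality) triple extracted from the synthetic line, which is exactly what the mapper would pass to context.write() after its quality filter.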
In this case, we write the year as a Text object (since we are just using it as a key), and the temperature is wrapped in an IntWritable. We write an output record only if the temperature is present and the quality code indicates the temperature reading is OK.

The reduce function is similarly defined using a Reducer, as illustrated in Example 2-4.

Example 2-4. Reducer for the maximum temperature example

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

      @Override
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {

        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
          maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
      }
    }

Again, four formal type parameters are used to specify the input and output types, this time for the reduce function. The input types of the reduce function must match the output types of the map function: Text and IntWritable. And in this case, the output types of the reduce function are Text and IntWritable, for a year and its maximum temperature, which we find by iterating through the temperatures and comparing each with a record of the highest found so far.

The third piece of code runs the MapReduce job (see Example 2-5).

Example 2-5. Application to find the maximum temperature in the weather dataset

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MaxTemperature {

      public static void main(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.println("Usage: MaxTemperature <input path> <output path>");
          System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

A Job object forms the specification of the job and gives you control over how the job is run.
When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster). Rather than explicitly specifying the name of the JAR file, we can pass a class in the Job's setJarByClass() method, which Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this class.

Having constructed a Job object, we specify the input and output paths. An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory (in which case the input forms all the files in that directory), or a file pattern.
As the name suggests, addInputPath() can be called more than once to use input from multiple paths.

The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reduce function are written. The directory shouldn't exist before running the job, because Hadoop will complain and not run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with that of another).

Next, we specify the map and reduce types to use via the setMapperClass() and setReducerClass() methods.

The setOutputKeyClass() and setOutputValueClass() methods control the output types for the reduce function, and must match what the Reducer class produces.
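Hadoop enforces the "output directory must not exist" precaution itself by failing the job, but the same check can be sketched up front with plain java.nio.file on the local filesystem, which gives a friendlier error before submission. This is a hypothetical pre-flight check, not part of the book's example, and the "output" path name is just an illustration:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the output-directory precaution on the local filesystem.
// Hadoop performs the equivalent check when the job is submitted and
// refuses to run; checking early simply fails faster and more clearly.
public class OutputPathCheck {
  public static void main(String[] args) throws IOException {
    Path output = Path.of("output");  // hypothetical output directory
    if (Files.exists(output)) {
      System.err.println("Output directory " + output + " already exists;"
          + " refusing to overwrite earlier job output");
      System.exit(1);
    }
    System.out.println("Output path is clear; safe to submit the job");
  }
}
```

On a real cluster the paths live in HDFS rather than the local filesystem, so the authoritative check is still the one Hadoop itself makes at submission time.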
The map output types default to the same types, so they do not need to be set if the mapper produces the same types as the reducer (as it does in our case). However, if they are different, the map output types must be set using the setMapOutputKeyClass() and setMapOutputValueClass() methods.

The input types are controlled via the input format, which we have not explicitly set because we are using the default TextInputFormat.

After setting the classes that define the map and reduce functions, we are ready to run the job.
The waitForCompletion() method on Job submits the job and waits for it to finish. The single argument to the method is a flag indicating whether verbose output is generated. When true, the job writes information about its progress to the console. The return value of the waitForCompletion() method is a Boolean indicating success (true) or failure (false), which we translate into the program's exit code of 0 or 1.

The Java MapReduce API used in this section, and throughout the book, is called the "new API"; it replaces the older, functionally equivalent API. The differences between the two APIs are explained in Appendix D, along with tips on how to convert between the two APIs. You can also find the old API equivalent of the maximum temperature application there.

A test run

After writing a MapReduce job, it's normal to try it out on a small dataset to flush out any immediate problems with the code. First, install Hadoop in standalone mode (there are instructions for how to do this in Appendix A).
This is the mode in which Hadoop runs using the local filesystem with a local job runner. Then, install and compile the examples using the instructions on the book's website.

Let's test it on the five-line sample discussed earlier (the output has been slightly reformatted to fit the page, and some lines have been removed):

    % export HADOOP_CLASSPATH=hadoop-examples.jar
    % hadoop MaxTemperature input/ncdc/sample.txt output
    14/09/16 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop
    library for your platform... using builtin-java classes where applicable
    14/09/16 09:48:40 WARN mapreduce.JobSubmitter: Hadoop command-line option
    parsing not performed.
    Implement the Tool interface and execute your application
    with ToolRunner to remedy this.
    14/09/16 09:48:40 INFO input.FileInputFormat: Total input paths to process : 1
    14/09/16 09:48:40 INFO mapreduce.JobSubmitter: number of splits:1
    14/09/16 09:48:40 INFO mapreduce.JobSubmitter: Submitting tokens for job:
    job_local26392882_0001
    14/09/16 09:48:40 INFO mapreduce.Job: The url to track the job:
    http://localhost:8080/
    14/09/16 09:48:40 INFO mapreduce.Job: Running job: job_local26392882_0001
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter set in config null
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: OutputCommitter is
    org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for map tasks
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task:
    attempt_local26392882_0001_m_000000_0
    14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
    14/09/16 09:48:40 INFO mapred.LocalJobRunner:
    14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_m_000000_0
    is done.
    And is in the process of committing
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: map
    14/09/16 09:48:40 INFO mapred.Task: Task 'attempt_local26392882_0001_m_000000_0'
    done.
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: Finishing task:
    attempt_local26392882_0001_m_000000_0
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: map task executor complete.
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: Waiting for reduce tasks
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: Starting task:
    attempt_local26392882_0001_r_000000_0
    14/09/16 09:48:40 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
    14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
    14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1
    segments left of total size: 50 bytes
    14/09/16 09:48:40 INFO mapred.Merger: Merging 1 sorted segments
    14/09/16 09:48:40 INFO mapred.Merger: Down to the last merge-pass, with 1
    segments left of total size: 50 bytes
    14/09/16 09:48:40 INFO mapred.LocalJobRunner: 1 / 1 copied.
    14/09/16 09:48:40 INFO mapred.Task: Task:attempt_local26392882_0001_r_000000_0
    is done.