Главная » Просмотр файлов » Tom White - Hadoop The Definitive Guide_ 4 edition - 2015

Tom White - Hadoop The Definitive Guide_ 4 edition - 2015 (811394), страница 57

Файл №811394 Tom White - Hadoop The Definitive Guide_ 4 edition - 2015 (Tom White - Hadoop The Definitive Guide_ 4 edition - 2015.pdf) 57 страницаTom White - Hadoop The Definitive Guide_ 4 edition - 2015 (811394) страница 572020-08-25СтудИзба
Просмтор этого файла доступен только зарегистрированным пользователям. Но у нас супер быстрая регистрация: достаточно только электронной почты!

Текст из файла (страница 57)

However, to performthe join, it is important to have the data from one source before that from the other.For the weather data join, the station record must be the first of the values seen foreach key, so the reducer can fill in the weather records with the station name andemit them straightaway. Of course, it would be possible to receive the records inany order if we buffered them in memory, but this should be avoided because the270|Chapter 9: MapReduce Featuresnumber of records in any group may be very large and exceed the amount of mem‐ory available to the reducer.We saw in “Secondary Sort” on page 262 how to impose an order on the values foreach key that the reducers see, so we use this technique here.To tag each record, we use TextPair (discussed in Chapter 5) for the keys (to store thestation ID) and the tag. The only requirement for the tag values is that they sort in sucha way that the station records come before the weather records.

This can be achievedby tagging station records as 0 and weather records as 1. The mapper classes to do thisare shown in Examples 9-9 and 9-10.Example 9-9. Mapper for tagging station records for a reduce-side joinpublic class JoinStationMapperextends Mapper<LongWritable, Text, TextPair, Text> {private NcdcStationMetadataParser parser = new NcdcStationMetadataParser();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {if (parser.parse(value)) {context.write(new TextPair(parser.getStationId(), "0"),new Text(parser.getStationName()));}}}Example 9-10. Mapper for tagging weather records for a reduce-side joinpublic class JoinRecordMapperextends Mapper<LongWritable, Text, TextPair, Text> {private NcdcRecordParser parser = new NcdcRecordParser();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {parser.parse(value);context.write(new TextPair(parser.getStationId(), "1"), value);}}The reducer knows that it will receive the station record first, so it extracts its namefrom the value and writes it out as a part of every output record (Example 9-11).Example 9-11.

Reducer for joining tagged station records with tagged weather recordspublic class JoinReducer extends Reducer<TextPair, Text, Text, Text> {@Overrideprotected void reduce(TextPair key, Iterable<Text> values, Context context)Joins|271throws IOException, InterruptedException {Iterator<Text> iter = values.iterator();Text stationName = new Text(iter.next());while (iter.hasNext()) {Text record = iter.next();Text outValue = new Text(stationName.toString() + "\t" + record.toString());context.write(key.getFirst(), outValue);}}}The code assumes that every station ID in the weather records has exactly one matchingrecord in the station dataset. If this were not the case, we would need to generalize thecode to put the tag into the value objects, by using another TextPair.

The reduce()method would then be able to tell which entries were station names and detect (andhandle) missing or duplicate entries before processing the weather records.Because objects in the reducer’s values iterator are reused (for effi‐ciency purposes), it is vital that the code makes a copy of the firstText object from the values iterator:Text stationName = new Text(iter.next());If the copy is not made, the stationName reference will refer to thevalue just read when it is turned into a string, which is a bug.Tying the job together is the driver class, shown in Example 9-12. The essential pointhere is that we partition and group on the first part of the key, the station ID, which wedo with a custom Partitioner (KeyPartitioner) and a custom group comparator,FirstComparator (from TextPair).Example 9-12. Application to join weather records with station namespublic class JoinRecordWithStationName extends Configured implements Tool {public static class KeyPartitioner extends Partitioner<TextPair, Text> {@Overridepublic int getPartition(TextPair key, Text value, int numPartitions) {return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;}}@Overridepublic int run(String[] args) throws Exception {if (args.length != 3) {JobBuilder.printUsage(this, "<ncdc input> <station input> <output>");return -1;}Job job = new Job(getConf(), "Join weather records with station names");272| Chapter 9: MapReduce Featuresjob.setJarByClass(getClass());Path ncdcInputPath = new Path(args[0]);Path stationInputPath = new Path(args[1]);Path outputPath = new Path(args[2]);MultipleInputs.addInputPath(job, ncdcInputPath,TextInputFormat.class, JoinRecordMapper.class);MultipleInputs.addInputPath(job, stationInputPath,TextInputFormat.class, JoinStationMapper.class);FileOutputFormat.setOutputPath(job, outputPath);job.setPartitionerClass(KeyPartitioner.class);job.setGroupingComparatorClass(TextPair.FirstComparator.class);job.setMapOutputKeyClass(TextPair.class);job.setReducerClass(JoinReducer.class);job.setOutputKeyClass(Text.class);return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new JoinRecordWithStationName(), args);System.exit(exitCode);}}Running the program on the sample data yields the following output:011990-99999011990-99999011990-99999012650-99999012650-99999SIHCCAJAVRISIHCCAJAVRISIHCCAJAVRITYNSET-HANSMOENTYNSET-HANSMOEN0067011990999991950051507004...0043011990999991950051512004...0043011990999991950051518004...0043012650999991949032412004...0043012650999991949032418004...Side Data DistributionSide data can be defined as extra read-only data needed by a job to process the maindataset.

The challenge is to make side data available to all the map or reduce tasks (whichare spread across the cluster) in a convenient and efficient fashion.Using the Job ConfigurationYou can set arbitrary key-value pairs in the job configuration using the various settermethods on Configuration (or JobConf in the old MapReduce API).

This is very usefulwhen you need to pass a small piece of metadata to your tasks.Side Data Distribution|273In the task, you can retrieve the data from the configuration returned by Context’sgetConfiguration() method. (In the old API, it’s a little more involved: override theconfigure() method in the Mapper or Reducer and use a getter method on the JobConfobject passed in to retrieve the data.

It’s very common to store the data in an instancefield so it can be used in the map() or reduce() method.)Usually a primitive type is sufficient to encode your metadata, but for arbitrary objectsyou can either handle the serialization yourself (if you have an existing mechanism forturning objects to strings and back) or use Hadoop’s Stringifier class. TheDefaultStringifier uses Hadoop’s serialization framework to serialize objects (see“Serialization” on page 109).You shouldn’t use this mechanism for transferring more than a few kilobytes of data,because it can put pressure on the memory usage in MapReduce components.

The jobconfiguration is always read by the client, the application master, and the task JVM, andeach time the configuration is read, all of its entries are read into memory, even if theyare not used.Distributed CacheRather than serializing side data in the job configuration, it is preferable to distributedatasets using Hadoop’s distributed cache mechanism.

This provides a service for copy‐ing files and archives to the task nodes in time for the tasks to use them when they run.To save network bandwidth, files are normally copied to any particular node onceper job.UsageFor tools that use GenericOptionsParser (this includes many of the programs in thisbook; see “GenericOptionsParser, Tool, and ToolRunner” on page 148), you can specifythe files to be distributed as a comma-separated list of URIs as the argument to the-files option. Files can be on the local filesystem, on HDFS, or on another Hadoopreadable filesystem (such as S3).

If no scheme is supplied, then the files are assumed tobe local. (This is true even when the default filesystem is not the local filesystem.)You can also copy archive files (JAR files, ZIP files, tar files, and gzipped tar files) toyour tasks using the -archives option; these are unarchived on the task node. The-libjars option will add JAR files to the classpath of the mapper and reducer tasks.This is useful if you haven’t bundled library JAR files in your job JAR file.Let’s see how to use the distributed cache to share a metadata file for station names.

Thecommand we will run is:% hadoop jar hadoop-examples.jar \MaxTemperatureByStationNameUsingDistributedCacheFile \-files input/ncdc/metadata/stations-fixed-width.txt input/ncdc/all output274| Chapter 9: MapReduce FeaturesThis command will copy the local file stations-fixed-width.txt (no scheme is supplied,so the path is automatically interpreted as a local file) to the task nodes, so we can useit to look up station names.

The listing for MaxTemperatureByStationNameUsingDistributedCacheFile appears in Example 9-13.Example 9-13. Application to find the maximum temperature by station, showing sta‐tion names from a lookup table passed as a distributed cache filepublic class MaxTemperatureByStationNameUsingDistributedCacheFileextends Configured implements Tool {static class StationTemperatureMapperextends Mapper<LongWritable, Text, Text, IntWritable> {private NcdcRecordParser parser = new NcdcRecordParser();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {parser.parse(value);if (parser.isValidTemperature()) {context.write(new Text(parser.getStationId()),new IntWritable(parser.getAirTemperature()));}}}static class MaxTemperatureReducerWithStationLookupextends Reducer<Text, IntWritable, Text, IntWritable> {private NcdcStationMetadata metadata;@Overrideprotected void setup(Context context)throws IOException, InterruptedException {metadata = new NcdcStationMetadata();metadata.initialize(new File("stations-fixed-width.txt"));}@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {String stationName = metadata.getStationName(key.toString());int maxValue = Integer.MIN_VALUE;for (IntWritable value : values) {maxValue = Math.max(maxValue, value.get());}context.write(new Text(stationName), new IntWritable(maxValue));}Side Data Distribution|275}@Overridepublic int run(String[] args) throws Exception {Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);if (job == null) {return -1;}job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setMapperClass(StationTemperatureMapper.class);job.setCombinerClass(MaxTemperatureReducer.class);job.setReducerClass(MaxTemperatureReducerWithStationLookup.class);return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception {int exitCode = ToolRunner.run(new MaxTemperatureByStationNameUsingDistributedCacheFile(), args);System.exit(exitCode);}}The program finds the maximum temperature by weather station, so the mapper(StationTemperatureMapper) simply emits (station ID, temperature) pairs.

Характеристики

Список файлов книги

Свежие статьи
Популярно сейчас
Как Вы думаете, сколько людей до Вас делали точно такое же задание? 99% студентов выполняют точно такие же задания, как и их предшественники год назад. Найдите нужный учебный материал на СтудИзбе!
Ответы на популярные вопросы
Да! Наши авторы собирают и выкладывают те работы, которые сдаются в Вашем учебном заведении ежегодно и уже проверены преподавателями.
Да! У нас любой человек может выложить любую учебную работу и зарабатывать на её продажах! Но каждый учебный материал публикуется только после тщательной проверки администрацией.
Вернём деньги! А если быть более точными, то автору даётся немного времени на исправление, а если не исправит или выйдет время, то вернём деньги в полном объёме!
Да! На равне с готовыми студенческими работами у нас продаются услуги. Цены на услуги видны сразу, то есть Вам нужно только указать параметры и сразу можно оплачивать.
Отзывы студентов
Ставлю 10/10
Все нравится, очень удобный сайт, помогает в учебе. Кроме этого, можно заработать самому, выставляя готовые учебные материалы на продажу здесь. Рейтинги и отзывы на преподавателей очень помогают сориентироваться в начале нового семестра. Спасибо за такую функцию. Ставлю максимальную оценку.
Лучшая платформа для успешной сдачи сессии
Познакомился со СтудИзбой благодаря своему другу, очень нравится интерфейс, количество доступных файлов, цена, в общем, все прекрасно. Даже сам продаю какие-то свои работы.
Студизба ван лав ❤
Очень офигенный сайт для студентов. Много полезных учебных материалов. Пользуюсь студизбой с октября 2021 года. Серьёзных нареканий нет. Хотелось бы, что бы ввели подписочную модель и сделали материалы дешевле 300 рублей в рамках подписки бесплатными.
Отличный сайт
Лично меня всё устраивает - и покупка, и продажа; и цены, и возможность предпросмотра куска файла, и обилие бесплатных файлов (в подборках по авторам, читай, ВУЗам и факультетам). Есть определённые баги, но всё решаемо, да и администраторы реагируют в течение суток.
Маленький отзыв о большом помощнике!
Студизба спасает в те моменты, когда сроки горят, а работ накопилось достаточно. Довольно удобный сайт с простой навигацией и огромным количеством материалов.
Студ. Изба как крупнейший сборник работ для студентов
Тут дофига бывает всего полезного. Печально, что бывают предметы по которым даже одного бесплатного решения нет, но это скорее вопрос к студентам. В остальном всё здорово.
Спасательный островок
Если уже не успеваешь разобраться или застрял на каком-то задание поможет тебе быстро и недорого решить твою проблему.
Всё и так отлично
Всё очень удобно. Особенно круто, что есть система бонусов и можно выводить остатки денег. Очень много качественных бесплатных файлов.
Отзыв о системе "Студизба"
Отличная платформа для распространения работ, востребованных студентами. Хорошо налаженная и качественная работа сайта, огромная база заданий и аудитория.
Отличный помощник
Отличный сайт с кучей полезных файлов, позволяющий найти много методичек / учебников / отзывов о вузах и преподователях.
Отлично помогает студентам в любой момент для решения трудных и незамедлительных задач
Хотелось бы больше конкретной информации о преподавателях. А так в принципе хороший сайт, всегда им пользуюсь и ни разу не было желания прекратить. Хороший сайт для помощи студентам, удобный и приятный интерфейс. Из недостатков можно выделить только отсутствия небольшого количества файлов.
Спасибо за шикарный сайт
Великолепный сайт на котором студент за не большие деньги может найти помощь с дз, проектами курсовыми, лабораторными, а также узнать отзывы на преподавателей и бесплатно скачать пособия.
Популярные преподаватели
Добавляйте материалы
и зарабатывайте!
Продажи идут автоматически
6505
Авторов
на СтудИзбе
302
Средний доход
с одного платного файла
Обучение Подробнее