Лекция 8. Hadoop Java API продолжение. Управление и планирование вычислений (2015 Лекции)
Описание файла
Файл "Лекция 8. Hadoop Java API продолжение. Управление и планирование вычислений" внутри архива находится в папке "2015 Лекции". PDF-файл из архива "2015 Лекции", который расположен в категории "". Всё это находится в предмете "(смрхиод) современные методы распределенного хранения и обработки данных" из 10 семестр (2 семестр магистратуры), которые можно найти в файловом архиве МГУ им. Ломоносова. Не смотря на прямую связь этого архива с МГУ им. Ломоносова, его также можно найти и в других разделах. .
Просмотр PDF-файла онлайн
Текст из PDF
Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №4. Hadoop Java API: продолжение.Управление и планирование вычислений.http://bigdata.cs.msu.rubigdata@cs.msu.ruHadoop Java API: продолжениеSequenceFile●●●●sync маркер - между записямиформат без/c сжатиемсжатие записей/блоковзаписейSequenceFileInputFormat<K,V>Вероятность коллизии прииспользовании 16байтового MD5 маркера в 1ПБ файле - 10-23Сжатие данныхЗачем?● Оптимизирует затраты на хранение● Сокращает время передачи по сети, с/на дискГде используется?● Входные данные могут поступать в сжатом виде (архивы)● Сжатие выходных данных Mapper’ов● Архивирование выходных данных программыОбработка сжатых входных данных●●Формат сжатия определяется по расширению файлаОсновной вопрос: поддержка разбиенияФормат сжатияПрограммаDEFLATEАлгоритмРасширениеПоддержкаразбиенияDEFLATE.dflatedНетgzipgzipDEFLATE.gzНетbzip2bzip2bzip2.bz2ДаLZOLZOpLZO.LZOНет (только припредварительноминдексировании)LZ4-LZ4.lz4НетSnappy-Snappy.snappyНет48-битный маркер между блоками bzip2 0x314159265359Использование сжатия в Hadoop MapReduceИмя свойстваТипсвойстваЗначение по умолчаниюОписаниеmapred.output.compressbooleanfalseСжатие выходных данныхmapred.output.compression.codecClassnameorg.apache.hadoop.io.compress.DefaultCodecКодек для сжатия выходных данныхmapred.output.compression.typeStringRECORDТип сжатия для выходныхSequenceFilemapred.compress.map.outputbooleanfalseСжатие выходных данных Mappermapred.map.output.compression.codecClassorg.apache.hadoop.io.compress.DefaultCodecКодек для сжатия выходных данныхMapperОбработка ошибок и сбоевСвойствоПо умолчаниюОписаниеmapreduce.task.timeout600000Число миллсек., в течениекоторых ожидается активностьот задачи (чтение, запись,изменение счетчиков и т.п.)mapreduce.map.maxattempts4Сколько раз Mapper м.б.перезапущена из-за сбоя допризнания всей программысбойной.mapreduce.reduce.maxattempts4Сколько раз Reducer м.б.перезапущен из-за сбоя допризнания всей программысбойной.СчетчикиСчетчикОписаниеMAP_INPUT_RECORDSЧисло вызовов mapMAP_INPUT_BYTESСуммарный (в несжатом виде) объем входных данных всех вызовов map.MAP_OUTPUT_RECORDSЧисло выходных пар, созданных всеми map.MAP_OUTPUT_BYTESСуммарный (в несжатом виде) объем выходных данных всех вызовов map.MAP_OUTPUT_MATERIALIZED_BYTESОбъем выходных данных отображений, записанных на диск (сжатие влияет!)COMBINE_INPUT_RECORDS! Сумма длин всех списков значений во всех вызовах.COMBINE_OUTPUT_RECORDSСуммарное число созданных выходных пар.REDUCE_INPUT_GROUPSЧисло вызовов reduceREDUCE_INPUT_RECORDSСумма длин всех списков значений во всех вызовах.GC_TIME_MILLСуммарное время, затраченное на GC (в миллисек.)...Пользовательские счетчикиenum WordType {BAD,GOOD}if(isBad(word))context.getCounter(“WordType.BAD”).increment(1);…// Динамический счетчикcontext.getCounter(“SpacesCount”).increment(1);СортировкаКак обеспечить полную сортировку результатов?Написать CustomPartitioner, формирующий разделы на основе первой(ых)букв(ы) слов.RandomSampler, TotalOrderPartitioner и DistributedCacheРешение - сэмплинг ключей.public class InputSampler.RandomSampler {public InputSampler.RandomSampler(double freq, int numSamples, int maxSplitsSampled)public K[] getSample(InputFormat<K,V> inf, Job job) throws IOException, InterruptedException}// фрагмент run()job.setPartitionerClass(TotalOrderPartitioner.class);InputSampler.Sampler<Text, IntWritable> sampler = new InputSampler.RandomSampler<Text,IntWritable>(0.1,1000, 2);InputSampler.writePartitionFile(job, sampler);Configuration conf = job.getConfiguration();String partitionFile=TotalOrderPartitioner.getPartitionFile(conf);URI partitionUri = new URI(partitionFile);job.addCacheFile(partitionUri);return job.waitForCompletion(true) ? 0: 1;Secondary Sort#SYMBOL,SYSTEM,MOMENT,ID_DEAL,PRICE_DEAL,VOLUME,OPEN_POS,DIRECTIONSVH1,F,20110111100000080,255223067,30.46000,1,8714,SSVH1,F,20110111100000080,255223068,30.38000,1,8714,SSVH1,F,20110111100000080,255223069,30.32000,1,8714,SSVH1,F,20110111100000080,255223070,30.28000,2,8714,SSVH1,F,20110111100000080,255223071,30.25000,1,8714,SSVH1,F,20110111100000080,255223072,30.05000,1,8714,SSVH1,F,20110111100000080,255223073,30.05000,3,8714,SRIH1,F,20110111100000097,255223074,177885.00000,1,291758,BRIH1,F,20110111100000097,255223075,177935.00000,2,291758,BRIH1,F,20110111100000097,255223076,177980.00000,10,291758,BRIH1,F,20110111100000097,255223077,177995.00000,1,291758,BRIH1,F,20110111100000097,255223078,178100.00000,2,291758,BRIH1,F,20110111100000097,255223079,178200.00000,1,291758,BRIH1,F,20110111100000097,255223080,178205.00000,1,291758,BПо каждому инструменту получить min цену за каждый день.SecondarySortКлюч: дата+инструмент+ценаЗначение: NullWritablejob.setPartitionerClass(MyPartitioner.class); // hash(дата+инструмент)%Rjob.setSortComparator(MySortComparator.class); // по дата,инструмент,ценаjob.setGroupingComparator(MyGroupComparator); // по дата,инструмент… protected void reduce(MyClass key, Iterable<NullWritable> values) ...Управление и планированиевычисленийУправление вычислениями - YARN+MRAppMasterResourceManager (он один)распределяет ресурсы кластерадля любых типов задач.NodeManager управляетконтейнерами своего узла.AppMaster реализует логикураспределенных вычисленийсвоего типа (например,MRAppMaster для MapReduce).Для каждого запускаемогоприложения создается отдельныйAppMaster нужного типа.Container - ресурс (ОЗУ, ядра),выделенный приложению.Название параметраПо умолчаниюОписаниеmapreduce.framework.namelocalМодель управления вычислениями (local, yarn, classic)yarn.resourcemanager.address${yarn.resourcemanager.hostname}:8032Хост:порт менеджера ресурсов YARNyarn.scheduler.minimum-allocation-mb1024Минимальный объем памяти, выделяемой для контейнера (запросы напамять д.б.
кратны ему)yarn.scheduler.maximum-allocation-mb8192Максимальный объем памяти, выделяемой для контейнераyarn.scheduler.minimum-allocation-vcores1Минимальное число ядер, выделяемый для контейнераyarn.scheduler.maximum-allocation-vcores32Максимальное число ядер, выделяемое для контейнераyarn.nodemanager.resource.memory-mb8192Объем памяти NodeManageryarn.nodemanager.resource.cpu-vcores8Число ядер NodeManagermapreduce.map.memory.mb1024Объем памяти, запрашиваемый для Mappermapreduce.map.java.opts-Перегружает mapreduce.reduce.java.opts (по умолчанию “-Xmx200m”)mapreduce.map.cpu.vcores1Число ядер, запрашиваемых для Mappermapreduce.reduce.memory.mb1024Объем памяти, запрашиваемый для Reducermapreduce.reduce.java.opts-Перегружает mapreduce.reduce.java.opts (по умолчанию “-Xmx200m”)mapreduce.reduce.cpu.vcores1Число ядер, запрашиваемых для ReducerUber taskUber task - задача, исполняемая в JVM MRAppMastermapreduce.job.ubertask.enable=truemapreduce.job.ubertask.maxmaps=10mapreduce.job.ubertask.maxreduces=1Управление вычислениями (Hadoop 1)JobTracker ответственнен запланирование одновременноговычисления несколькихMapReduce приложений накластере.JobTracker один на всеMapReduce приложенияКаждый TaskTracker управляетMappers и Reducers на своемузле.Ресурсы кластера описывалисьmap и reduce слотами,зарезервированными под задачисоответствующих типов.Планирование исполнения программ многихпользователей1.2.3.4.Очередь программ (FIFO)Очередь с приоритетами (mapred.job.priority)CapacitySchedulerFairSchedulerПланировщикиCapacityScheduler<configuration><property><name>yarn.scheduler.capacity.root.queues</name><value>lvk,mlab</value></property><property><name>yarn.scheduler.capacity.root.mlab.queues</name><value>comm,science</value></property><property><name>yarn.scheduler.capacity.root.lvk.capacity</name><value>40</value></property><property><name>yarn.scheduler.capacity.root.mlab.capacity</name><value>60</value></property><property><name>yarn.scheduler.capacity.root.lvk.maximum-capacity</name><value>75</value></property><property><name>yarn.scheduler.capacity.root.mlab.comm.capacity</name><value>50</value></property><property><name>yarn.scheduler.capacity.root.mlab.science.capacity</name><value>50</value></property></configuration>rootlvkmlabcomm●●scienceОтнесение задачи к очереди спомощью: mapreduce.job.queuenameОтображение ACL на очередиFairScheduler<allocations><defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy><!-- еще есть drf, fifo --><queue name="lvk"><weight>40</weight><schedulingPolicy>fifo</schedulingPolicy></queue><queue name="mlab"><weight>60</weight><queue name="comm"/><queue name="science"/></queue><queuePlacementPolicy><rule name="specified" create="false"/> <-- группа задана --><rule name="primaryGroup" create="false"/> <-- Unix group (вес 1!) --><rule name="default" queue="mlab.comm"/> <-- иначе --></queuePlacementPolicy></allocations>Вытеснение (preemption)В конфиге:yarn.scheduler.fair.preemption=trueТайм-ауты (сек) вытеснения:●defaultMinSharePreemptionTimeout●minSharePreemptionTimeoutОтложенное планирование (Delay Scheduling)Возможность планирования (scheduling opportunity) - heart beat от NodeManager кResourceManager.Отложенное планирование - попытка откладывания решения по запросу планирования,содержащего требование к местоположению контейнера, до освобождения ресурсов снужной локальностью.СвойствоОписаниеПример значенияyarn.scheduler.capacity.nodelocality-delayЗадержка (в числе возможностейпланирования) до получениятребуемого ресурса.1yarn.scheduler.fair.locality.threshold.nodeЖдать требуемого ресурса пока небудут получены ответы от заданнойчасти NodeManager0,5Dominant Resource Fairness (DRF)Кластер:100 ядер10 ТБ ОЗУПриложения А и Б запрашивают контейнеры:А2 ядра300 ГБ2% ядер3% ОЗУБ6 ядра100 ГБ6% ядер1% ОЗУБ получит в два разаменьше контейнеров чем АРазбор домашнего задания №1Вывод слов с max частотой встречаемости--Вычисляем в редьюсере максимальную частоту встречаемости слов иформируем в нем, например, список слов с максимальной частотойвстречаемости.Используем метод cleanup:@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {for (Text maxWord : maxWords) {context.write(maxWord, new IntWritable(maxValue));}}Вывод статистики по длинам слов-Меняем типы данных на IntWritable, где необходимо.Заменяем слово на его длину:public static class TokenizerMapperextends Mapper<Object, Text, IntWritable, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(new IntWritable(word.toString().length()), one);}}}-Аналогично меняем типы данных в классе редьюсера и main..