Лекция 8. Hadoop Java API продолжение. Управление и планирование вычислений (1185416)
Текст из файла
Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №4. Hadoop Java API: продолжение.Управление и планирование вычислений.http://bigdata.cs.msu.rubigdata@cs.msu.ruHadoop Java API: продолжениеSequenceFile●●●●sync маркер - между записямиформат без/c сжатиемсжатие записей/блоковзаписейSequenceFileInputFormat<K,V>Вероятность коллизии прииспользовании 16байтового MD5 маркера в 1ПБ файле - 10-23Сжатие данныхЗачем?● Оптимизирует затраты на хранение● Сокращает время передачи по сети, с/на дискГде используется?● Входные данные могут поступать в сжатом виде (архивы)● Сжатие выходных данных Mapper’ов● Архивирование выходных данных программыОбработка сжатых входных данных●●Формат сжатия определяется по расширению файлаОсновной вопрос: поддержка разбиенияФормат сжатияПрограммаDEFLATEАлгоритмРасширениеПоддержкаразбиенияDEFLATE.dflatedНетgzipgzipDEFLATE.gzНетbzip2bzip2bzip2.bz2ДаLZOLZOpLZO.LZOНет (только припредварительноминдексировании)LZ4-LZ4.lz4НетSnappy-Snappy.snappyНет48-битный маркер между блоками bzip2 0x314159265359Использование сжатия в Hadoop MapReduceИмя свойстваТипсвойстваЗначение по умолчаниюОписаниеmapred.output.compressbooleanfalseСжатие выходных данныхmapred.output.compression.codecClassnameorg.apache.hadoop.io.compress.DefaultCodecКодек для сжатия выходных данныхmapred.output.compression.typeStringRECORDТип сжатия для выходныхSequenceFilemapred.compress.map.outputbooleanfalseСжатие выходных данных Mappermapred.map.output.compression.codecClassorg.apache.hadoop.io.compress.DefaultCodecКодек для сжатия выходных данныхMapperОбработка ошибок и сбоевСвойствоПо умолчаниюОписаниеmapreduce.task.timeout600000Число миллсек., в течениекоторых ожидается активностьот задачи (чтение, запись,изменение счетчиков и т.п.)mapreduce.map.maxattempts4Сколько раз Mapper м.б.перезапущена из-за сбоя допризнания всей программысбойной.mapreduce.reduce.maxattempts4Сколько раз Reducer м.б.перезапущен из-за сбоя допризнания всей программысбойной.СчетчикиСчетчикОписаниеMAP_INPUT_RECORDSЧисло вызовов mapMAP_INPUT_BYTESСуммарный (в несжатом виде) объем входных данных всех вызовов map.MAP_OUTPUT_RECORDSЧисло выходных пар, созданных всеми map.MAP_OUTPUT_BYTESСуммарный (в несжатом виде) объем выходных данных всех вызовов map.MAP_OUTPUT_MATERIALIZED_BYTESОбъем выходных данных отображений, записанных на диск (сжатие влияет!)COMBINE_INPUT_RECORDS! Сумма длин всех списков значений во всех вызовах.COMBINE_OUTPUT_RECORDSСуммарное число созданных выходных пар.REDUCE_INPUT_GROUPSЧисло вызовов reduceREDUCE_INPUT_RECORDSСумма длин всех списков значений во всех вызовах.GC_TIME_MILLСуммарное время, затраченное на GC (в миллисек.)...Пользовательские счетчикиenum WordType {BAD,GOOD}if(isBad(word))context.getCounter(“WordType.BAD”).increment(1);…// Динамический счетчикcontext.getCounter(“SpacesCount”).increment(1);СортировкаКак обеспечить полную сортировку результатов?Написать CustomPartitioner, формирующий разделы на основе первой(ых)букв(ы) слов.RandomSampler, TotalOrderPartitioner и DistributedCacheРешение - сэмплинг ключей.public class InputSampler.RandomSampler {public InputSampler.RandomSampler(double freq, int numSamples, int maxSplitsSampled)public K[] getSample(InputFormat<K,V> inf, Job job) throws IOException, InterruptedException}// фрагмент run()job.setPartitionerClass(TotalOrderPartitioner.class);InputSampler.Sampler<Text, IntWritable> sampler = new InputSampler.RandomSampler<Text,IntWritable>(0.1,1000, 2);InputSampler.writePartitionFile(job, sampler);Configuration conf = job.getConfiguration();String partitionFile=TotalOrderPartitioner.getPartitionFile(conf);URI partitionUri = new URI(partitionFile);job.addCacheFile(partitionUri);return job.waitForCompletion(true) ? 0: 1;Secondary Sort#SYMBOL,SYSTEM,MOMENT,ID_DEAL,PRICE_DEAL,VOLUME,OPEN_POS,DIRECTIONSVH1,F,20110111100000080,255223067,30.46000,1,8714,SSVH1,F,20110111100000080,255223068,30.38000,1,8714,SSVH1,F,20110111100000080,255223069,30.32000,1,8714,SSVH1,F,20110111100000080,255223070,30.28000,2,8714,SSVH1,F,20110111100000080,255223071,30.25000,1,8714,SSVH1,F,20110111100000080,255223072,30.05000,1,8714,SSVH1,F,20110111100000080,255223073,30.05000,3,8714,SRIH1,F,20110111100000097,255223074,177885.00000,1,291758,BRIH1,F,20110111100000097,255223075,177935.00000,2,291758,BRIH1,F,20110111100000097,255223076,177980.00000,10,291758,BRIH1,F,20110111100000097,255223077,177995.00000,1,291758,BRIH1,F,20110111100000097,255223078,178100.00000,2,291758,BRIH1,F,20110111100000097,255223079,178200.00000,1,291758,BRIH1,F,20110111100000097,255223080,178205.00000,1,291758,BПо каждому инструменту получить min цену за каждый день.SecondarySortКлюч: дата+инструмент+ценаЗначение: NullWritablejob.setPartitionerClass(MyPartitioner.class); // hash(дата+инструмент)%Rjob.setSortComparator(MySortComparator.class); // по дата,инструмент,ценаjob.setGroupingComparator(MyGroupComparator); // по дата,инструмент… protected void reduce(MyClass key, Iterable<NullWritable> values) ...Управление и планированиевычисленийУправление вычислениями - YARN+MRAppMasterResourceManager (он один)распределяет ресурсы кластерадля любых типов задач.NodeManager управляетконтейнерами своего узла.AppMaster реализует логикураспределенных вычисленийсвоего типа (например,MRAppMaster для MapReduce).Для каждого запускаемогоприложения создается отдельныйAppMaster нужного типа.Container - ресурс (ОЗУ, ядра),выделенный приложению.Название параметраПо умолчаниюОписаниеmapreduce.framework.namelocalМодель управления вычислениями (local, yarn, classic)yarn.resourcemanager.address${yarn.resourcemanager.hostname}:8032Хост:порт менеджера ресурсов YARNyarn.scheduler.minimum-allocation-mb1024Минимальный объем памяти, выделяемой для контейнера (запросы напамять д.б.
кратны ему)yarn.scheduler.maximum-allocation-mb8192Максимальный объем памяти, выделяемой для контейнераyarn.scheduler.minimum-allocation-vcores1Минимальное число ядер, выделяемый для контейнераyarn.scheduler.maximum-allocation-vcores32Максимальное число ядер, выделяемое для контейнераyarn.nodemanager.resource.memory-mb8192Объем памяти NodeManageryarn.nodemanager.resource.cpu-vcores8Число ядер NodeManagermapreduce.map.memory.mb1024Объем памяти, запрашиваемый для Mappermapreduce.map.java.opts-Перегружает mapreduce.reduce.java.opts (по умолчанию “-Xmx200m”)mapreduce.map.cpu.vcores1Число ядер, запрашиваемых для Mappermapreduce.reduce.memory.mb1024Объем памяти, запрашиваемый для Reducermapreduce.reduce.java.opts-Перегружает mapreduce.reduce.java.opts (по умолчанию “-Xmx200m”)mapreduce.reduce.cpu.vcores1Число ядер, запрашиваемых для ReducerUber taskUber task - задача, исполняемая в JVM MRAppMastermapreduce.job.ubertask.enable=truemapreduce.job.ubertask.maxmaps=10mapreduce.job.ubertask.maxreduces=1Управление вычислениями (Hadoop 1)JobTracker ответственнен запланирование одновременноговычисления несколькихMapReduce приложений накластере.JobTracker один на всеMapReduce приложенияКаждый TaskTracker управляетMappers и Reducers на своемузле.Ресурсы кластера описывалисьmap и reduce слотами,зарезервированными под задачисоответствующих типов.Планирование исполнения программ многихпользователей1.2.3.4.Очередь программ (FIFO)Очередь с приоритетами (mapred.job.priority)CapacitySchedulerFairSchedulerПланировщикиCapacityScheduler<configuration><property><name>yarn.scheduler.capacity.root.queues</name><value>lvk,mlab</value></property><property><name>yarn.scheduler.capacity.root.mlab.queues</name><value>comm,science</value></property><property><name>yarn.scheduler.capacity.root.lvk.capacity</name><value>40</value></property><property><name>yarn.scheduler.capacity.root.mlab.capacity</name><value>60</value></property><property><name>yarn.scheduler.capacity.root.lvk.maximum-capacity</name><value>75</value></property><property><name>yarn.scheduler.capacity.root.mlab.comm.capacity</name><value>50</value></property><property><name>yarn.scheduler.capacity.root.mlab.science.capacity</name><value>50</value></property></configuration>rootlvkmlabcomm●●scienceОтнесение задачи к очереди спомощью: mapreduce.job.queuenameОтображение ACL на очередиFairScheduler<allocations><defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy><!-- еще есть drf, fifo --><queue name="lvk"><weight>40</weight><schedulingPolicy>fifo</schedulingPolicy></queue><queue name="mlab"><weight>60</weight><queue name="comm"/><queue name="science"/></queue><queuePlacementPolicy><rule name="specified" create="false"/> <-- группа задана --><rule name="primaryGroup" create="false"/> <-- Unix group (вес 1!) --><rule name="default" queue="mlab.comm"/> <-- иначе --></queuePlacementPolicy></allocations>Вытеснение (preemption)В конфиге:yarn.scheduler.fair.preemption=trueТайм-ауты (сек) вытеснения:●defaultMinSharePreemptionTimeout●minSharePreemptionTimeoutОтложенное планирование (Delay Scheduling)Возможность планирования (scheduling opportunity) - heart beat от NodeManager кResourceManager.Отложенное планирование - попытка откладывания решения по запросу планирования,содержащего требование к местоположению контейнера, до освобождения ресурсов снужной локальностью.СвойствоОписаниеПример значенияyarn.scheduler.capacity.nodelocality-delayЗадержка (в числе возможностейпланирования) до получениятребуемого ресурса.1yarn.scheduler.fair.locality.threshold.nodeЖдать требуемого ресурса пока небудут получены ответы от заданнойчасти NodeManager0,5Dominant Resource Fairness (DRF)Кластер:100 ядер10 ТБ ОЗУПриложения А и Б запрашивают контейнеры:А2 ядра300 ГБ2% ядер3% ОЗУБ6 ядра100 ГБ6% ядер1% ОЗУБ получит в два разаменьше контейнеров чем АРазбор домашнего задания №1Вывод слов с max частотой встречаемости--Вычисляем в редьюсере максимальную частоту встречаемости слов иформируем в нем, например, список слов с максимальной частотойвстречаемости.Используем метод cleanup:@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {for (Text maxWord : maxWords) {context.write(maxWord, new IntWritable(maxValue));}}Вывод статистики по длинам слов-Меняем типы данных на IntWritable, где необходимо.Заменяем слово на его длину:public static class TokenizerMapperextends Mapper<Object, Text, IntWritable, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(new IntWritable(word.toString().length()), one);}}}-Аналогично меняем типы данных в классе редьюсера и main..
Характеристики
Тип файла PDF
PDF-формат наиболее широко используется для просмотра любого типа файлов на любом устройстве. В него можно сохранить документ, таблицы, презентацию, текст, чертежи, вычисления, графики и всё остальное, что можно показать на экране любого устройства. Именно его лучше всего использовать для печати.
Например, если Вам нужно распечатать чертёж из автокада, Вы сохраните чертёж на флешку, но будет ли автокад в пункте печати? А если будет, то нужная версия с нужными библиотеками? Именно для этого и нужен формат PDF - в нём точно будет показано верно вне зависимости от того, в какой программе создали PDF-файл и есть ли нужная программа для его просмотра.