Лекция 7. HDFS и основы Hadoop Java API (2015 Лекции)
Описание файла
Файл "Лекция 7. HDFS и основы Hadoop Java API" внутри архива находится в папке "2015 Лекции". PDF-файл из архива "2015 Лекции", который расположен в категории "". Всё это находится в предмете "(смрхиод) современные методы распределенного хранения и обработки данных" из 10 семестр (2 семестр магистратуры), которые можно найти в файловом архиве МГУ им. Ломоносова. Не смотря на прямую связь этого архива с МГУ им. Ломоносова, его также можно найти и в других разделах. .
Просмотр PDF-файла онлайн
Текст из PDF
Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №3. HDFS и основы Hadoop Java APIhttp://bigdata.cs.msu.rubigdata@cs.msu.ruИстория созданияДуг Каттинг и Майкл Кафарелла, 2005●●Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung “The Google File System”, 2003 http://research.google.com/archive/gfs.htmlJeffrey Dean and Sanjay Ghemawat “MapReduce: Simplified Data Processing on LargeClusters”, 2004 http://research.google.com/archive/mapreduce.htmlВерсии HadoopMapReduce+HDFS2+YARNОсновные программные интерфейсы Hadoop● Java API● Streaming● PipesОткуда взять?HDFSHDFS: чтениеНекоторые отличия GFS/HDFSGFSHDFSПараллельный appendRead-append по модели“читатель-писатель”Есть random access read/writeНет random access read/writeNameNode●В ОЗУ:○○●МетаданныеРазмещение блоков по DataNodesВ виде файлов:○○Метаданные - fsimageЖурнал изменений - editsПри старте Namenode:● DataNodes сообщают о хранящихся на них блокахЗащита от полной потери fsimage и edits - автоматический регулярныйлокальный или удаленный (NFS) бекап.Защита от сбоевWrite Ahead Log (WAL)При создании, перемещении файла:1.
обновляется журнал изменений2. обновляются метаданные в ОЗУfsimage - не обновляется!При (ре)старте Namenode:● DataNodes сообщают о хранящихся на них блоках● edits накатываются на fsimage и последний обновляется на диске изагружается в ОЗУSecondary NameNodeПроблема: журнал изменений разрастается, увеличивается времявосстановления после рестарта NameNode. Ухудшается доступность.Решение: контрольные точки - периодическое “проигрывание” editsтекущей версии fsimage на Secondary NameNode.Возможности HDFS 2.0●HDFS Federation - возможностьдобавления NameNode, управляющихсвоей частью иерархии каталогов○○Независимые иерархии файловОбщий пул блоков (общее множество DataNodes)Возможности HDFS 2.0●Высокая доступность (HighAvailability) - быстрое переключениена резервный NameNode в случаеотказа основного NameNode○○Разделение на Active/StandbyОбщее хранилище для edits○Standby непрерывно актуализирует своиметаданные○Запланированное/незапланированноепреодоление сбояИзоляция при сбоях○editsРазмер блокаЦель: время позиционирования - ~1% от времени передачи.Время позиционирования 10 миллисек, p=0.01Скорость передачи 100МБ/сек, v=100Размер блока, block_size=?V - объем данныхV/block_size*p=0.01*V/vblock_size=p*v*100=100МБРазмер блока HDFS, по умолчанию: 128МБHDFS: записьdfs.replication=3dfs.replication.min=1Пакеты находятся в:● Очереди данных● Очереди подтверждений●●Конвейерная запись репликАсинхронное “дореплицирование”в случае сбоевРазмещение репликТребования● надежность● пропускная способность чтения/записиАлгоритм по умолчанию● 1я реплика - на узле клиента (есликлиент не в кластере - наслучайном узле)● 2я реплика - на случайном узле влюбой другой стойке.● 3я реплика - в случайном узле тойже стойки, где находится 2яреплика.● Последующие реплики - вслучайных узлах кластера (безперегрузки стоек)Топология задается с помощьюскриптов и конфиг-файловnode1 /dc1/rack1node2 /dc1/rack1node3 /dc1/rack2Какая метрика требуется при чтении и планировании исполнения map-задач?Расстояние между листьями в иерархии: узел-стойка-ЦОД-кореньИнтерфейсы к HDFS●●●●●командная строкаJava APIHTTP (HFTP, HSFTP, WebHDFS)libhdfs (c)FUSEКомандная строкаfs.defaultFS=hdfs://mynamenodedfs.replication=3hadoop fs -cmd <args>hadoop fs -ls /hadoop fs - lsr /hadoop fs - cat example.txthadoop fs -mkdir /user/sergunhadoop fs -put file://example.txt /hadoop fs -copyFromLocal example.txt hdfs://mynamenodehadoop fs - put example.txt /user/sergunhadoop fs -get example.txt /hadoop fs -copyToLocal /example.txt .hadoop fs -cat /example.txt | headhadoop fs -tail /example.txthadoop fs -rm /example.txthadoop fs -getmerge /res all.txthadoop fs -help <cmd>Файловые системы, доступные из HDFSФайловая системаСхема URIЛокальнаяfileHDFShdfsAzurewasbAmazon S3s3, s3nFTPftp...Другие утилитыhadoop fsck /user/sergun/test.txt -files -blocks -rackshadoop distcp -update -delete -p -m 20 hdfs://namenode1/foo hdfs://namenode2/fooJava API: основные типыСериализация: обертки типовpackage org.apache.hadoop.ioРеализацииWritableComparable<T>interface Writable {void readFields(DataInput in) throws IOException;void write(DataOutput out) throws IOException;}BooleanWritableByteWritableDoubleWritableinterface java.lang.Comparable<T> {int compareTo(T o);}interface WritableComparable<T> extends Writable, java.lang.Comparable<T>Для ключей: WritableComparable<T>Для значений: WritableFloatWritableIntWritableLongWritableTextNullWritableRawComparatorpackage org.apache.hadoop.io;import java.util.Comparator;public interface RawComparator<T> extends Comparator<T>{public int compare(byte[] b1,int s1, int l1, byte[] b2, int s2, int l2);}Распределитель (Mapper)package org.apache.hadoop.mapreduce;public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> extends Object {protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)throws IOException, InterruptedExceptionprotected void map(KEYIN key, VALUEIN value, /*org.apache.hadoop.mapreduce.Reducer.*/Contextcontext)throws IOException, InterruptedException...}class Reducer.Context{void write(KEYOUT key,VALUEOUT value) throws IOException, InterruptedException...}Реализации распределителейРаспределительОписаниеFieldSelectionMapper<K,V>unix cut Mapper<K,V,Text,Text>InverseMapper<K,V>Mapper<K,V,V,K>, меняющий местами ключ изначениеRegexMapper<K>Mapper<K,Text,Text,LongWritable>Образец: mapreduce.mapper.regexГруппа:mapreduce.mapper.regexmapper.groupВозвращает: (совпадение, 1)TokenCountMapper<K>Разбивает строку на токены с помощьюStringTokenizerMapper<K,Text,Text,LongWritable>(токен,1)MultiThreadedMapper<K,V>Многонитиевой распределитель.Число нитей в пуле: mapreduce.mapper.multithreadedmapper.threadsРедуктор(Reducer)package org.apache.hadoop.mapreduce;public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> extends Object {protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)throws IOException, InterruptedExceptionprotected void reduce(KEYIN key, Iterable<VALUEIN> values, /*org.apache.hadoop.mapreduce.Mapper.*/Context context)throws IOException, InterruptedException...}class Reducer.Context{void write(KEYOUT key,VALUEOUT value) throws IOException, InterruptedException...}CombinerCombinersum_reducer(‘the’, [1,1,1,1,1])=sum_reducer(‘the’, sum_reducer(‘the’,[1,1,1]), sum_reducer(‘the’,[1,1]))Типы агрегатных функцийПримерыОписаниеДистрибутивныеmax, min, sum, countФункцию можно вычислятьрекурсивно на подмножествахданныхАлгебраическиеavg, varФункция можно вычислить наподмножествах данных спомощью несколькихдистрибутивных функцийХолистическиеmedian, quantilesФункцию нельзя вычислитьрекурсивно на подмножествахданныхCombinerpublic static class ReduceClass extends Reducer<Text,IntWritable,Text,IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}}job.setReducerClass(ReduceClass.class);job.setCombinerClass(ReduceClass.class);Тасовка в Hadoop MapReduceВ чем отличие от тасовки+сортировки в Google MapReduce?combinerПреимущества от использования Combiner● Уменьшение трафика между Mappers и Reducers● Уменьшение объемов записи на диск в Mappers иReducersПередача параметров в программуПередача параметровpublic class WordCount {public static class MapClass ...public static class ReduceClass ...public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word count");...import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class ToolWordCount extends Configured implements Tool {...public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Configuration(), new ToolWordCount(), args);System.exit(res);}FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));@Overridepublic int run(String[] args) throws Exception {Configuration conf = this.getConf();job.setJarByClass(WordCount.class);job.setMapperClass(MapClass.class);job.setReducerClass(ReduceClass.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);Job job = Job.getInstance(conf, "word count");FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setNumReduceTasks(2);job.setJarByClass(WordCount.class);job.setMapperClass(MapClass.class);job.setReducerClass(ReduceClass.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}}return job.waitForCompletion(true) ? 0 : 1;}}Теперь можно передавать параметры$ hadoop jar wc.jar input output$ hadoop jar wc.jar input output -D mapred.reduce.tasks=5$ hadoop jar wc.jar input output -conf local.xml$ hadoop jar wc.jar input output -conf cluster.xml$ hadoop jar wc.jar input output -fs hdfs://namenode -jt localhost:8021fs - замена “-D fs.defaultFS” (бывший fs.default.name) - NameNodejt - замена “-D yarn.resourcemanager.address” (бывший mapred.job.tracker) - основная “точка входа”в вычисления на кластереВходные форматыInputFormatsInputFormatОписаниеTextInputFormatКлюч: смещение начала строки в байтах(LongWritable)Значение: Строка (Text)KeyValueTextInputFormatПервый разделитель (key.value.separator.in.input.line, поумолчанию \t)делит строку на ключ (Text) и значение(Text)SequenceFileInputFormat<K,V>Двоичный сжатый формат (длясцепления задач)NLineInputFormatТо же, что и TextInputFormat, нопорциями по N строк (mapred.line.input.format.linespermap, поумолчанию 1)Разбиение на сплитыpublic abstract class InputSplit {public abstract long getLength() throws IOException, InterruptedException;public abstract String[] getLocations() throws IOException, InterruptedException;}public abstract class InputFormat<K, V> {public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException;}public interface RecordReader<K,V> {boolean next(K key, V value) throws IOException;K createKey();V createValue();long getPos() throws IOException;public void close() throws IOException;float getProgress() throws IOException;}MapperMapperMr Sherlock Holmes, who was usuallyvery late in the mornings, save uponthose not infrequent occasions whenhe was up all night, was seated at thebreakfast table.