Лекция 7. HDFS и основы Hadoop Java API (1185415)
Текст из файла
Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №3. HDFS и основы Hadoop Java APIhttp://bigdata.cs.msu.rubigdata@cs.msu.ruИстория созданияДуг Каттинг и Майкл Кафарелла, 2005●●Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung “The Google File System”, 2003 http://research.google.com/archive/gfs.htmlJeffrey Dean and Sanjay Ghemawat “MapReduce: Simplified Data Processing on LargeClusters”, 2004 http://research.google.com/archive/mapreduce.htmlВерсии HadoopMapReduce+HDFS2+YARNОсновные программные интерфейсы Hadoop● Java API● Streaming● PipesОткуда взять?HDFSHDFS: чтениеНекоторые отличия GFS/HDFSGFSHDFSПараллельный appendRead-append по модели“читатель-писатель”Есть random access read/writeНет random access read/writeNameNode●В ОЗУ:○○●МетаданныеРазмещение блоков по DataNodesВ виде файлов:○○Метаданные - fsimageЖурнал изменений - editsПри старте Namenode:● DataNodes сообщают о хранящихся на них блокахЗащита от полной потери fsimage и edits - автоматический регулярныйлокальный или удаленный (NFS) бекап.Защита от сбоевWrite Ahead Log (WAL)При создании, перемещении файла:1.
обновляется журнал изменений2. обновляются метаданные в ОЗУfsimage - не обновляется!При (ре)старте Namenode:● DataNodes сообщают о хранящихся на них блоках● edits накатываются на fsimage и последний обновляется на диске изагружается в ОЗУSecondary NameNodeПроблема: журнал изменений разрастается, увеличивается времявосстановления после рестарта NameNode. Ухудшается доступность.Решение: контрольные точки - периодическое “проигрывание” editsтекущей версии fsimage на Secondary NameNode.Возможности HDFS 2.0●HDFS Federation - возможностьдобавления NameNode, управляющихсвоей частью иерархии каталогов○○Независимые иерархии файловОбщий пул блоков (общее множество DataNodes)Возможности HDFS 2.0●Высокая доступность (HighAvailability) - быстрое переключениена резервный NameNode в случаеотказа основного NameNode○○Разделение на Active/StandbyОбщее хранилище для edits○Standby непрерывно актуализирует своиметаданные○Запланированное/незапланированноепреодоление сбояИзоляция при сбоях○editsРазмер блокаЦель: время позиционирования - ~1% от времени передачи.Время позиционирования 10 миллисек, p=0.01Скорость передачи 100МБ/сек, v=100Размер блока, block_size=?V - объем данныхV/block_size*p=0.01*V/vblock_size=p*v*100=100МБРазмер блока HDFS, по умолчанию: 128МБHDFS: записьdfs.replication=3dfs.replication.min=1Пакеты находятся в:● Очереди данных● Очереди подтверждений●●Конвейерная запись репликАсинхронное “дореплицирование”в случае сбоевРазмещение репликТребования● надежность● пропускная способность чтения/записиАлгоритм по умолчанию● 1я реплика - на узле клиента (есликлиент не в кластере - наслучайном узле)● 2я реплика - на случайном узле влюбой другой стойке.● 3я реплика - в случайном узле тойже стойки, где находится 2яреплика.● Последующие реплики - вслучайных узлах кластера (безперегрузки стоек)Топология задается с помощьюскриптов и конфиг-файловnode1 /dc1/rack1node2 /dc1/rack1node3 /dc1/rack2Какая метрика требуется при чтении и планировании исполнения map-задач?Расстояние между листьями в иерархии: узел-стойка-ЦОД-кореньИнтерфейсы к HDFS●●●●●командная строкаJava APIHTTP (HFTP, HSFTP, WebHDFS)libhdfs (c)FUSEКомандная строкаfs.defaultFS=hdfs://mynamenodedfs.replication=3hadoop fs -cmd <args>hadoop fs -ls /hadoop fs - lsr /hadoop fs - cat example.txthadoop fs -mkdir /user/sergunhadoop fs -put file://example.txt /hadoop fs -copyFromLocal example.txt hdfs://mynamenodehadoop fs - put example.txt /user/sergunhadoop fs -get example.txt /hadoop fs -copyToLocal /example.txt .hadoop fs -cat /example.txt | headhadoop fs -tail /example.txthadoop fs -rm /example.txthadoop fs -getmerge /res all.txthadoop fs -help <cmd>Файловые системы, доступные из HDFSФайловая системаСхема URIЛокальнаяfileHDFShdfsAzurewasbAmazon S3s3, s3nFTPftp...Другие утилитыhadoop fsck /user/sergun/test.txt -files -blocks -rackshadoop distcp -update -delete -p -m 20 hdfs://namenode1/foo hdfs://namenode2/fooJava API: основные типыСериализация: обертки типовpackage org.apache.hadoop.ioРеализацииWritableComparable<T>interface Writable {void readFields(DataInput in) throws IOException;void write(DataOutput out) throws IOException;}BooleanWritableByteWritableDoubleWritableinterface java.lang.Comparable<T> {int compareTo(T o);}interface WritableComparable<T> extends Writable, java.lang.Comparable<T>Для ключей: WritableComparable<T>Для значений: WritableFloatWritableIntWritableLongWritableTextNullWritableRawComparatorpackage org.apache.hadoop.io;import java.util.Comparator;public interface RawComparator<T> extends Comparator<T>{public int compare(byte[] b1,int s1, int l1, byte[] b2, int s2, int l2);}Распределитель (Mapper)package org.apache.hadoop.mapreduce;public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> extends Object {protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)throws IOException, InterruptedExceptionprotected void map(KEYIN key, VALUEIN value, /*org.apache.hadoop.mapreduce.Reducer.*/Contextcontext)throws IOException, InterruptedException...}class Reducer.Context{void write(KEYOUT key,VALUEOUT value) throws IOException, InterruptedException...}Реализации распределителейРаспределительОписаниеFieldSelectionMapper<K,V>unix cut Mapper<K,V,Text,Text>InverseMapper<K,V>Mapper<K,V,V,K>, меняющий местами ключ изначениеRegexMapper<K>Mapper<K,Text,Text,LongWritable>Образец: mapreduce.mapper.regexГруппа:mapreduce.mapper.regexmapper.groupВозвращает: (совпадение, 1)TokenCountMapper<K>Разбивает строку на токены с помощьюStringTokenizerMapper<K,Text,Text,LongWritable>(токен,1)MultiThreadedMapper<K,V>Многонитиевой распределитель.Число нитей в пуле: mapreduce.mapper.multithreadedmapper.threadsРедуктор(Reducer)package org.apache.hadoop.mapreduce;public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> extends Object {protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)throws IOException, InterruptedExceptionprotected void reduce(KEYIN key, Iterable<VALUEIN> values, /*org.apache.hadoop.mapreduce.Mapper.*/Context context)throws IOException, InterruptedException...}class Reducer.Context{void write(KEYOUT key,VALUEOUT value) throws IOException, InterruptedException...}CombinerCombinersum_reducer(‘the’, [1,1,1,1,1])=sum_reducer(‘the’, sum_reducer(‘the’,[1,1,1]), sum_reducer(‘the’,[1,1]))Типы агрегатных функцийПримерыОписаниеДистрибутивныеmax, min, sum, countФункцию можно вычислятьрекурсивно на подмножествахданныхАлгебраическиеavg, varФункция можно вычислить наподмножествах данных спомощью несколькихдистрибутивных функцийХолистическиеmedian, quantilesФункцию нельзя вычислитьрекурсивно на подмножествахданныхCombinerpublic static class ReduceClass extends Reducer<Text,IntWritable,Text,IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}}job.setReducerClass(ReduceClass.class);job.setCombinerClass(ReduceClass.class);Тасовка в Hadoop MapReduceВ чем отличие от тасовки+сортировки в Google MapReduce?combinerПреимущества от использования Combiner● Уменьшение трафика между Mappers и Reducers● Уменьшение объемов записи на диск в Mappers иReducersПередача параметров в программуПередача параметровpublic class WordCount {public static class MapClass ...public static class ReduceClass ...public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word count");...import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class ToolWordCount extends Configured implements Tool {...public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Configuration(), new ToolWordCount(), args);System.exit(res);}FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));@Overridepublic int run(String[] args) throws Exception {Configuration conf = this.getConf();job.setJarByClass(WordCount.class);job.setMapperClass(MapClass.class);job.setReducerClass(ReduceClass.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);Job job = Job.getInstance(conf, "word count");FileInputFormat.addInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setNumReduceTasks(2);job.setJarByClass(WordCount.class);job.setMapperClass(MapClass.class);job.setReducerClass(ReduceClass.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);System.exit(job.waitForCompletion(true) ? 0 : 1);}}return job.waitForCompletion(true) ? 0 : 1;}}Теперь можно передавать параметры$ hadoop jar wc.jar input output$ hadoop jar wc.jar input output -D mapred.reduce.tasks=5$ hadoop jar wc.jar input output -conf local.xml$ hadoop jar wc.jar input output -conf cluster.xml$ hadoop jar wc.jar input output -fs hdfs://namenode -jt localhost:8021fs - замена “-D fs.defaultFS” (бывший fs.default.name) - NameNodejt - замена “-D yarn.resourcemanager.address” (бывший mapred.job.tracker) - основная “точка входа”в вычисления на кластереВходные форматыInputFormatsInputFormatОписаниеTextInputFormatКлюч: смещение начала строки в байтах(LongWritable)Значение: Строка (Text)KeyValueTextInputFormatПервый разделитель (key.value.separator.in.input.line, поумолчанию \t)делит строку на ключ (Text) и значение(Text)SequenceFileInputFormat<K,V>Двоичный сжатый формат (длясцепления задач)NLineInputFormatТо же, что и TextInputFormat, нопорциями по N строк (mapred.line.input.format.linespermap, поумолчанию 1)Разбиение на сплитыpublic abstract class InputSplit {public abstract long getLength() throws IOException, InterruptedException;public abstract String[] getLocations() throws IOException, InterruptedException;}public abstract class InputFormat<K, V> {public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException;}public interface RecordReader<K,V> {boolean next(K key, V value) throws IOException;K createKey();V createValue();long getPos() throws IOException;public void close() throws IOException;float getProgress() throws IOException;}MapperMapperMr Sherlock Holmes, who was usuallyvery late in the mornings, save uponthose not infrequent occasions whenhe was up all night, was seated at thebreakfast table.
Характеристики
Тип файла PDF
PDF-формат наиболее широко используется для просмотра любого типа файлов на любом устройстве. В него можно сохранить документ, таблицы, презентацию, текст, чертежи, вычисления, графики и всё остальное, что можно показать на экране любого устройства. Именно его лучше всего использовать для печати.
Например, если Вам нужно распечатать чертёж из автокада, Вы сохраните чертёж на флешку, но будет ли автокад в пункте печати? А если будет, то нужная версия с нужными библиотеками? Именно для этого и нужен формат PDF - в нём точно будет показано верно вне зависимости от того, в какой программе создали PDF-файл и есть ли нужная программа для его просмотра.