Лекция 9. Hadoop Streaming_ Joins (2015 Лекции)
Описание файла
Файл "Лекция 9. Hadoop Streaming_ Joins" внутри архива находится в папке "2015 Лекции". PDF-файл из архива "2015 Лекции", который расположен в категории "". Всё это находится в предмете "(смрхиод) современные методы распределенного хранения и обработки данных" из 10 семестр (2 семестр магистратуры), которые можно найти в файловом архиве МГУ им. Ломоносова. Не смотря на прямую связь этого архива с МГУ им. Ломоносова, его также можно найти и в других разделах. .
Просмотр PDF-файла онлайн
Текст из PDF
Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №5. Hadoop Streaming, Joins.http://bigdata.cs.msu.rubigdata@cs.msu.ruHadoop Streamingcat input.txt | my_mapper | sort -k1,1 | my_reducer > output.txtHadoop Streaming●●●●Реализация Mapper, Reducer, Combiner на любом языке программирования (популярны Unixутилиты, Python)Интерфейс взаимодействия - stdin, stdoutПрограммист ответственен за выделение ключей и значений из входных данныхКлючи и значения - текстовые, нет поддержки (*) бинарных данныхФинансовые данные#SYMBOL,SYSTEM,MOMENT,ID_DEAL,PRICE_DEAL,VOLUME,OPEN_POS,DIRECTIONSVH1,F,20110111100000080,255223067,30.46000,1,8714,SSVH1,F,20110111100000080,255223068,30.38000,1,8714,SSVH1,F,20110111100000080,255223069,30.32000,1,8714,SSVH1,F,20110111100000080,255223070,30.28000,2,8714,SSVH1,F,20110111100000080,255223071,30.25000,1,8714,SSVH1,F,20110111100000080,255223072,30.05000,1,8714,SSVH1,F,20110111100000080,255223073,30.05000,3,8714,SRIH1,F,20110111100000097,255223074,177885.00000,1,291758,BRIH1,F,20110111100000097,255223075,177935.00000,2,291758,BRIH1,F,20110111100000097,255223076,177980.00000,10,291758,BRIH1,F,20110111100000097,255223077,177995.00000,1,291758,BRIH1,F,20110111100000097,255223078,178100.00000,2,291758,BRIH1,F,20110111100000097,255223079,178200.00000,1,291758,BRIH1,F,20110111100000097,255223080,178205.00000,1,291758,BДневные минимумы цены каждого инструмента#SYMBOL,SYSTEM,MOMENT,ID_DEAL,PRICE_DEAL,VOLUME,OPEN_POS,DIRECTIONSVH1,F,20110111100000080,255223067,30.46000,1,8714,SSVH1,F,20110111100000080,255223069,30.32000,1,8714,SRIH1,F,20110111100000097,255223074,177885.00000,1,291758,BRIH1,F,20110111100000097,255223075,177935.00000,2,291758,BSVH1,F,20110111100000080,255223068,30.38000,1,8714,SRIH1,F,20110111100000097,255223076,177980.00000,10,291758,BRIH1,F,20110111100000097,255223077,177995.00000,1,291758,B...mapper.py#!/usr/bin/env pythonimport sysfor line in sys.stdin:if line.startswith('#'):continuesymbol, _, moment, _, price,_ = line.split(',',5)print '{}{}\t{}'.format(symbol, moment[:8],price)Выход mapper.pySVH120110111 30.46000SVH120110111 30.32000RIH120110111 177885.00000RIH120110111 177935.00000SVH120110111 30.38000RIH120110111 177980.00000RIH120110111 177995.00000Дневные минимумы цен каждого инструментаВход reducer.py (отсортирован по ключам)SVH120110111 30.46000SVH120110111 30.32000SVH120110111 30.38000RIH120110111 177885.00000RIH120110111 177935.00000RIH120110111 177980.00000RIH120110111 177995.00000Запускhadoop jar hadoop-streaming-x.x.x.jar \-mapper mapper.py \-reducer reducer.py \-combiner reducer.py \-files mapper.py,reducer.py\-input wasb://financedata@bigdatamsu.blob.core.windows.net/ \-output /minmax \-numReduceTasks=10reducer.py#!/usr/bin/env pythonimport sys(prev_key, min_price) = (None, None)for line in sys.stdin:key, price = line.split('\t')price=float(price)if key!=prev_key:if prev_key!=None:print '{}\t{}'.format(key, min_price)min_price = Noneprev_key=keyif min_price==None or price<min_price:min_price=priceif prev_key != None:print '{}\t{}'.format(key, min_price)Отладкаcat input.txt | mapper.py | sort -k1,1 | reducer.py > output.txtyarn log -applicationId application_1446656107130_0006Значения по умолчаниюhadoop jar hadoop-streaming-*.jar \-inputformat org.apache.hadoop.mapred.TextInputFormat \-mapper /bin/cat \-partitioner org.apache.hadoop.mapred.lib.HashPartitioner \-numReduceTasks 1 \-reducer org.apache.hadoop.mapred.lib.IdentityReducer \-outputformat org.apache.hadoop.mapred.TextOutputFormatTextInputFormat не передает в Mapper ключ (смещения строкотносительно начала файла)!Для других входных форматов того же можно достигнуть с помощьюstream.map.input.ignoreKey=trueУправление разделителями ключей и значенийСвойствоПоумолчаниюОписаниеstream.map.output.field.separator\tОжидаемый разделитель ключей и значений,генерируемых Mapperstream.num.map.output.key.fields1Номер разделителя, после которогоначинается значение в выходе Mapperstream.reduce.output.field.separator\tОжидаемый разделитель ключей и значений,генерируемых Reducerstream.num.reduce.output.key.fields1Номер разделителя, после которогоначинается значение в выходе Reducerstream.map.input.field.separator\tРазделитель ключей и значений на входеMapperstream.reduce.input.field.separator\tРазделитель ключей и значений на входеReducerУправление разделителями ключей и значенийKeyFieldBasedPartitionermapper_part.py#!/usr/bin/env pythonimport sysfor line in sys.stdin:if line.startswith('#'):continuesymbol, _, moment, _, price,_ = line.split(',',5)print '{}.{}.{}'.format(symbol, moment[:8],price)Запускhadoop jar hadoop-streaming-x.x.x.jar \-D stream.map.output.field.separator=.
\-D stream.num.map.output.key.fields=2 \-D mapreduce.map.output.key.field.separator=. \-D mapreduce.partition.keypartitioner.options=-k1,1 \-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner-mapper mapper_part.py \-reducer reducer.py \-combiner reducer.py \-files mapper_part.py,reducer.py\-input wasb://financedata@bigdatamsu.blob.core.windows.net/ \-output /minmax \-numReduceTasks=10В результате для partitioning будет использовано название инструмента.KeyFieldBasedComparatorЗапускhadoop jar hadoop-streaming-x.x.x.jar \-D stream.map.output.field.separator=.
\-D stream.num.map.output.key.fields=2 \-D mapreduce.map.output.key.field.separator=. \-D mapreduce.partition.keycomparator.options=’-k1,1 -k2,2nr’ \-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \-mapper mapper_part.py \-reducer reducer.py \-combiner reducer.py \-files mapper_part.py,reducer.py\-input wasb://financedata@bigdatamsu.blob.core.windows.net/ \-output /minmax \-numReduceTasks=10Получаем сортировку по инструменту в прямом и по датам в обратномпорядке.AggregateDoubleValueSumLongValueMaxLongValueMinLongValueSumStringValueMaxStringValueMinUniqValueCountValueHistogramВыход Mapper<имя функции>:<ключ>\t<значение>Пример “Дневной объем торгов” (значениеDEAL_PRICE * VOLUME)DoubleValueSum:RIH120110111\t355870.0DoubleValueSum:RIH120110111\t355885.0...AggregateЗапускhadoop jar hadoop-streaming-x.x.x.jar \-mapper mapper_aggr.py \-reducer aggregate \-combiner aggregate \-files mapper_aggr.py \-input wasb://financedata@bigdatamsu.blob.core.windows.net/ \-output /minmax \-numReduceTasks=10ValueHistogramФормирует статистику по строковым значениям:● кол-во уникальных значений● min● median● max● avg● stdValueHistogram:<ключ>\t<значение>\t<количество>илиValueHistogram:<ключ>\t<значение>(количество=1)JoinsJoinsФайл SVH1MOMENT201101111000201101111005201101111010201101111015Файл RIH1CLOSEMOMENTCLOSE20110111100517100201101111010173002011011110151720030.130.530.630.2...INNERJOIN...MOMENTCLOSE_SVH1CLOSE_RIH120110111100530.51710020110111101030.61730020110111101530.217200...Joins●●●на стороне Mapperна стороне Reducerс использованием DistributedCacheJoin с помощью DistributedCachestatic class Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {private SymbolMetadata metadata;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {metadata = new SymbolMetadata();metadata.initialize(new File("symbol_names.txt"));}@Overrideprotected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {String name = metadata.getFullName(key.toString());int maxValue = Double.MIN_VALUE;for (DoubleWritable value : values) {maxValue = Math.max(maxValue, value.get());}context.write(new Text(name), new DoubleWritable(maxValue));}}hadoop jar finance.jar \-files symbol_name.txt /input /outputDistributed Cache - только для случая “Один из источников данных небольшой по размеру”!Join на стороне Mapper: CompositeInputFormatconf.setInputFormat(CompositeInputFormat.class);String joinStatement = CompositeInputFormat.compose("inner", SequenceFileInputFormat.class, "/input");conf.set("mapreduce.join.expr", joinStatement);Тип значений в Mapper: org.apache.hadoop.mapreduce.lib.join.TupleWritableОсобенности1.