Лекция 9. Hadoop Streaming_ Joins (1185417)
Текст из файла
Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №5. Hadoop Streaming, Joins.http://bigdata.cs.msu.rubigdata@cs.msu.ruHadoop Streamingcat input.txt | my_mapper | sort -k1,1 | my_reducer > output.txtHadoop Streaming●●●●Реализация Mapper, Reducer, Combiner на любом языке программирования (популярны Unixутилиты, Python)Интерфейс взаимодействия - stdin, stdoutПрограммист ответственен за выделение ключей и значений из входных данныхКлючи и значения - текстовые, нет поддержки (*) бинарных данныхФинансовые данные#SYMBOL,SYSTEM,MOMENT,ID_DEAL,PRICE_DEAL,VOLUME,OPEN_POS,DIRECTIONSVH1,F,20110111100000080,255223067,30.46000,1,8714,SSVH1,F,20110111100000080,255223068,30.38000,1,8714,SSVH1,F,20110111100000080,255223069,30.32000,1,8714,SSVH1,F,20110111100000080,255223070,30.28000,2,8714,SSVH1,F,20110111100000080,255223071,30.25000,1,8714,SSVH1,F,20110111100000080,255223072,30.05000,1,8714,SSVH1,F,20110111100000080,255223073,30.05000,3,8714,SRIH1,F,20110111100000097,255223074,177885.00000,1,291758,BRIH1,F,20110111100000097,255223075,177935.00000,2,291758,BRIH1,F,20110111100000097,255223076,177980.00000,10,291758,BRIH1,F,20110111100000097,255223077,177995.00000,1,291758,BRIH1,F,20110111100000097,255223078,178100.00000,2,291758,BRIH1,F,20110111100000097,255223079,178200.00000,1,291758,BRIH1,F,20110111100000097,255223080,178205.00000,1,291758,BДневные минимумы цены каждого инструмента#SYMBOL,SYSTEM,MOMENT,ID_DEAL,PRICE_DEAL,VOLUME,OPEN_POS,DIRECTIONSVH1,F,20110111100000080,255223067,30.46000,1,8714,SSVH1,F,20110111100000080,255223069,30.32000,1,8714,SRIH1,F,20110111100000097,255223074,177885.00000,1,291758,BRIH1,F,20110111100000097,255223075,177935.00000,2,291758,BSVH1,F,20110111100000080,255223068,30.38000,1,8714,SRIH1,F,20110111100000097,255223076,177980.00000,10,291758,BRIH1,F,20110111100000097,255223077,177995.00000,1,291758,B...mapper.py#!/usr/bin/env pythonimport sysfor line in sys.stdin:if line.startswith('#'):continuesymbol, _, moment, _, price,_ = line.split(',',5)print '{}{}\t{}'.format(symbol, moment[:8],price)Выход mapper.pySVH120110111 30.46000SVH120110111 30.32000RIH120110111 177885.00000RIH120110111 177935.00000SVH120110111 30.38000RIH120110111 177980.00000RIH120110111 177995.00000Дневные минимумы цен каждого инструментаВход reducer.py (отсортирован по ключам)SVH120110111 30.46000SVH120110111 30.32000SVH120110111 30.38000RIH120110111 177885.00000RIH120110111 177935.00000RIH120110111 177980.00000RIH120110111 177995.00000Запускhadoop jar hadoop-streaming-x.x.x.jar \-mapper mapper.py \-reducer reducer.py \-combiner reducer.py \-files mapper.py,reducer.py\-input wasb://financedata@bigdatamsu.blob.core.windows.net/ \-output /minmax \-numReduceTasks=10reducer.py#!/usr/bin/env pythonimport sys(prev_key, min_price) = (None, None)for line in sys.stdin:key, price = line.split('\t')price=float(price)if key!=prev_key:if prev_key!=None:print '{}\t{}'.format(key, min_price)min_price = Noneprev_key=keyif min_price==None or price<min_price:min_price=priceif prev_key != None:print '{}\t{}'.format(key, min_price)Отладкаcat input.txt | mapper.py | sort -k1,1 | reducer.py > output.txtyarn log -applicationId application_1446656107130_0006Значения по умолчаниюhadoop jar hadoop-streaming-*.jar \-inputformat org.apache.hadoop.mapred.TextInputFormat \-mapper /bin/cat \-partitioner org.apache.hadoop.mapred.lib.HashPartitioner \-numReduceTasks 1 \-reducer org.apache.hadoop.mapred.lib.IdentityReducer \-outputformat org.apache.hadoop.mapred.TextOutputFormatTextInputFormat не передает в Mapper ключ (смещения строкотносительно начала файла)!Для других входных форматов того же можно достигнуть с помощьюstream.map.input.ignoreKey=trueУправление разделителями ключей и значенийСвойствоПоумолчаниюОписаниеstream.map.output.field.separator\tОжидаемый разделитель ключей и значений,генерируемых Mapperstream.num.map.output.key.fields1Номер разделителя, после которогоначинается значение в выходе Mapperstream.reduce.output.field.separator\tОжидаемый разделитель ключей и значений,генерируемых Reducerstream.num.reduce.output.key.fields1Номер разделителя, после которогоначинается значение в выходе Reducerstream.map.input.field.separator\tРазделитель ключей и значений на входеMapperstream.reduce.input.field.separator\tРазделитель ключей и значений на входеReducerУправление разделителями ключей и значенийKeyFieldBasedPartitionermapper_part.py#!/usr/bin/env pythonimport sysfor line in sys.stdin:if line.startswith('#'):continuesymbol, _, moment, _, price,_ = line.split(',',5)print '{}.{}.{}'.format(symbol, moment[:8],price)Запускhadoop jar hadoop-streaming-x.x.x.jar \-D stream.map.output.field.separator=.
\-D stream.num.map.output.key.fields=2 \-D mapreduce.map.output.key.field.separator=. \-D mapreduce.partition.keypartitioner.options=-k1,1 \-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner-mapper mapper_part.py \-reducer reducer.py \-combiner reducer.py \-files mapper_part.py,reducer.py\-input wasb://financedata@bigdatamsu.blob.core.windows.net/ \-output /minmax \-numReduceTasks=10В результате для partitioning будет использовано название инструмента.KeyFieldBasedComparatorЗапускhadoop jar hadoop-streaming-x.x.x.jar \-D stream.map.output.field.separator=.
\-D stream.num.map.output.key.fields=2 \-D mapreduce.map.output.key.field.separator=. \-D mapreduce.partition.keycomparator.options=’-k1,1 -k2,2nr’ \-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \-mapper mapper_part.py \-reducer reducer.py \-combiner reducer.py \-files mapper_part.py,reducer.py\-input wasb://financedata@bigdatamsu.blob.core.windows.net/ \-output /minmax \-numReduceTasks=10Получаем сортировку по инструменту в прямом и по датам в обратномпорядке.AggregateDoubleValueSumLongValueMaxLongValueMinLongValueSumStringValueMaxStringValueMinUniqValueCountValueHistogramВыход Mapper<имя функции>:<ключ>\t<значение>Пример “Дневной объем торгов” (значениеDEAL_PRICE * VOLUME)DoubleValueSum:RIH120110111\t355870.0DoubleValueSum:RIH120110111\t355885.0...AggregateЗапускhadoop jar hadoop-streaming-x.x.x.jar \-mapper mapper_aggr.py \-reducer aggregate \-combiner aggregate \-files mapper_aggr.py \-input wasb://financedata@bigdatamsu.blob.core.windows.net/ \-output /minmax \-numReduceTasks=10ValueHistogramФормирует статистику по строковым значениям:● кол-во уникальных значений● min● median● max● avg● stdValueHistogram:<ключ>\t<значение>\t<количество>илиValueHistogram:<ключ>\t<значение>(количество=1)JoinsJoinsФайл SVH1MOMENT201101111000201101111005201101111010201101111015Файл RIH1CLOSEMOMENTCLOSE20110111100517100201101111010173002011011110151720030.130.530.630.2...INNERJOIN...MOMENTCLOSE_SVH1CLOSE_RIH120110111100530.51710020110111101030.61730020110111101530.217200...Joins●●●на стороне Mapperна стороне Reducerс использованием DistributedCacheJoin с помощью DistributedCachestatic class Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {private SymbolMetadata metadata;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {metadata = new SymbolMetadata();metadata.initialize(new File("symbol_names.txt"));}@Overrideprotected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {String name = metadata.getFullName(key.toString());int maxValue = Double.MIN_VALUE;for (DoubleWritable value : values) {maxValue = Math.max(maxValue, value.get());}context.write(new Text(name), new DoubleWritable(maxValue));}}hadoop jar finance.jar \-files symbol_name.txt /input /outputDistributed Cache - только для случая “Один из источников данных небольшой по размеру”!Join на стороне Mapper: CompositeInputFormatconf.setInputFormat(CompositeInputFormat.class);String joinStatement = CompositeInputFormat.compose("inner", SequenceFileInputFormat.class, "/input");conf.set("mapreduce.join.expr", joinStatement);Тип значений в Mapper: org.apache.hadoop.mapreduce.lib.join.TupleWritableОсобенности1.
Характеристики
Тип файла PDF
PDF-формат наиболее широко используется для просмотра любого типа файлов на любом устройстве. В него можно сохранить документ, таблицы, презентацию, текст, чертежи, вычисления, графики и всё остальное, что можно показать на экране любого устройства. Именно его лучше всего использовать для печати.
Например, если Вам нужно распечатать чертёж из автокада, Вы сохраните чертёж на флешку, но будет ли автокад в пункте печати? А если будет, то нужная версия с нужными библиотеками? Именно для этого и нужен формат PDF - в нём точно будет показано верно вне зависимости от того, в какой программе создали PDF-файл и есть ли нужная программа для его просмотра.