Лекция 11. Введение в Apache Spark (2015 Лекции)
Описание файла
Файл "Лекция 11. Введение в Apache Spark" внутри архива находится в папке "2015 Лекции". PDF-файл из архива "2015 Лекции", который расположен в категории "". Всё это находится в предмете "(смрхиод) современные методы распределенного хранения и обработки данных" из 10 семестр (2 семестр магистратуры), которые можно найти в файловом архиве МГУ им. Ломоносова. Не смотря на прямую связь этого архива с МГУ им. Ломоносова, его также можно найти и в других разделах. .
Просмотр PDF-файла онлайн
Текст из PDF
Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №7. Введение в Apache Spark.http://bigdata.cs.msu.rubigdata@cs.msu.ru●●УзелУзелУзел●Используя принцип локальностиданных параллельно прочитать измассива данных в HDFS иобработать данные в MappersТасовка (сортировка данных поключу, передача по сети наReducers)Параллельная обработкасгрупированных по ключам данныхна Reducers и запись результата вHDFSЧТЕНИЕmapтасовкаreduceзапись●●●●Hadoop обычно ориентирован на решение одной задачи в рамках однойпрограммы MapReduceХотя существуют “императивные” возможности для объединениянескольких MapReduce программ: Job.addDependingJob(Job), JobControl,ControlledJob, ChainMapper, ChainReducerHadoop MapReduce - неинтерактивен: подразумевает написаниепрограммы на Java (либо C++, либо ЯП, используемом в Streaming)Hadoop неэффективен в задачах, где требуется повторное чтениеданных (итеративные алгоритмы - практически любой алгоритммашинного обучения)●●●●●●Интерактивная обработка/анализа данных○ Программа на Spark - декларативное описание ациклического графаобработки набора данных (Программа для Hadoop MapReduce - несколькоунаследованных классов и переопределенных функций)Итеративные алгоритмы в ОЗУВстроенная поддержка Scala, Java, Python, RПоддержка локальности данных в HDFSПоддержка YARNБиблиотеки:○ SQL и DataFrame○ машинное обучение (MLlib)○ обработка событий (Streaming)○ обработка графов (GraphX)○ 150+ third-party проектовhttps://amplab.cs.berkeley.edu/Авторы: Matei Zaharia et al.BlinkDBЛитератураMatei Zaharia at al.
Resilient Distributed Datasets: A Fault-TolerantAbstraction for In-Memory Cluster Computing. NSDI (2012)http://usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdfCourse “Scalable Machine Learning”https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1xSpark Programming Guidehttp://spark.apache.org/docs/latest/programming-guide.htmlMicrosoft Dryadhttp://research.microsoft.com/en-us/projects/dryad/https://github.com/MicrosoftResearch/DryadЗапуск интерактивного интерпретатора SparkScala:bin/spark-shellPython:bin/pyspark(для выбора Python нужной версии используется переменная окруженияPYSPARK_PYTHON)Снижение детализации при журналировании на экранconf/log4j.properties.template→conf/log4j.propertieslog4j.rootCategory=INFO, console→log4j.rootCategory=WARN, consolePySpark shell...15/11/22 15:55:11 INFO MemoryStore: MemoryStore started with capacity 530.0 MB15/11/22 15:55:12 INFO HttpFileServer: HTTP File server directory is C:\Users\sergun\AppData\Local\Temp\spark-770a985e-06a7-45e3-ac57-158808bb5f05\httpd-a0a9181e-68a6-4351-85a2-b2845c03489415/11/22 15:55:12 INFO HttpServer: Starting HTTP Server15/11/22 15:55:12 INFO Utils: Successfully started service 'HTTP file server' onport 53171.15/11/22 15:55:12 INFO SparkEnv: Registering OutputCommitCoordinator15/11/22 15:55:13 INFO Utils: Successfully started service 'SparkUI' on port 4040.15/11/22 15:55:13 INFO SparkUI: Started SparkUI at http://192.168.56.1:404015/11/22 15:55:13 WARN MetricsSystem: Using default name DAGScheduler for sourcebecause spark.app.id is not set.15/11/22 15:55:13 INFO Executor: Starting executor ID driver on host localhost15/11/22 15:55:14 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53190.15/11/22 15:55:14 INFO NettyBlockTransferService: Server created on 5319015/11/22 15:55:14 INFO BlockManagerMaster: Trying to register BlockManager15/11/22 15:55:14 INFO BlockManagerMasterEndpoint: Registering block manager localhost:53190 with 530.0 MB RAM, BlockManagerId(driver, localhost, 53190)15/11/22 15:55:14 INFO BlockManagerMaster: Registered BlockManagerWelcome to______/ __/__ ___ _____/ /___\ \/ _ \/ _ `/ __/ '_//__ / .__/\_,_/_/ /_/\_\ version 1.5.2/_/Using Python version 3.4.2 (v3.4.2:ab2c023a9432, Oct 6 2014 22:16:31)SparkContext available as sc, HiveContext available as sqlContext.>>>Компоненты SparkРабочий узелИсполнительЗадачаЗадачаПрограмма-драйверРабочий узелSparkContextИсполнительЗадачаЗадачаИсполнительЗадачаЗадачаRDD#1Устойчивый распределенный набор данных (Resilient Distributed Dataset, RDD)textFilelineslines=sc.textFile(‘wasb://web@bigdatamsu.blob.core.windows.net/web.tsv’)lines.getNumPartitions()urls=lines.map(lambda l: l.split(‘\t’)[2])from urllib.parse import urlparse # from urlparse import urlparsedomain_count=urls.map(lambda u: (urlparse(u).hostname,1))domain_stat=domain_count.reduceByKey(lambda x,y: x+y,numPartitions=3)top_stat=domain_stat.filter(lambda d:d[1] >=100)top_stat.saveAsTextFile(’top_stat.txt’)Операции с RDD:● создание RDD● преобразования● действия●●●mapurlsmapdomain_countreduceByKeydomain_statfiltertop_statsaveAsTextFileRDD состоит из разделов (partitions), которые могут вычислятьсяпараллельноПреобразования RDD образуют направленный ациклический граф(DAG)Вычисления происходят в момент обращения к действию (отложенныевычисления).RDD#2lines=sc.textFile(‘wasb://web@bigdatamsu.blob.core.windows.net/web.tsv’)urls=lines.map(lambda l: l.split(‘\t’)[2])from urllib.parse import urlparsedomain_count=urls.map(lambda u: (urlparse(u).hostname,1))domain_stat=domain_count.reduceByKey(lambda x,y: x+y,numPartitions=3)top_stat=domain_stat.filter(lambda d:d[1] >=100)top_stat.count()top_stat.saveAsTextFile(‘top_stat.txt’)При обращении к действию граф RDD будетвычисляться повторно!Кеширование RDDlines=sc.textFile(‘wasb://web@bigdatamsu.blob.core.windows.net/web.tsv’)urls=lines.map(lambda l: l.split(‘\t’)[2])from urllib.parse import urlparsedomain_count=urls.map(lambda u: (urlparse(u).hostname,1))domain_stat=domain_count.reduceByKey(lambda x,y: x+y,numPartitions=3)top_stat=domain_stat.filter(lambda d:d[1] >=100)top_stat.persist(MEMORY_ONLY)top_stat.count()top_stat.saveAsTextFile(‘top_stat.txt’)Уровень персистентностиMEMORY_ONLYВ PySpark в любом случае используетсясериализация Python pickleПо умолчанию*.
Хранение разделовRDD в памяти. Не уместившиесяразделы пересчитываются.MEMORY_AND_DISKХранение разделов RDD в памяти. Неуместившиеся разделы сохраняютсяна диск.MEMORY_ONLY_SERАналогично MEMORY_ONLY, ноиспользуется сериализация(сокращение объема, увеличениепроцессорной нагрузки)MEMORY_AND_DISK_SERRDD.persist(level=MEMORY_ONLY)RDD.cache() #MEMORY_ONLYRDD.unpersist()ОписаниеDISK_ONLYMEMORY_ONLY_2,MEMORY_DISK_2 и т.д.OFF_HEAP (experimental)Аналогично MEMORY_AND_DISK, ноиспользуется сериализация(сокращение объема, увеличениепроцессорной нагрузки)Хранение разделов RDD на диске.“Реплицирование” разделов наисполнителях на других уззлахОбщий пул памяти в TachyonУменьшение GCРаздел не теряется при сбое узлаСоотношение скоростей50ГБ/сек1ГБ/сек(10Гб/сек)1x50x500МБ/сек100x100МБ/сек500x0,3ГБ/сек166xзадача 3mapзадача 2Узкая (narrow) зависимость - раздел используетсяпри расчете не более чем одного “дочернего”разделазадача 1●задача 0RDD, стадии, задачиСтадия 0textFilelinesmapfilterurlsmap●Широкая (wide) зависимость - раздел можетбыть использован более чем одним “дочерним”разделомdomain_countreduceByKeyСтадия 1domain_statfilterreduceByKeysaveAsTextFileзадача 2задача 1задача 0Границы стадий:● преобразования с широкими зависимостями(точнее “тасовка” в рамках этих преобразований)● действия● уже кешированные RDDtop_statRDD.toDebugString() - отображаетзависимостиПреобразования одной RDDНазваниеОписаниеПримерRDD.map(action)Возвращает RDD из резуьлтатовприменения action: x→y к каждомуэлементу исходной RDDdomains=urls.map(lambda u: urlparse(u).hostname)RDD.flatMap(action)Применяет action: x→[y1,..,yn] ккаждому элементу исходной RDD,возвращает RDD, состояющую изэлементов всех списков.words=lines.flatMap(lambda l: l.split(‘’))RDD.filter(cond)Возвращает RDD из элементовисходной RDD, удовлетворяющихусловию cond.domains1=domains.filter(lambda d: notd.endswith(‘.рф’))RDD.distinct()Возвращает RDD из уникальныхэлементов исходной RDD.domains.distinct()RDD.sample(withReplacement,fraction, seed=None)Сэмплинг.domains(False, 0.1)Первичное создание RDDНазваниеОписаниеПримерSparkContext.textFile(name,minPartitions=None,use_unicode=True)Чтение данных из файла сиспользованием HadoopTextInputFormat.
Число разделовопределяется числом сплитов, либозадается явно.lines=sc.textFile(r‘c:\bigdata\data\web.tsv’)SparkContext.parallelize(c,numSlices=None)Создание RDD на основе обычнойколлекции Python.rdd=sc.parallelize([1,3,5,7,9,11],2)Преобразования двух RDDНазваниеRDD.union(other)RDD.intersection(other)RDD.subtract(other)RDD.cartesian(other)Действия над RDDНазваниеОписаниеRDD.collect()“Материализация” RDD в Python списокRDD.count()Число элементовRDD.countByValue()Сколько раз встречается каждое значение (возвращает список пар)RDD.take(num)Список из первых num элементовRDD.top(num)Список из num элементов в порядке убывания значенийRDD.takeOrdered(num,ordering)Список из num элементов в порядке убывания значений согласно заданномуотношению порядкаRDD.takeSample(withReplacement,num,seed=None)Аналог sampleRDD.reduce(func)“Свертка” с помощью заданной бинарной функции.
Не сработает для пустой RDD.RDD.fold(zero,func)“Свертка” с помощью заданной бинарной функции и нуляRDD.aggregate(zeroValue,seqOp,combOp)Аналог reduce, но для вычислений агрегатов вроде “среднего”.RDD.foreach(func)Выполнить действие над каждым элементом.AggregatesumCount=nums.aggregate((0,0),lambda acc,value: (acc[0]+value, acc[1]+1),lambda acc1,acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1]))sumCount[0]/sumCount[1]Работа с парами ключ/значениеПреобразования RDD● Агрегирование/группировка по ключу● Соединения (joins) по ключу● Сортировка по ключуАгрегирование/группировка по ключуНазваниеОписаниеRDD.groupByKey(numPartitions=None)Группировка значений по ключу.RDD.reduceByKey(func,numPartitions=None)“Свертка” значений для каждого ключа.RDD.foldByKey(zeroValue, func,numPartitions=None)“Свертка” значений для каждого ключа (с указанием 0 элемента)RDD.aggregateByKey(zeroValue, seqFunc,combFunc, numPartitions=None)Аналог aggregate.RDD.combineByKey(createCombiner,mergeValue, mergeCombiners,numPartitions=None)data = sc.parallelize( [(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)] )sumCount = data.combineByKey(lambda value: (value, 1),lambda x, value: (x[0] + value, x[1] + 1),lambda x, y: (x[0] + y[0], x[1] + y[1]))averageByKey = sumCount.map(lambda (label, (value_sum, count)): (label,value_sum / count))СоединениеНазваниеОписаниеRDD.join(other, numPartitions=None)Внутреннее соединение.RDD.leftOuterJoin(other, numPartitions=None)Левое внешнее соединение.RDD.rightOuterJoin(other, numPartitions=None)Правое внешнее соединение.Производительность операций с парами ключ-значениеdaily_stat...joinedprint(yearly_stat.getNumPartitions())100print(daily_stat.getNumPartitions())5print(yearly_stat.partitioner)Noneprint(daily_stat.partitioner)Noneyearly_statdef mymap(l):domain,freq=l.split(‘\t‘)freq=int(freq)return domain, freqyearly_lines=sc.textFile(‘/yearly_stat.tsv’)yearly_stat=full_lines.map(mymap)# каждый деньdaily_lines=sc.textFile(‘/20151126_stat.tsv’)daily_stat=delta_lines.map(mymap)joined=yearly_stat.join(daily_stat)...shuffleshuffleПроизводительность операций с парами ключ-значениеdaily_stat...joinedyearly_statdef mymap(l):domain,freq=l.split(‘\t‘)freq=int(freq)return domain, freqyearly_lines=sc.textFile(‘/yearly_stat.tsv’)yearly_stat=full_lines.map(mymap) \.partitionBy(50) \.persist()# каждый деньdaily_lines=sc.textFile(‘/20151126_stat.tsv’)daily_stat=delta_lines.map(mymap)joined=yearly_stat.join(daily_stat)......shuffleКакие преобразования порождают атрибут partitioner?Какие преобразования удаляют partitioner из родительской RDD?Какие преобразования наследуют partitioner из родительской RDD?Зачем нужны функции mapValues и flatMapValues, если есть map и flatMap?СортировкаНазваниеОписаниеRDD.sortBy(keyfunc, ascending=True, numPartitions=None)Сортировка RDDRDD.sortByKey(ascending=True, numPartitions=None,keyfunc=None)Сортировка RDD ключ-значение по ключуДополнительные возможности●●аккумуляторышироковещательные переменные (broadcast variables)Stand-alone программыfrom pyspark import SparkConf, SparkContextconf=SparkConf().setMaster(“local”).setAppName(“My App”)conf.set(‘spark.ui.port’,’36000’)sc=SparkContext(conf)Варианты запуска (--master)Пример:bin/spark-submit --master spark://host:7077 --executor-memory 10g my_script.pyОбщий вид:bin/spark-submit [опции] <jar|python файл> [аргументы_командной_строки]Тип URLОписаниеspark://host:portАдрес диспетчера кластера Spark Standalone (порт обычно7077)mesos://host:portАдрес диспетчера кластера Mesos (порт обычно 5050)yarnАдрес ResourceManager YARN берется из конфигурационныхфайлов, находящихся в HADOOP_CONF_DIRlocalЗапуск в локальном режиме на 1 ядреlocal[N]Запуск в локальном режиме на N ядрахlocal[*]Запуск в локальном режиме на всех ядрахПараметры spark-submit / pysparkПараметрОписание--masterВариант запуска (или адрес диспетчера кластера)--deploy-modeГде запускать драйвер: на машине, где запущен spark-submit (client, по умолчанию), на узле в кластере (cluster)--classОсновной класс (для Java-приложений)--nameДружественное имя приложения--jarsЗависимости Java-приложений--filesФайлы для копирования на узлы--py-filesБиблиотеки и модули Python для копирования на узлы и включения в PYTHONPATH--confПроизвольная настройка в формате имя=значение, например, spark.ui.port=36000--properties-fileКонфигурационный файл.
По умолчанию ищется conf/spark-defaults.conf--executor-memoryОбъем памяти каждого исполнителя (По умолчанию 1G)--driver-memoryОбъем памяти драйвера (По умолчанию 1024M)--exector-coresЧисло ядер исполнителя (По умолчанию 1)--driver-coresЧисло ядер драйвера (По умолчанию 1)--num-executorsЧисло исполнителей (По умолчанию 2)Настройки ОЗУNodeManagerExecutor Containerspark.yarn.executor.memoryOverhead (~10%)spark.executor.memoryspark.shuffle.memoryFraction(0.4)spark.storage.memoryFraction(0.6)Домашнее заданиеБудет разослано по e-mail..