Лекция 11. Введение в Apache Spark (1185419)
Текст из файла
Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №7. Введение в Apache Spark.http://bigdata.cs.msu.rubigdata@cs.msu.ru●●УзелУзелУзел●Используя принцип локальностиданных параллельно прочитать измассива данных в HDFS иобработать данные в MappersТасовка (сортировка данных поключу, передача по сети наReducers)Параллельная обработкасгрупированных по ключам данныхна Reducers и запись результата вHDFSЧТЕНИЕmapтасовкаreduceзапись●●●●Hadoop обычно ориентирован на решение одной задачи в рамках однойпрограммы MapReduceХотя существуют “императивные” возможности для объединениянескольких MapReduce программ: Job.addDependingJob(Job), JobControl,ControlledJob, ChainMapper, ChainReducerHadoop MapReduce - неинтерактивен: подразумевает написаниепрограммы на Java (либо C++, либо ЯП, используемом в Streaming)Hadoop неэффективен в задачах, где требуется повторное чтениеданных (итеративные алгоритмы - практически любой алгоритммашинного обучения)●●●●●●Интерактивная обработка/анализа данных○ Программа на Spark - декларативное описание ациклического графаобработки набора данных (Программа для Hadoop MapReduce - несколькоунаследованных классов и переопределенных функций)Итеративные алгоритмы в ОЗУВстроенная поддержка Scala, Java, Python, RПоддержка локальности данных в HDFSПоддержка YARNБиблиотеки:○ SQL и DataFrame○ машинное обучение (MLlib)○ обработка событий (Streaming)○ обработка графов (GraphX)○ 150+ third-party проектовhttps://amplab.cs.berkeley.edu/Авторы: Matei Zaharia et al.BlinkDBЛитератураMatei Zaharia at al.
Resilient Distributed Datasets: A Fault-TolerantAbstraction for In-Memory Cluster Computing. NSDI (2012)http://usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdfCourse “Scalable Machine Learning”https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1xSpark Programming Guidehttp://spark.apache.org/docs/latest/programming-guide.htmlMicrosoft Dryadhttp://research.microsoft.com/en-us/projects/dryad/https://github.com/MicrosoftResearch/DryadЗапуск интерактивного интерпретатора SparkScala:bin/spark-shellPython:bin/pyspark(для выбора Python нужной версии используется переменная окруженияPYSPARK_PYTHON)Снижение детализации при журналировании на экранconf/log4j.properties.template→conf/log4j.propertieslog4j.rootCategory=INFO, console→log4j.rootCategory=WARN, consolePySpark shell...15/11/22 15:55:11 INFO MemoryStore: MemoryStore started with capacity 530.0 MB15/11/22 15:55:12 INFO HttpFileServer: HTTP File server directory is C:\Users\sergun\AppData\Local\Temp\spark-770a985e-06a7-45e3-ac57-158808bb5f05\httpd-a0a9181e-68a6-4351-85a2-b2845c03489415/11/22 15:55:12 INFO HttpServer: Starting HTTP Server15/11/22 15:55:12 INFO Utils: Successfully started service 'HTTP file server' onport 53171.15/11/22 15:55:12 INFO SparkEnv: Registering OutputCommitCoordinator15/11/22 15:55:13 INFO Utils: Successfully started service 'SparkUI' on port 4040.15/11/22 15:55:13 INFO SparkUI: Started SparkUI at http://192.168.56.1:404015/11/22 15:55:13 WARN MetricsSystem: Using default name DAGScheduler for sourcebecause spark.app.id is not set.15/11/22 15:55:13 INFO Executor: Starting executor ID driver on host localhost15/11/22 15:55:14 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53190.15/11/22 15:55:14 INFO NettyBlockTransferService: Server created on 5319015/11/22 15:55:14 INFO BlockManagerMaster: Trying to register BlockManager15/11/22 15:55:14 INFO BlockManagerMasterEndpoint: Registering block manager localhost:53190 with 530.0 MB RAM, BlockManagerId(driver, localhost, 53190)15/11/22 15:55:14 INFO BlockManagerMaster: Registered BlockManagerWelcome to______/ __/__ ___ _____/ /___\ \/ _ \/ _ `/ __/ '_//__ / .__/\_,_/_/ /_/\_\ version 1.5.2/_/Using Python version 3.4.2 (v3.4.2:ab2c023a9432, Oct 6 2014 22:16:31)SparkContext available as sc, HiveContext available as sqlContext.>>>Компоненты SparkРабочий узелИсполнительЗадачаЗадачаПрограмма-драйверРабочий узелSparkContextИсполнительЗадачаЗадачаИсполнительЗадачаЗадачаRDD#1Устойчивый распределенный набор данных (Resilient Distributed Dataset, RDD)textFilelineslines=sc.textFile(‘wasb://web@bigdatamsu.blob.core.windows.net/web.tsv’)lines.getNumPartitions()urls=lines.map(lambda l: l.split(‘\t’)[2])from urllib.parse import urlparse # from urlparse import urlparsedomain_count=urls.map(lambda u: (urlparse(u).hostname,1))domain_stat=domain_count.reduceByKey(lambda x,y: x+y,numPartitions=3)top_stat=domain_stat.filter(lambda d:d[1] >=100)top_stat.saveAsTextFile(’top_stat.txt’)Операции с RDD:● создание RDD● преобразования● действия●●●mapurlsmapdomain_countreduceByKeydomain_statfiltertop_statsaveAsTextFileRDD состоит из разделов (partitions), которые могут вычислятьсяпараллельноПреобразования RDD образуют направленный ациклический граф(DAG)Вычисления происходят в момент обращения к действию (отложенныевычисления).RDD#2lines=sc.textFile(‘wasb://web@bigdatamsu.blob.core.windows.net/web.tsv’)urls=lines.map(lambda l: l.split(‘\t’)[2])from urllib.parse import urlparsedomain_count=urls.map(lambda u: (urlparse(u).hostname,1))domain_stat=domain_count.reduceByKey(lambda x,y: x+y,numPartitions=3)top_stat=domain_stat.filter(lambda d:d[1] >=100)top_stat.count()top_stat.saveAsTextFile(‘top_stat.txt’)При обращении к действию граф RDD будетвычисляться повторно!Кеширование RDDlines=sc.textFile(‘wasb://web@bigdatamsu.blob.core.windows.net/web.tsv’)urls=lines.map(lambda l: l.split(‘\t’)[2])from urllib.parse import urlparsedomain_count=urls.map(lambda u: (urlparse(u).hostname,1))domain_stat=domain_count.reduceByKey(lambda x,y: x+y,numPartitions=3)top_stat=domain_stat.filter(lambda d:d[1] >=100)top_stat.persist(MEMORY_ONLY)top_stat.count()top_stat.saveAsTextFile(‘top_stat.txt’)Уровень персистентностиMEMORY_ONLYВ PySpark в любом случае используетсясериализация Python pickleПо умолчанию*.
Хранение разделовRDD в памяти. Не уместившиесяразделы пересчитываются.MEMORY_AND_DISKХранение разделов RDD в памяти. Неуместившиеся разделы сохраняютсяна диск.MEMORY_ONLY_SERАналогично MEMORY_ONLY, ноиспользуется сериализация(сокращение объема, увеличениепроцессорной нагрузки)MEMORY_AND_DISK_SERRDD.persist(level=MEMORY_ONLY)RDD.cache() #MEMORY_ONLYRDD.unpersist()ОписаниеDISK_ONLYMEMORY_ONLY_2,MEMORY_DISK_2 и т.д.OFF_HEAP (experimental)Аналогично MEMORY_AND_DISK, ноиспользуется сериализация(сокращение объема, увеличениепроцессорной нагрузки)Хранение разделов RDD на диске.“Реплицирование” разделов наисполнителях на других уззлахОбщий пул памяти в TachyonУменьшение GCРаздел не теряется при сбое узлаСоотношение скоростей50ГБ/сек1ГБ/сек(10Гб/сек)1x50x500МБ/сек100x100МБ/сек500x0,3ГБ/сек166xзадача 3mapзадача 2Узкая (narrow) зависимость - раздел используетсяпри расчете не более чем одного “дочернего”разделазадача 1●задача 0RDD, стадии, задачиСтадия 0textFilelinesmapfilterurlsmap●Широкая (wide) зависимость - раздел можетбыть использован более чем одним “дочерним”разделомdomain_countreduceByKeyСтадия 1domain_statfilterreduceByKeysaveAsTextFileзадача 2задача 1задача 0Границы стадий:● преобразования с широкими зависимостями(точнее “тасовка” в рамках этих преобразований)● действия● уже кешированные RDDtop_statRDD.toDebugString() - отображаетзависимостиПреобразования одной RDDНазваниеОписаниеПримерRDD.map(action)Возвращает RDD из резуьлтатовприменения action: x→y к каждомуэлементу исходной RDDdomains=urls.map(lambda u: urlparse(u).hostname)RDD.flatMap(action)Применяет action: x→[y1,..,yn] ккаждому элементу исходной RDD,возвращает RDD, состояющую изэлементов всех списков.words=lines.flatMap(lambda l: l.split(‘’))RDD.filter(cond)Возвращает RDD из элементовисходной RDD, удовлетворяющихусловию cond.domains1=domains.filter(lambda d: notd.endswith(‘.рф’))RDD.distinct()Возвращает RDD из уникальныхэлементов исходной RDD.domains.distinct()RDD.sample(withReplacement,fraction, seed=None)Сэмплинг.domains(False, 0.1)Первичное создание RDDНазваниеОписаниеПримерSparkContext.textFile(name,minPartitions=None,use_unicode=True)Чтение данных из файла сиспользованием HadoopTextInputFormat.
Число разделовопределяется числом сплитов, либозадается явно.lines=sc.textFile(r‘c:\bigdata\data\web.tsv’)SparkContext.parallelize(c,numSlices=None)Создание RDD на основе обычнойколлекции Python.rdd=sc.parallelize([1,3,5,7,9,11],2)Преобразования двух RDDНазваниеRDD.union(other)RDD.intersection(other)RDD.subtract(other)RDD.cartesian(other)Действия над RDDНазваниеОписаниеRDD.collect()“Материализация” RDD в Python списокRDD.count()Число элементовRDD.countByValue()Сколько раз встречается каждое значение (возвращает список пар)RDD.take(num)Список из первых num элементовRDD.top(num)Список из num элементов в порядке убывания значенийRDD.takeOrdered(num,ordering)Список из num элементов в порядке убывания значений согласно заданномуотношению порядкаRDD.takeSample(withReplacement,num,seed=None)Аналог sampleRDD.reduce(func)“Свертка” с помощью заданной бинарной функции.
Не сработает для пустой RDD.RDD.fold(zero,func)“Свертка” с помощью заданной бинарной функции и нуляRDD.aggregate(zeroValue,seqOp,combOp)Аналог reduce, но для вычислений агрегатов вроде “среднего”.RDD.foreach(func)Выполнить действие над каждым элементом.AggregatesumCount=nums.aggregate((0,0),lambda acc,value: (acc[0]+value, acc[1]+1),lambda acc1,acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1]))sumCount[0]/sumCount[1]Работа с парами ключ/значениеПреобразования RDD● Агрегирование/группировка по ключу● Соединения (joins) по ключу● Сортировка по ключуАгрегирование/группировка по ключуНазваниеОписаниеRDD.groupByKey(numPartitions=None)Группировка значений по ключу.RDD.reduceByKey(func,numPartitions=None)“Свертка” значений для каждого ключа.RDD.foldByKey(zeroValue, func,numPartitions=None)“Свертка” значений для каждого ключа (с указанием 0 элемента)RDD.aggregateByKey(zeroValue, seqFunc,combFunc, numPartitions=None)Аналог aggregate.RDD.combineByKey(createCombiner,mergeValue, mergeCombiners,numPartitions=None)data = sc.parallelize( [(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)] )sumCount = data.combineByKey(lambda value: (value, 1),lambda x, value: (x[0] + value, x[1] + 1),lambda x, y: (x[0] + y[0], x[1] + y[1]))averageByKey = sumCount.map(lambda (label, (value_sum, count)): (label,value_sum / count))СоединениеНазваниеОписаниеRDD.join(other, numPartitions=None)Внутреннее соединение.RDD.leftOuterJoin(other, numPartitions=None)Левое внешнее соединение.RDD.rightOuterJoin(other, numPartitions=None)Правое внешнее соединение.Производительность операций с парами ключ-значениеdaily_stat...joinedprint(yearly_stat.getNumPartitions())100print(daily_stat.getNumPartitions())5print(yearly_stat.partitioner)Noneprint(daily_stat.partitioner)Noneyearly_statdef mymap(l):domain,freq=l.split(‘\t‘)freq=int(freq)return domain, freqyearly_lines=sc.textFile(‘/yearly_stat.tsv’)yearly_stat=full_lines.map(mymap)# каждый деньdaily_lines=sc.textFile(‘/20151126_stat.tsv’)daily_stat=delta_lines.map(mymap)joined=yearly_stat.join(daily_stat)...shuffleshuffleПроизводительность операций с парами ключ-значениеdaily_stat...joinedyearly_statdef mymap(l):domain,freq=l.split(‘\t‘)freq=int(freq)return domain, freqyearly_lines=sc.textFile(‘/yearly_stat.tsv’)yearly_stat=full_lines.map(mymap) \.partitionBy(50) \.persist()# каждый деньdaily_lines=sc.textFile(‘/20151126_stat.tsv’)daily_stat=delta_lines.map(mymap)joined=yearly_stat.join(daily_stat)......shuffleКакие преобразования порождают атрибут partitioner?Какие преобразования удаляют partitioner из родительской RDD?Какие преобразования наследуют partitioner из родительской RDD?Зачем нужны функции mapValues и flatMapValues, если есть map и flatMap?СортировкаНазваниеОписаниеRDD.sortBy(keyfunc, ascending=True, numPartitions=None)Сортировка RDDRDD.sortByKey(ascending=True, numPartitions=None,keyfunc=None)Сортировка RDD ключ-значение по ключуДополнительные возможности●●аккумуляторышироковещательные переменные (broadcast variables)Stand-alone программыfrom pyspark import SparkConf, SparkContextconf=SparkConf().setMaster(“local”).setAppName(“My App”)conf.set(‘spark.ui.port’,’36000’)sc=SparkContext(conf)Варианты запуска (--master)Пример:bin/spark-submit --master spark://host:7077 --executor-memory 10g my_script.pyОбщий вид:bin/spark-submit [опции] <jar|python файл> [аргументы_командной_строки]Тип URLОписаниеspark://host:portАдрес диспетчера кластера Spark Standalone (порт обычно7077)mesos://host:portАдрес диспетчера кластера Mesos (порт обычно 5050)yarnАдрес ResourceManager YARN берется из конфигурационныхфайлов, находящихся в HADOOP_CONF_DIRlocalЗапуск в локальном режиме на 1 ядреlocal[N]Запуск в локальном режиме на N ядрахlocal[*]Запуск в локальном режиме на всех ядрахПараметры spark-submit / pysparkПараметрОписание--masterВариант запуска (или адрес диспетчера кластера)--deploy-modeГде запускать драйвер: на машине, где запущен spark-submit (client, по умолчанию), на узле в кластере (cluster)--classОсновной класс (для Java-приложений)--nameДружественное имя приложения--jarsЗависимости Java-приложений--filesФайлы для копирования на узлы--py-filesБиблиотеки и модули Python для копирования на узлы и включения в PYTHONPATH--confПроизвольная настройка в формате имя=значение, например, spark.ui.port=36000--properties-fileКонфигурационный файл.
По умолчанию ищется conf/spark-defaults.conf--executor-memoryОбъем памяти каждого исполнителя (По умолчанию 1G)--driver-memoryОбъем памяти драйвера (По умолчанию 1024M)--exector-coresЧисло ядер исполнителя (По умолчанию 1)--driver-coresЧисло ядер драйвера (По умолчанию 1)--num-executorsЧисло исполнителей (По умолчанию 2)Настройки ОЗУNodeManagerExecutor Containerspark.yarn.executor.memoryOverhead (~10%)spark.executor.memoryspark.shuffle.memoryFraction(0.4)spark.storage.memoryFraction(0.6)Домашнее заданиеБудет разослано по e-mail..
Характеристики
Тип файла PDF
PDF-формат наиболее широко используется для просмотра любого типа файлов на любом устройстве. В него можно сохранить документ, таблицы, презентацию, текст, чертежи, вычисления, графики и всё остальное, что можно показать на экране любого устройства. Именно его лучше всего использовать для печати.
Например, если Вам нужно распечатать чертёж из автокада, Вы сохраните чертёж на флешку, но будет ли автокад в пункте печати? А если будет, то нужная версия с нужными библиотеками? Именно для этого и нужен формат PDF - в нём точно будет показано верно вне зависимости от того, в какой программе создали PDF-файл и есть ли нужная программа для его просмотра.