Главная » Все файлы » Просмотр файлов из архивов » PDF-файлы » Лекция 12. Дополнительные возможности Apache Spark

Лекция 12. Дополнительные возможности Apache Spark (2015 Лекции)

PDF-файл Лекция 12. Дополнительные возможности Apache Spark (2015 Лекции) (СМРХиОД) Современные методы распределенного хранения и обработки данных (63229): Лекции - 10 семестр (2 семестр магистратуры)Лекция 12. Дополнительные возможности Apache Spark (2015 Лекции) - PDF (63229) - СтудИзба2020-08-25СтудИзба

Описание файла

Файл "Лекция 12. Дополнительные возможности Apache Spark" внутри архива находится в папке "2015 Лекции". PDF-файл из архива "2015 Лекции", который расположен в категории "". Всё это находится в предмете "(смрхиод) современные методы распределенного хранения и обработки данных" из 10 семестр (2 семестр магистратуры), которые можно найти в файловом архиве МГУ им. Ломоносова. Не смотря на прямую связь этого архива с МГУ им. Ломоносова, его также можно найти и в других разделах. .

Просмотр PDF-файла онлайн

Текст из PDF

Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №8. Дополнительные возможностиApache Spark.http://bigdata.cs.msu.rubigdata@cs.msu.ruRDD и разделыx=sc.parallelize([(1,1),(3,1),(5,1),(7,1),(9,1)], numSlices=3)node4node6node5z=x.join(y)shufflenode4hash(key)%4==0node2node3node1hash(key)%4==1 hash(key)%4==2hash(key)%4==3y=sc.parallelize([(1,1),(3,1),(5,1),(7,1),(9,1)]).partitionBy(4)partitionery1=y.map(lambda pair: (pair[1], pair[0]))partitionery2=y.mapValues(lambda value: value+1)RDDDirected acyclic graph (DAG)● stages○tasksLazy evaluationRDD.persist(MEMORY_ONLY)ПланБиблиотеки Spark● SQL и DataFrame● Spark Streaming● MLlib● GraphX● SparkR(*)Сегодня● SQL и DataFrame● векторные типы данныхDataFrameТабличная структура данных, упрощающая предобработку данных встатистических пакетах.●●●По структуре и операциям DataFrame похож на таблицу в реляционнойБД.Отличается от таблицы в реляционной БД способом обработки данных(стат.запросов и манипуляции): вместо SQL используются конструкцииЯП.“Классические” реализации (хранение в ОЗУ):○○●Pandas DataFrame (Python)R DataFrame“Альтернативные” реализации○SAS datasetКонтекстыsc=SparkContext()sqlContext=HiveContext(sc)SQLContextHiveContextПример создания DataFrame52638252638252638220141025 http://www.1tv.ru/pvoice/66112,6612120141025 http://www.audiopoisk.com/artist/vika-i-gruppa-magadan/ 60316,6036320141025 http://zvukoff.ru/song/26322660665from pyspark.sql import SQLContext, RowsqlContext=SQLContext(sc)lines = sc.textFile("c:\\bigdata\\data\\web2.tsv")parts = lines.map(lambda l: l.split("\t"))log_entries = parts.map(lambda p: Row(user_id=p[0], dt=int(p[1]), url=p[2], times=p[3]))log_entries.count() # hacklog = sqlContext.createDataFrame(log_entries) # samplingRatio=NoneПрогресс[Stage 8:>(0 + 4) / 5][Stage 8:>(0 + 5) / 5][Stage 8:===========>(1 + 4) / 5][Stage 8:=======================>(2 + 3) / 5][Stage 8:===================================>(3 + 2) / 5][Stage 8:===============================================>(4 + 1) / 5][Stage 9:>(0 + 0) / 200][Stage 9:>(0 + 4) / 200][Stage 9:>(1 + 7) / 200][Stage 9:==>(10 + 4) / 200][Stage 9:==>(10 + 7) / 200][Stage 9:====>(16 + 4) / 200][Stage 9:======>(23 + 4) / 200][Stage 9:===========>(42 + 4) / 200][Stage 9:===============>(56 + 4) / 200][Stage 9:=================>(63 + 4) / 200]DataFrame>>> type(log)<class 'pyspark.sql.dataframe.DataFrame'>>>> log.columns['dt', 'times', 'url', 'user_id']>>> log.dtypes[('dt', 'bigint'), ('times', 'string'), ('url', 'string'), ('user_id','string')]>>> log.describe().show()+-------+-----------------+|summary|dt|+-------+-----------------+| count|1000000||mean|2.0141045538141E7|| stddev| 39.0280347955159||min|20141005||max|20141130|+-------+-----------------+DataFrame: фильтрация, UDF>>> log.count()100000>>> from pyspark.sql.functions import UserDefinedFunction>>> from pyspark.sql.types import StringType>>> from urllib.parse import urlparse>>> get_domain=UserDefinedFunction(lambda url: urlparse(url).hostname,StringType())>>> log2=log.withColumn('domain', get_domain(log.url))>>> log2[['domain']].distinct().count()65663>>> log2[(log2.dt>=20141101)&(log2.dt<20141201)][['domain']].distinct().count()23316>>> log2.filter((log2.dt>=20141101)&(log2.dt<20141201))[['domain']].distinct().count()23316DataFrame: Агрегация>>> from pyspark.sql.types import IntegerType>>> get_month=UserDefinedFunction(lambda dt: dt//100, IntegerType())>>> log2=log2.withColumn('month', get_month(log2.dt))>>> import pyspark.sql.functions as F>>> log2.groupby('month').agg(F.countDistinct('domain')).show()+------+----------------------+| month|COUNT(DISTINCT domain)|+------+----------------------+|201410|53496||201411|23316|+------+----------------------+DataFrame: select>>> log[['user_id','dt']].show(2)+-------+--------+|user_id|dt|+-------+--------+|0|20141017||0|20141009|+-------+--------+>>> log.select('user_id','dt').show(3) # the same>>> log.select(log.user_id, log.dt).show(3) # the same>>> log.select(log.user_id, get_month(log.dt).alias('m')).show(3)+-------+------+|user_id|x|+-------+------+|0|201410||0|201410|+-------+------+DataFrame: Кеширование>>> log2.cache()>>> log2.groupby('month').agg(F.countDistinct('domain')).show()DataFrame: Сохранение / загрузка>>>>>>>>>>>>>>>>>>>>>5>>>log2.write.save('res_json', format='json')log2.write.json('res_json')log2.write.partitionby('domain').json('res_json', mode='overwrite')log2.write.parquet('res_parquet')log2.write.mode('append').parquet('res_parquet')log2=sqlContext.read.load('res_json2', format='json')log2.rdd.getNumPartitions()log2.rdd.partitionerDataFrame: другие разновидности операций●●●●●●замена najoinзамена значенийorderBysamplingtoPandas()SQLSpark SQL>>> log2.registerTempTable('log2')>>> sqlContext.sql('select count(*) from log2')DataFrame[_c0: bigint]>>> sqlContext.sql('select count(*) from log2').collect()[Row(_c0=1000000)]>>> sqlContext.sql('select * from log2 where domain like \'%lenta.ru\' order by dt desc limit 3').collect()Row(domain='lenta.ru', dt=20141129, month=201411,times='84755', url='http://lenta.ru/news/2013/11/29/love/',user_id='1235907'), Row(domain='lenta.ru', dt=20141129,month=201411, times='74491', url='http://lenta.ru/news/2013/11/29/love/', user_id='1724463'), Row(domain='lenta.ru', dt=20141129, month=201411, times='73832',url='http://lenta.ru/news/2013/11/29/block/',user_id='1724463')]Spark SQL>>> sqlContext.sql('select user_id, domain, month, count(domain) over (partition by month, domain), count(domain) over(partition by month, domain, user_id) from log2').show(3)+-------+--------------------+------+---+---+|user_id|domain| month|_c3|_c4|+-------+--------------------+------+---+---+| 385625|10.0.110.245|201410| 1| 1|| 447272|1119997.1001golos.ru|201410| 1| 1|| 447272|1125714.1001golos.ru|201410| 1| 1|+-------+--------------------+------+---+---+>>>Типы для линейной алгебрыDenseVector>>> from pyspark.mllib.linalg import Vectors>>> v = Vectors.dense([1.0, 0., 2.0, 0.])>>> w = Vectors.dense([3.0, 4.0, 0., 0.])>>> v + wDenseVector([4.0, 4.0, 2.0, 0.0])>>> 2 - vDenseVector([1.0, 2.0, 0.0, 2.0])>>> v * wDenseVector([3.0, 0.0, 0.0, 0.0])>>> v.norm(2)2.2360679774997898>>> v.squared_distance(v)0.0>>> v.squared_distance(w)24.0>>> v[0]1.0>>> v[0]=2Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'DenseVector' object does not support item assignmentSparseVector>>> vv=SparseVector(4, [(0, 1.), (3, 10.)])>>> ww=SparseVector(4, [(0, 20.), (2, 100.)])>>> vv + wwTraceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: unsupported operand type(s) for +: 'SparseVector' and 'SparseVector'>>> vv.dot(ww)>>> 20.0>>> vv.norm(2)10.04987562112089>>> vv.squared_distance(vv)0.0>>> vv.squared_distance(w)10461.0>>> vv.dot(v)1.0>>> vv[1]0.0DenseMatrix>>> from pyspark.mllib.linalg import Matrices>>> m = Matrices.dense(numRows=2, numCols=4, [1,2,3,4,5,6,7,8])>>> print(m)DenseMatrix([[ 1., 3., 5., 7.],[ 2., 4., 6., 8.]])>>> m[0,1]>>> 3SparseMatrix>>> mm=SparseMatrix(numRows=2, numCols=2, colPtrs=[0, 2, 3], rowIndices=[0, 1,1], values=[2, 3, 4])>>> print(mm)2 X 2 CSRMatrix(0,0) 2.0(1,0) 3.0(1,1) 4.0>>> [[2, 0, 0], \...

[3, 4, 0], \... [0, 0, 0]]>>> mm[1,0]>>> 3.0# for column i#rowIndices[colPtrs[i]:colPtrs[i+1]]#values[colPtrs[i]:colPtrs[i+1]]CSC-формат (Compressed Sparse Column matrix)RowMatrix>>> from pyspark.mllib.linalg.distributed import *>>> rdd=sc.parallelize([[1,2,3],[4,5,6]])>>> rm=RowMatrix(rows=rdd, numRows=0, numCols=0)>>> rm.numRows<bound method RowMatrix.numRows of <pyspark.mllib.linalg.distributed.RowMatrix object at 0x0000000005295048>>>>> rm.numRows()[Stage 30:>(0 + 0) / 4]...2>>> rm.rowsMapPartitionsRDD[187] at mapPartitions at PythonMLLibAPI.scala:1480>>> rm.rows.first()[Stage 32:>(0 + 0) / 3]...DenseVector([1.0, 2.0, 3.0])>>> rm[0,0]Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'RowMatrix' object is not subscriptableRDD из DenseVectorIndexedRowMatrix>>> rdd2=sc.parallelize([IndexedRow(0,[1,2,3]),IndexedRow(1,[4,5,6])])>>> irm=IndexedRowMatrix(rows=rdd2)>>> irm.rows.first()[Stage 42:>(0 + 0) / 3]...IndexedRow(0, [1.0,2.0,3.0])CoordinateMatrix>>> rdd3=sc.parallelize([MatrixEntry(0,0, 2.0),MatrixEntry(99,99,100.)])>>> cm=CoordinateMatrix(entries=rdd3)>>> cm.entries.first()>>> cm.entries.first()[Stage 46:>(0 + 0) / 3]...MatrixEntry(0, 0, 2.0)>>> cm[0,0]Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'CoordinateMatrix' object is not subscriptable>>> cm.numCols()[Stage 47:>(0 + 0) / 4]...100BlockMatrixblocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])bm = BlockMatrix(blocks, 3, 2)>>> bm.blocks.first()[Stage 52:>(0 + 3) / 3]...((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))Методы конвертации к другим типам распределенных матриц:● toLocalMatrix()● toIndexedRowMatrix()● toCoordinateMatrix()PySpark в JupyterСпасибо за внимание!.

Свежие статьи
Популярно сейчас
Как Вы думаете, сколько людей до Вас делали точно такое же задание? 99% студентов выполняют точно такие же задания, как и их предшественники год назад. Найдите нужный учебный материал на СтудИзбе!
Ответы на популярные вопросы
Да! Наши авторы собирают и выкладывают те работы, которые сдаются в Вашем учебном заведении ежегодно и уже проверены преподавателями.
Да! У нас любой человек может выложить любую учебную работу и зарабатывать на её продажах! Но каждый учебный материал публикуется только после тщательной проверки администрацией.
Вернём деньги! А если быть более точными, то автору даётся немного времени на исправление, а если не исправит или выйдет время, то вернём деньги в полном объёме!
Да! На равне с готовыми студенческими работами у нас продаются услуги. Цены на услуги видны сразу, то есть Вам нужно только указать параметры и сразу можно оплачивать.
Отзывы студентов
Ставлю 10/10
Все нравится, очень удобный сайт, помогает в учебе. Кроме этого, можно заработать самому, выставляя готовые учебные материалы на продажу здесь. Рейтинги и отзывы на преподавателей очень помогают сориентироваться в начале нового семестра. Спасибо за такую функцию. Ставлю максимальную оценку.
Лучшая платформа для успешной сдачи сессии
Познакомился со СтудИзбой благодаря своему другу, очень нравится интерфейс, количество доступных файлов, цена, в общем, все прекрасно. Даже сам продаю какие-то свои работы.
Студизба ван лав ❤
Очень офигенный сайт для студентов. Много полезных учебных материалов. Пользуюсь студизбой с октября 2021 года. Серьёзных нареканий нет. Хотелось бы, что бы ввели подписочную модель и сделали материалы дешевле 300 рублей в рамках подписки бесплатными.
Отличный сайт
Лично меня всё устраивает - и покупка, и продажа; и цены, и возможность предпросмотра куска файла, и обилие бесплатных файлов (в подборках по авторам, читай, ВУЗам и факультетам). Есть определённые баги, но всё решаемо, да и администраторы реагируют в течение суток.
Маленький отзыв о большом помощнике!
Студизба спасает в те моменты, когда сроки горят, а работ накопилось достаточно. Довольно удобный сайт с простой навигацией и огромным количеством материалов.
Студ. Изба как крупнейший сборник работ для студентов
Тут дофига бывает всего полезного. Печально, что бывают предметы по которым даже одного бесплатного решения нет, но это скорее вопрос к студентам. В остальном всё здорово.
Спасательный островок
Если уже не успеваешь разобраться или застрял на каком-то задание поможет тебе быстро и недорого решить твою проблему.
Всё и так отлично
Всё очень удобно. Особенно круто, что есть система бонусов и можно выводить остатки денег. Очень много качественных бесплатных файлов.
Отзыв о системе "Студизба"
Отличная платформа для распространения работ, востребованных студентами. Хорошо налаженная и качественная работа сайта, огромная база заданий и аудитория.
Отличный помощник
Отличный сайт с кучей полезных файлов, позволяющий найти много методичек / учебников / отзывов о вузах и преподователях.
Отлично помогает студентам в любой момент для решения трудных и незамедлительных задач
Хотелось бы больше конкретной информации о преподавателях. А так в принципе хороший сайт, всегда им пользуюсь и ни разу не было желания прекратить. Хороший сайт для помощи студентам, удобный и приятный интерфейс. Из недостатков можно выделить только отсутствия небольшого количества файлов.
Спасибо за шикарный сайт
Великолепный сайт на котором студент за не большие деньги может найти помощь с дз, проектами курсовыми, лабораторными, а также узнать отзывы на преподавателей и бесплатно скачать пособия.
Популярные преподаватели
Добавляйте материалы
и зарабатывайте!
Продажи идут автоматически
5137
Авторов
на СтудИзбе
440
Средний доход
с одного платного файла
Обучение Подробнее