Главная » Просмотр файлов » Лекция 12. Дополнительные возможности Apache Spark

Лекция 12. Дополнительные возможности Apache Spark (1185420)

Файл №1185420 Лекция 12. Дополнительные возможности Apache Spark (2015 Лекции)Лекция 12. Дополнительные возможности Apache Spark (1185420)2020-08-25СтудИзба
Просмтор этого файла доступен только зарегистрированным пользователям. Но у нас супер быстрая регистрация: достаточно только электронной почты!

Текст из файла

Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №8. Дополнительные возможностиApache Spark.http://bigdata.cs.msu.rubigdata@cs.msu.ruRDD и разделыx=sc.parallelize([(1,1),(3,1),(5,1),(7,1),(9,1)], numSlices=3)node4node6node5z=x.join(y)shufflenode4hash(key)%4==0node2node3node1hash(key)%4==1 hash(key)%4==2hash(key)%4==3y=sc.parallelize([(1,1),(3,1),(5,1),(7,1),(9,1)]).partitionBy(4)partitionery1=y.map(lambda pair: (pair[1], pair[0]))partitionery2=y.mapValues(lambda value: value+1)RDDDirected acyclic graph (DAG)● stages○tasksLazy evaluationRDD.persist(MEMORY_ONLY)ПланБиблиотеки Spark● SQL и DataFrame● Spark Streaming● MLlib● GraphX● SparkR(*)Сегодня● SQL и DataFrame● векторные типы данныхDataFrameТабличная структура данных, упрощающая предобработку данных встатистических пакетах.●●●По структуре и операциям DataFrame похож на таблицу в реляционнойБД.Отличается от таблицы в реляционной БД способом обработки данных(стат.запросов и манипуляции): вместо SQL используются конструкцииЯП.“Классические” реализации (хранение в ОЗУ):○○●Pandas DataFrame (Python)R DataFrame“Альтернативные” реализации○SAS datasetКонтекстыsc=SparkContext()sqlContext=HiveContext(sc)SQLContextHiveContextПример создания DataFrame52638252638252638220141025 http://www.1tv.ru/pvoice/66112,6612120141025 http://www.audiopoisk.com/artist/vika-i-gruppa-magadan/ 60316,6036320141025 http://zvukoff.ru/song/26322660665from pyspark.sql import SQLContext, RowsqlContext=SQLContext(sc)lines = sc.textFile("c:\\bigdata\\data\\web2.tsv")parts = lines.map(lambda l: l.split("\t"))log_entries = parts.map(lambda p: Row(user_id=p[0], dt=int(p[1]), url=p[2], times=p[3]))log_entries.count() # hacklog = sqlContext.createDataFrame(log_entries) # samplingRatio=NoneПрогресс[Stage 8:>(0 + 4) / 5][Stage 8:>(0 + 5) / 5][Stage 8:===========>(1 + 4) / 5][Stage 8:=======================>(2 + 3) / 5][Stage 8:===================================>(3 + 2) / 5][Stage 8:===============================================>(4 + 1) / 5][Stage 9:>(0 + 0) / 200][Stage 9:>(0 + 4) / 200][Stage 9:>(1 + 7) / 200][Stage 9:==>(10 + 4) / 200][Stage 9:==>(10 + 7) / 200][Stage 9:====>(16 + 4) / 200][Stage 9:======>(23 + 4) / 200][Stage 9:===========>(42 + 4) / 200][Stage 9:===============>(56 + 4) / 200][Stage 9:=================>(63 + 4) / 200]DataFrame>>> type(log)<class 'pyspark.sql.dataframe.DataFrame'>>>> log.columns['dt', 'times', 'url', 'user_id']>>> log.dtypes[('dt', 'bigint'), ('times', 'string'), ('url', 'string'), ('user_id','string')]>>> log.describe().show()+-------+-----------------+|summary|dt|+-------+-----------------+| count|1000000||mean|2.0141045538141E7|| stddev| 39.0280347955159||min|20141005||max|20141130|+-------+-----------------+DataFrame: фильтрация, UDF>>> log.count()100000>>> from pyspark.sql.functions import UserDefinedFunction>>> from pyspark.sql.types import StringType>>> from urllib.parse import urlparse>>> get_domain=UserDefinedFunction(lambda url: urlparse(url).hostname,StringType())>>> log2=log.withColumn('domain', get_domain(log.url))>>> log2[['domain']].distinct().count()65663>>> log2[(log2.dt>=20141101)&(log2.dt<20141201)][['domain']].distinct().count()23316>>> log2.filter((log2.dt>=20141101)&(log2.dt<20141201))[['domain']].distinct().count()23316DataFrame: Агрегация>>> from pyspark.sql.types import IntegerType>>> get_month=UserDefinedFunction(lambda dt: dt//100, IntegerType())>>> log2=log2.withColumn('month', get_month(log2.dt))>>> import pyspark.sql.functions as F>>> log2.groupby('month').agg(F.countDistinct('domain')).show()+------+----------------------+| month|COUNT(DISTINCT domain)|+------+----------------------+|201410|53496||201411|23316|+------+----------------------+DataFrame: select>>> log[['user_id','dt']].show(2)+-------+--------+|user_id|dt|+-------+--------+|0|20141017||0|20141009|+-------+--------+>>> log.select('user_id','dt').show(3) # the same>>> log.select(log.user_id, log.dt).show(3) # the same>>> log.select(log.user_id, get_month(log.dt).alias('m')).show(3)+-------+------+|user_id|x|+-------+------+|0|201410||0|201410|+-------+------+DataFrame: Кеширование>>> log2.cache()>>> log2.groupby('month').agg(F.countDistinct('domain')).show()DataFrame: Сохранение / загрузка>>>>>>>>>>>>>>>>>>>>>5>>>log2.write.save('res_json', format='json')log2.write.json('res_json')log2.write.partitionby('domain').json('res_json', mode='overwrite')log2.write.parquet('res_parquet')log2.write.mode('append').parquet('res_parquet')log2=sqlContext.read.load('res_json2', format='json')log2.rdd.getNumPartitions()log2.rdd.partitionerDataFrame: другие разновидности операций●●●●●●замена najoinзамена значенийorderBysamplingtoPandas()SQLSpark SQL>>> log2.registerTempTable('log2')>>> sqlContext.sql('select count(*) from log2')DataFrame[_c0: bigint]>>> sqlContext.sql('select count(*) from log2').collect()[Row(_c0=1000000)]>>> sqlContext.sql('select * from log2 where domain like \'%lenta.ru\' order by dt desc limit 3').collect()Row(domain='lenta.ru', dt=20141129, month=201411,times='84755', url='http://lenta.ru/news/2013/11/29/love/',user_id='1235907'), Row(domain='lenta.ru', dt=20141129,month=201411, times='74491', url='http://lenta.ru/news/2013/11/29/love/', user_id='1724463'), Row(domain='lenta.ru', dt=20141129, month=201411, times='73832',url='http://lenta.ru/news/2013/11/29/block/',user_id='1724463')]Spark SQL>>> sqlContext.sql('select user_id, domain, month, count(domain) over (partition by month, domain), count(domain) over(partition by month, domain, user_id) from log2').show(3)+-------+--------------------+------+---+---+|user_id|domain| month|_c3|_c4|+-------+--------------------+------+---+---+| 385625|10.0.110.245|201410| 1| 1|| 447272|1119997.1001golos.ru|201410| 1| 1|| 447272|1125714.1001golos.ru|201410| 1| 1|+-------+--------------------+------+---+---+>>>Типы для линейной алгебрыDenseVector>>> from pyspark.mllib.linalg import Vectors>>> v = Vectors.dense([1.0, 0., 2.0, 0.])>>> w = Vectors.dense([3.0, 4.0, 0., 0.])>>> v + wDenseVector([4.0, 4.0, 2.0, 0.0])>>> 2 - vDenseVector([1.0, 2.0, 0.0, 2.0])>>> v * wDenseVector([3.0, 0.0, 0.0, 0.0])>>> v.norm(2)2.2360679774997898>>> v.squared_distance(v)0.0>>> v.squared_distance(w)24.0>>> v[0]1.0>>> v[0]=2Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'DenseVector' object does not support item assignmentSparseVector>>> vv=SparseVector(4, [(0, 1.), (3, 10.)])>>> ww=SparseVector(4, [(0, 20.), (2, 100.)])>>> vv + wwTraceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: unsupported operand type(s) for +: 'SparseVector' and 'SparseVector'>>> vv.dot(ww)>>> 20.0>>> vv.norm(2)10.04987562112089>>> vv.squared_distance(vv)0.0>>> vv.squared_distance(w)10461.0>>> vv.dot(v)1.0>>> vv[1]0.0DenseMatrix>>> from pyspark.mllib.linalg import Matrices>>> m = Matrices.dense(numRows=2, numCols=4, [1,2,3,4,5,6,7,8])>>> print(m)DenseMatrix([[ 1., 3., 5., 7.],[ 2., 4., 6., 8.]])>>> m[0,1]>>> 3SparseMatrix>>> mm=SparseMatrix(numRows=2, numCols=2, colPtrs=[0, 2, 3], rowIndices=[0, 1,1], values=[2, 3, 4])>>> print(mm)2 X 2 CSRMatrix(0,0) 2.0(1,0) 3.0(1,1) 4.0>>> [[2, 0, 0], \...

[3, 4, 0], \... [0, 0, 0]]>>> mm[1,0]>>> 3.0# for column i#rowIndices[colPtrs[i]:colPtrs[i+1]]#values[colPtrs[i]:colPtrs[i+1]]CSC-формат (Compressed Sparse Column matrix)RowMatrix>>> from pyspark.mllib.linalg.distributed import *>>> rdd=sc.parallelize([[1,2,3],[4,5,6]])>>> rm=RowMatrix(rows=rdd, numRows=0, numCols=0)>>> rm.numRows<bound method RowMatrix.numRows of <pyspark.mllib.linalg.distributed.RowMatrix object at 0x0000000005295048>>>>> rm.numRows()[Stage 30:>(0 + 0) / 4]...2>>> rm.rowsMapPartitionsRDD[187] at mapPartitions at PythonMLLibAPI.scala:1480>>> rm.rows.first()[Stage 32:>(0 + 0) / 3]...DenseVector([1.0, 2.0, 3.0])>>> rm[0,0]Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'RowMatrix' object is not subscriptableRDD из DenseVectorIndexedRowMatrix>>> rdd2=sc.parallelize([IndexedRow(0,[1,2,3]),IndexedRow(1,[4,5,6])])>>> irm=IndexedRowMatrix(rows=rdd2)>>> irm.rows.first()[Stage 42:>(0 + 0) / 3]...IndexedRow(0, [1.0,2.0,3.0])CoordinateMatrix>>> rdd3=sc.parallelize([MatrixEntry(0,0, 2.0),MatrixEntry(99,99,100.)])>>> cm=CoordinateMatrix(entries=rdd3)>>> cm.entries.first()>>> cm.entries.first()[Stage 46:>(0 + 0) / 3]...MatrixEntry(0, 0, 2.0)>>> cm[0,0]Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'CoordinateMatrix' object is not subscriptable>>> cm.numCols()[Stage 47:>(0 + 0) / 4]...100BlockMatrixblocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])bm = BlockMatrix(blocks, 3, 2)>>> bm.blocks.first()[Stage 52:>(0 + 3) / 3]...((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))Методы конвертации к другим типам распределенных матриц:● toLocalMatrix()● toIndexedRowMatrix()● toCoordinateMatrix()PySpark в JupyterСпасибо за внимание!.

Характеристики

Тип файла
PDF-файл
Размер
248,57 Kb
Материал
Тип материала
Высшее учебное заведение

Тип файла PDF

PDF-формат наиболее широко используется для просмотра любого типа файлов на любом устройстве. В него можно сохранить документ, таблицы, презентацию, текст, чертежи, вычисления, графики и всё остальное, что можно показать на экране любого устройства. Именно его лучше всего использовать для печати.

Например, если Вам нужно распечатать чертёж из автокада, Вы сохраните чертёж на флешку, но будет ли автокад в пункте печати? А если будет, то нужная версия с нужными библиотеками? Именно для этого и нужен формат PDF - в нём точно будет показано верно вне зависимости от того, в какой программе создали PDF-файл и есть ли нужная программа для его просмотра.

Список файлов лекций

Свежие статьи
Популярно сейчас
Зачем заказывать выполнение своего задания, если оно уже было выполнено много много раз? Его можно просто купить или даже скачать бесплатно на СтудИзбе. Найдите нужный учебный материал у нас!
Ответы на популярные вопросы
Да! Наши авторы собирают и выкладывают те работы, которые сдаются в Вашем учебном заведении ежегодно и уже проверены преподавателями.
Да! У нас любой человек может выложить любую учебную работу и зарабатывать на её продажах! Но каждый учебный материал публикуется только после тщательной проверки администрацией.
Вернём деньги! А если быть более точными, то автору даётся немного времени на исправление, а если не исправит или выйдет время, то вернём деньги в полном объёме!
Да! На равне с готовыми студенческими работами у нас продаются услуги. Цены на услуги видны сразу, то есть Вам нужно только указать параметры и сразу можно оплачивать.
Отзывы студентов
Ставлю 10/10
Все нравится, очень удобный сайт, помогает в учебе. Кроме этого, можно заработать самому, выставляя готовые учебные материалы на продажу здесь. Рейтинги и отзывы на преподавателей очень помогают сориентироваться в начале нового семестра. Спасибо за такую функцию. Ставлю максимальную оценку.
Лучшая платформа для успешной сдачи сессии
Познакомился со СтудИзбой благодаря своему другу, очень нравится интерфейс, количество доступных файлов, цена, в общем, все прекрасно. Даже сам продаю какие-то свои работы.
Студизба ван лав ❤
Очень офигенный сайт для студентов. Много полезных учебных материалов. Пользуюсь студизбой с октября 2021 года. Серьёзных нареканий нет. Хотелось бы, что бы ввели подписочную модель и сделали материалы дешевле 300 рублей в рамках подписки бесплатными.
Отличный сайт
Лично меня всё устраивает - и покупка, и продажа; и цены, и возможность предпросмотра куска файла, и обилие бесплатных файлов (в подборках по авторам, читай, ВУЗам и факультетам). Есть определённые баги, но всё решаемо, да и администраторы реагируют в течение суток.
Маленький отзыв о большом помощнике!
Студизба спасает в те моменты, когда сроки горят, а работ накопилось достаточно. Довольно удобный сайт с простой навигацией и огромным количеством материалов.
Студ. Изба как крупнейший сборник работ для студентов
Тут дофига бывает всего полезного. Печально, что бывают предметы по которым даже одного бесплатного решения нет, но это скорее вопрос к студентам. В остальном всё здорово.
Спасательный островок
Если уже не успеваешь разобраться или застрял на каком-то задание поможет тебе быстро и недорого решить твою проблему.
Всё и так отлично
Всё очень удобно. Особенно круто, что есть система бонусов и можно выводить остатки денег. Очень много качественных бесплатных файлов.
Отзыв о системе "Студизба"
Отличная платформа для распространения работ, востребованных студентами. Хорошо налаженная и качественная работа сайта, огромная база заданий и аудитория.
Отличный помощник
Отличный сайт с кучей полезных файлов, позволяющий найти много методичек / учебников / отзывов о вузах и преподователях.
Отлично помогает студентам в любой момент для решения трудных и незамедлительных задач
Хотелось бы больше конкретной информации о преподавателях. А так в принципе хороший сайт, всегда им пользуюсь и ни разу не было желания прекратить. Хороший сайт для помощи студентам, удобный и приятный интерфейс. Из недостатков можно выделить только отсутствия небольшого количества файлов.
Спасибо за шикарный сайт
Великолепный сайт на котором студент за не большие деньги может найти помощь с дз, проектами курсовыми, лабораторными, а также узнать отзывы на преподавателей и бесплатно скачать пособия.
Популярные преподаватели
Добавляйте материалы
и зарабатывайте!
Продажи идут автоматически
6358
Авторов
на СтудИзбе
311
Средний доход
с одного платного файла
Обучение Подробнее