Лекция 12. Дополнительные возможности Apache Spark (2015 Лекции)
Описание файла
Файл "Лекция 12. Дополнительные возможности Apache Spark" внутри архива находится в папке "2015 Лекции". PDF-файл из архива "2015 Лекции", который расположен в категории "". Всё это находится в предмете "(смрхиод) современные методы распределенного хранения и обработки данных" из 10 семестр (2 семестр магистратуры), которые можно найти в файловом архиве МГУ им. Ломоносова. Не смотря на прямую связь этого архива с МГУ им. Ломоносова, его также можно найти и в других разделах. .
Просмотр PDF-файла онлайн
Текст из PDF
Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №8. Дополнительные возможностиApache Spark.http://bigdata.cs.msu.rubigdata@cs.msu.ruRDD и разделыx=sc.parallelize([(1,1),(3,1),(5,1),(7,1),(9,1)], numSlices=3)node4node6node5z=x.join(y)shufflenode4hash(key)%4==0node2node3node1hash(key)%4==1 hash(key)%4==2hash(key)%4==3y=sc.parallelize([(1,1),(3,1),(5,1),(7,1),(9,1)]).partitionBy(4)partitionery1=y.map(lambda pair: (pair[1], pair[0]))partitionery2=y.mapValues(lambda value: value+1)RDDDirected acyclic graph (DAG)● stages○tasksLazy evaluationRDD.persist(MEMORY_ONLY)ПланБиблиотеки Spark● SQL и DataFrame● Spark Streaming● MLlib● GraphX● SparkR(*)Сегодня● SQL и DataFrame● векторные типы данныхDataFrameТабличная структура данных, упрощающая предобработку данных встатистических пакетах.●●●По структуре и операциям DataFrame похож на таблицу в реляционнойБД.Отличается от таблицы в реляционной БД способом обработки данных(стат.запросов и манипуляции): вместо SQL используются конструкцииЯП.“Классические” реализации (хранение в ОЗУ):○○●Pandas DataFrame (Python)R DataFrame“Альтернативные” реализации○SAS datasetКонтекстыsc=SparkContext()sqlContext=HiveContext(sc)SQLContextHiveContextПример создания DataFrame52638252638252638220141025 http://www.1tv.ru/pvoice/66112,6612120141025 http://www.audiopoisk.com/artist/vika-i-gruppa-magadan/ 60316,6036320141025 http://zvukoff.ru/song/26322660665from pyspark.sql import SQLContext, RowsqlContext=SQLContext(sc)lines = sc.textFile("c:\\bigdata\\data\\web2.tsv")parts = lines.map(lambda l: l.split("\t"))log_entries = parts.map(lambda p: Row(user_id=p[0], dt=int(p[1]), url=p[2], times=p[3]))log_entries.count() # hacklog = sqlContext.createDataFrame(log_entries) # samplingRatio=NoneПрогресс[Stage 8:>(0 + 4) / 5][Stage 8:>(0 + 5) / 5][Stage 8:===========>(1 + 4) / 5][Stage 8:=======================>(2 + 3) / 5][Stage 8:===================================>(3 + 2) / 5][Stage 8:===============================================>(4 + 1) / 5][Stage 9:>(0 + 0) / 200][Stage 9:>(0 + 4) / 200][Stage 9:>(1 + 7) / 200][Stage 9:==>(10 + 4) / 200][Stage 9:==>(10 + 7) / 200][Stage 9:====>(16 + 4) / 200][Stage 9:======>(23 + 4) / 200][Stage 9:===========>(42 + 4) / 200][Stage 9:===============>(56 + 4) / 200][Stage 9:=================>(63 + 4) / 200]DataFrame>>> type(log)<class 'pyspark.sql.dataframe.DataFrame'>>>> log.columns['dt', 'times', 'url', 'user_id']>>> log.dtypes[('dt', 'bigint'), ('times', 'string'), ('url', 'string'), ('user_id','string')]>>> log.describe().show()+-------+-----------------+|summary|dt|+-------+-----------------+| count|1000000||mean|2.0141045538141E7|| stddev| 39.0280347955159||min|20141005||max|20141130|+-------+-----------------+DataFrame: фильтрация, UDF>>> log.count()100000>>> from pyspark.sql.functions import UserDefinedFunction>>> from pyspark.sql.types import StringType>>> from urllib.parse import urlparse>>> get_domain=UserDefinedFunction(lambda url: urlparse(url).hostname,StringType())>>> log2=log.withColumn('domain', get_domain(log.url))>>> log2[['domain']].distinct().count()65663>>> log2[(log2.dt>=20141101)&(log2.dt<20141201)][['domain']].distinct().count()23316>>> log2.filter((log2.dt>=20141101)&(log2.dt<20141201))[['domain']].distinct().count()23316DataFrame: Агрегация>>> from pyspark.sql.types import IntegerType>>> get_month=UserDefinedFunction(lambda dt: dt//100, IntegerType())>>> log2=log2.withColumn('month', get_month(log2.dt))>>> import pyspark.sql.functions as F>>> log2.groupby('month').agg(F.countDistinct('domain')).show()+------+----------------------+| month|COUNT(DISTINCT domain)|+------+----------------------+|201410|53496||201411|23316|+------+----------------------+DataFrame: select>>> log[['user_id','dt']].show(2)+-------+--------+|user_id|dt|+-------+--------+|0|20141017||0|20141009|+-------+--------+>>> log.select('user_id','dt').show(3) # the same>>> log.select(log.user_id, log.dt).show(3) # the same>>> log.select(log.user_id, get_month(log.dt).alias('m')).show(3)+-------+------+|user_id|x|+-------+------+|0|201410||0|201410|+-------+------+DataFrame: Кеширование>>> log2.cache()>>> log2.groupby('month').agg(F.countDistinct('domain')).show()DataFrame: Сохранение / загрузка>>>>>>>>>>>>>>>>>>>>>5>>>log2.write.save('res_json', format='json')log2.write.json('res_json')log2.write.partitionby('domain').json('res_json', mode='overwrite')log2.write.parquet('res_parquet')log2.write.mode('append').parquet('res_parquet')log2=sqlContext.read.load('res_json2', format='json')log2.rdd.getNumPartitions()log2.rdd.partitionerDataFrame: другие разновидности операций●●●●●●замена najoinзамена значенийorderBysamplingtoPandas()SQLSpark SQL>>> log2.registerTempTable('log2')>>> sqlContext.sql('select count(*) from log2')DataFrame[_c0: bigint]>>> sqlContext.sql('select count(*) from log2').collect()[Row(_c0=1000000)]>>> sqlContext.sql('select * from log2 where domain like \'%lenta.ru\' order by dt desc limit 3').collect()Row(domain='lenta.ru', dt=20141129, month=201411,times='84755', url='http://lenta.ru/news/2013/11/29/love/',user_id='1235907'), Row(domain='lenta.ru', dt=20141129,month=201411, times='74491', url='http://lenta.ru/news/2013/11/29/love/', user_id='1724463'), Row(domain='lenta.ru', dt=20141129, month=201411, times='73832',url='http://lenta.ru/news/2013/11/29/block/',user_id='1724463')]Spark SQL>>> sqlContext.sql('select user_id, domain, month, count(domain) over (partition by month, domain), count(domain) over(partition by month, domain, user_id) from log2').show(3)+-------+--------------------+------+---+---+|user_id|domain| month|_c3|_c4|+-------+--------------------+------+---+---+| 385625|10.0.110.245|201410| 1| 1|| 447272|1119997.1001golos.ru|201410| 1| 1|| 447272|1125714.1001golos.ru|201410| 1| 1|+-------+--------------------+------+---+---+>>>Типы для линейной алгебрыDenseVector>>> from pyspark.mllib.linalg import Vectors>>> v = Vectors.dense([1.0, 0., 2.0, 0.])>>> w = Vectors.dense([3.0, 4.0, 0., 0.])>>> v + wDenseVector([4.0, 4.0, 2.0, 0.0])>>> 2 - vDenseVector([1.0, 2.0, 0.0, 2.0])>>> v * wDenseVector([3.0, 0.0, 0.0, 0.0])>>> v.norm(2)2.2360679774997898>>> v.squared_distance(v)0.0>>> v.squared_distance(w)24.0>>> v[0]1.0>>> v[0]=2Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'DenseVector' object does not support item assignmentSparseVector>>> vv=SparseVector(4, [(0, 1.), (3, 10.)])>>> ww=SparseVector(4, [(0, 20.), (2, 100.)])>>> vv + wwTraceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: unsupported operand type(s) for +: 'SparseVector' and 'SparseVector'>>> vv.dot(ww)>>> 20.0>>> vv.norm(2)10.04987562112089>>> vv.squared_distance(vv)0.0>>> vv.squared_distance(w)10461.0>>> vv.dot(v)1.0>>> vv[1]0.0DenseMatrix>>> from pyspark.mllib.linalg import Matrices>>> m = Matrices.dense(numRows=2, numCols=4, [1,2,3,4,5,6,7,8])>>> print(m)DenseMatrix([[ 1., 3., 5., 7.],[ 2., 4., 6., 8.]])>>> m[0,1]>>> 3SparseMatrix>>> mm=SparseMatrix(numRows=2, numCols=2, colPtrs=[0, 2, 3], rowIndices=[0, 1,1], values=[2, 3, 4])>>> print(mm)2 X 2 CSRMatrix(0,0) 2.0(1,0) 3.0(1,1) 4.0>>> [[2, 0, 0], \...
[3, 4, 0], \... [0, 0, 0]]>>> mm[1,0]>>> 3.0# for column i#rowIndices[colPtrs[i]:colPtrs[i+1]]#values[colPtrs[i]:colPtrs[i+1]]CSC-формат (Compressed Sparse Column matrix)RowMatrix>>> from pyspark.mllib.linalg.distributed import *>>> rdd=sc.parallelize([[1,2,3],[4,5,6]])>>> rm=RowMatrix(rows=rdd, numRows=0, numCols=0)>>> rm.numRows<bound method RowMatrix.numRows of <pyspark.mllib.linalg.distributed.RowMatrix object at 0x0000000005295048>>>>> rm.numRows()[Stage 30:>(0 + 0) / 4]...2>>> rm.rowsMapPartitionsRDD[187] at mapPartitions at PythonMLLibAPI.scala:1480>>> rm.rows.first()[Stage 32:>(0 + 0) / 3]...DenseVector([1.0, 2.0, 3.0])>>> rm[0,0]Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'RowMatrix' object is not subscriptableRDD из DenseVectorIndexedRowMatrix>>> rdd2=sc.parallelize([IndexedRow(0,[1,2,3]),IndexedRow(1,[4,5,6])])>>> irm=IndexedRowMatrix(rows=rdd2)>>> irm.rows.first()[Stage 42:>(0 + 0) / 3]...IndexedRow(0, [1.0,2.0,3.0])CoordinateMatrix>>> rdd3=sc.parallelize([MatrixEntry(0,0, 2.0),MatrixEntry(99,99,100.)])>>> cm=CoordinateMatrix(entries=rdd3)>>> cm.entries.first()>>> cm.entries.first()[Stage 46:>(0 + 0) / 3]...MatrixEntry(0, 0, 2.0)>>> cm[0,0]Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'CoordinateMatrix' object is not subscriptable>>> cm.numCols()[Stage 47:>(0 + 0) / 4]...100BlockMatrixblocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])bm = BlockMatrix(blocks, 3, 2)>>> bm.blocks.first()[Stage 52:>(0 + 3) / 3]...((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))Методы конвертации к другим типам распределенных матриц:● toLocalMatrix()● toIndexedRowMatrix()● toCoordinateMatrix()PySpark в JupyterСпасибо за внимание!.