Лекция 12. Дополнительные возможности Apache Spark (1185420)
Текст из файла
Большие данные: распределенноехранение и обработка данных спомощью модели вычисленийMapReduceЛекция №8. Дополнительные возможностиApache Spark.http://bigdata.cs.msu.rubigdata@cs.msu.ruRDD и разделыx=sc.parallelize([(1,1),(3,1),(5,1),(7,1),(9,1)], numSlices=3)node4node6node5z=x.join(y)shufflenode4hash(key)%4==0node2node3node1hash(key)%4==1 hash(key)%4==2hash(key)%4==3y=sc.parallelize([(1,1),(3,1),(5,1),(7,1),(9,1)]).partitionBy(4)partitionery1=y.map(lambda pair: (pair[1], pair[0]))partitionery2=y.mapValues(lambda value: value+1)RDDDirected acyclic graph (DAG)● stages○tasksLazy evaluationRDD.persist(MEMORY_ONLY)ПланБиблиотеки Spark● SQL и DataFrame● Spark Streaming● MLlib● GraphX● SparkR(*)Сегодня● SQL и DataFrame● векторные типы данныхDataFrameТабличная структура данных, упрощающая предобработку данных встатистических пакетах.●●●По структуре и операциям DataFrame похож на таблицу в реляционнойБД.Отличается от таблицы в реляционной БД способом обработки данных(стат.запросов и манипуляции): вместо SQL используются конструкцииЯП.“Классические” реализации (хранение в ОЗУ):○○●Pandas DataFrame (Python)R DataFrame“Альтернативные” реализации○SAS datasetКонтекстыsc=SparkContext()sqlContext=HiveContext(sc)SQLContextHiveContextПример создания DataFrame52638252638252638220141025 http://www.1tv.ru/pvoice/66112,6612120141025 http://www.audiopoisk.com/artist/vika-i-gruppa-magadan/ 60316,6036320141025 http://zvukoff.ru/song/26322660665from pyspark.sql import SQLContext, RowsqlContext=SQLContext(sc)lines = sc.textFile("c:\\bigdata\\data\\web2.tsv")parts = lines.map(lambda l: l.split("\t"))log_entries = parts.map(lambda p: Row(user_id=p[0], dt=int(p[1]), url=p[2], times=p[3]))log_entries.count() # hacklog = sqlContext.createDataFrame(log_entries) # samplingRatio=NoneПрогресс[Stage 8:>(0 + 4) / 5][Stage 8:>(0 + 5) / 5][Stage 8:===========>(1 + 4) / 5][Stage 8:=======================>(2 + 3) / 5][Stage 8:===================================>(3 + 2) / 5][Stage 8:===============================================>(4 + 1) / 5][Stage 9:>(0 + 0) / 200][Stage 9:>(0 + 4) / 200][Stage 9:>(1 + 7) / 200][Stage 9:==>(10 + 4) / 200][Stage 9:==>(10 + 7) / 200][Stage 9:====>(16 + 4) / 200][Stage 9:======>(23 + 4) / 200][Stage 9:===========>(42 + 4) / 200][Stage 9:===============>(56 + 4) / 200][Stage 9:=================>(63 + 4) / 200]DataFrame>>> type(log)<class 'pyspark.sql.dataframe.DataFrame'>>>> log.columns['dt', 'times', 'url', 'user_id']>>> log.dtypes[('dt', 'bigint'), ('times', 'string'), ('url', 'string'), ('user_id','string')]>>> log.describe().show()+-------+-----------------+|summary|dt|+-------+-----------------+| count|1000000||mean|2.0141045538141E7|| stddev| 39.0280347955159||min|20141005||max|20141130|+-------+-----------------+DataFrame: фильтрация, UDF>>> log.count()100000>>> from pyspark.sql.functions import UserDefinedFunction>>> from pyspark.sql.types import StringType>>> from urllib.parse import urlparse>>> get_domain=UserDefinedFunction(lambda url: urlparse(url).hostname,StringType())>>> log2=log.withColumn('domain', get_domain(log.url))>>> log2[['domain']].distinct().count()65663>>> log2[(log2.dt>=20141101)&(log2.dt<20141201)][['domain']].distinct().count()23316>>> log2.filter((log2.dt>=20141101)&(log2.dt<20141201))[['domain']].distinct().count()23316DataFrame: Агрегация>>> from pyspark.sql.types import IntegerType>>> get_month=UserDefinedFunction(lambda dt: dt//100, IntegerType())>>> log2=log2.withColumn('month', get_month(log2.dt))>>> import pyspark.sql.functions as F>>> log2.groupby('month').agg(F.countDistinct('domain')).show()+------+----------------------+| month|COUNT(DISTINCT domain)|+------+----------------------+|201410|53496||201411|23316|+------+----------------------+DataFrame: select>>> log[['user_id','dt']].show(2)+-------+--------+|user_id|dt|+-------+--------+|0|20141017||0|20141009|+-------+--------+>>> log.select('user_id','dt').show(3) # the same>>> log.select(log.user_id, log.dt).show(3) # the same>>> log.select(log.user_id, get_month(log.dt).alias('m')).show(3)+-------+------+|user_id|x|+-------+------+|0|201410||0|201410|+-------+------+DataFrame: Кеширование>>> log2.cache()>>> log2.groupby('month').agg(F.countDistinct('domain')).show()DataFrame: Сохранение / загрузка>>>>>>>>>>>>>>>>>>>>>5>>>log2.write.save('res_json', format='json')log2.write.json('res_json')log2.write.partitionby('domain').json('res_json', mode='overwrite')log2.write.parquet('res_parquet')log2.write.mode('append').parquet('res_parquet')log2=sqlContext.read.load('res_json2', format='json')log2.rdd.getNumPartitions()log2.rdd.partitionerDataFrame: другие разновидности операций●●●●●●замена najoinзамена значенийorderBysamplingtoPandas()SQLSpark SQL>>> log2.registerTempTable('log2')>>> sqlContext.sql('select count(*) from log2')DataFrame[_c0: bigint]>>> sqlContext.sql('select count(*) from log2').collect()[Row(_c0=1000000)]>>> sqlContext.sql('select * from log2 where domain like \'%lenta.ru\' order by dt desc limit 3').collect()Row(domain='lenta.ru', dt=20141129, month=201411,times='84755', url='http://lenta.ru/news/2013/11/29/love/',user_id='1235907'), Row(domain='lenta.ru', dt=20141129,month=201411, times='74491', url='http://lenta.ru/news/2013/11/29/love/', user_id='1724463'), Row(domain='lenta.ru', dt=20141129, month=201411, times='73832',url='http://lenta.ru/news/2013/11/29/block/',user_id='1724463')]Spark SQL>>> sqlContext.sql('select user_id, domain, month, count(domain) over (partition by month, domain), count(domain) over(partition by month, domain, user_id) from log2').show(3)+-------+--------------------+------+---+---+|user_id|domain| month|_c3|_c4|+-------+--------------------+------+---+---+| 385625|10.0.110.245|201410| 1| 1|| 447272|1119997.1001golos.ru|201410| 1| 1|| 447272|1125714.1001golos.ru|201410| 1| 1|+-------+--------------------+------+---+---+>>>Типы для линейной алгебрыDenseVector>>> from pyspark.mllib.linalg import Vectors>>> v = Vectors.dense([1.0, 0., 2.0, 0.])>>> w = Vectors.dense([3.0, 4.0, 0., 0.])>>> v + wDenseVector([4.0, 4.0, 2.0, 0.0])>>> 2 - vDenseVector([1.0, 2.0, 0.0, 2.0])>>> v * wDenseVector([3.0, 0.0, 0.0, 0.0])>>> v.norm(2)2.2360679774997898>>> v.squared_distance(v)0.0>>> v.squared_distance(w)24.0>>> v[0]1.0>>> v[0]=2Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'DenseVector' object does not support item assignmentSparseVector>>> vv=SparseVector(4, [(0, 1.), (3, 10.)])>>> ww=SparseVector(4, [(0, 20.), (2, 100.)])>>> vv + wwTraceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: unsupported operand type(s) for +: 'SparseVector' and 'SparseVector'>>> vv.dot(ww)>>> 20.0>>> vv.norm(2)10.04987562112089>>> vv.squared_distance(vv)0.0>>> vv.squared_distance(w)10461.0>>> vv.dot(v)1.0>>> vv[1]0.0DenseMatrix>>> from pyspark.mllib.linalg import Matrices>>> m = Matrices.dense(numRows=2, numCols=4, [1,2,3,4,5,6,7,8])>>> print(m)DenseMatrix([[ 1., 3., 5., 7.],[ 2., 4., 6., 8.]])>>> m[0,1]>>> 3SparseMatrix>>> mm=SparseMatrix(numRows=2, numCols=2, colPtrs=[0, 2, 3], rowIndices=[0, 1,1], values=[2, 3, 4])>>> print(mm)2 X 2 CSRMatrix(0,0) 2.0(1,0) 3.0(1,1) 4.0>>> [[2, 0, 0], \...
[3, 4, 0], \... [0, 0, 0]]>>> mm[1,0]>>> 3.0# for column i#rowIndices[colPtrs[i]:colPtrs[i+1]]#values[colPtrs[i]:colPtrs[i+1]]CSC-формат (Compressed Sparse Column matrix)RowMatrix>>> from pyspark.mllib.linalg.distributed import *>>> rdd=sc.parallelize([[1,2,3],[4,5,6]])>>> rm=RowMatrix(rows=rdd, numRows=0, numCols=0)>>> rm.numRows<bound method RowMatrix.numRows of <pyspark.mllib.linalg.distributed.RowMatrix object at 0x0000000005295048>>>>> rm.numRows()[Stage 30:>(0 + 0) / 4]...2>>> rm.rowsMapPartitionsRDD[187] at mapPartitions at PythonMLLibAPI.scala:1480>>> rm.rows.first()[Stage 32:>(0 + 0) / 3]...DenseVector([1.0, 2.0, 3.0])>>> rm[0,0]Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'RowMatrix' object is not subscriptableRDD из DenseVectorIndexedRowMatrix>>> rdd2=sc.parallelize([IndexedRow(0,[1,2,3]),IndexedRow(1,[4,5,6])])>>> irm=IndexedRowMatrix(rows=rdd2)>>> irm.rows.first()[Stage 42:>(0 + 0) / 3]...IndexedRow(0, [1.0,2.0,3.0])CoordinateMatrix>>> rdd3=sc.parallelize([MatrixEntry(0,0, 2.0),MatrixEntry(99,99,100.)])>>> cm=CoordinateMatrix(entries=rdd3)>>> cm.entries.first()>>> cm.entries.first()[Stage 46:>(0 + 0) / 3]...MatrixEntry(0, 0, 2.0)>>> cm[0,0]Traceback (most recent call last):File "<stdin>", line 1, in <module>TypeError: 'CoordinateMatrix' object is not subscriptable>>> cm.numCols()[Stage 47:>(0 + 0) / 4]...100BlockMatrixblocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])bm = BlockMatrix(blocks, 3, 2)>>> bm.blocks.first()[Stage 52:>(0 + 3) / 3]...((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))Методы конвертации к другим типам распределенных матриц:● toLocalMatrix()● toIndexedRowMatrix()● toCoordinateMatrix()PySpark в JupyterСпасибо за внимание!.
Характеристики
Тип файла PDF
PDF-формат наиболее широко используется для просмотра любого типа файлов на любом устройстве. В него можно сохранить документ, таблицы, презентацию, текст, чертежи, вычисления, графики и всё остальное, что можно показать на экране любого устройства. Именно его лучше всего использовать для печати.
Например, если Вам нужно распечатать чертёж из автокада, Вы сохраните чертёж на флешку, но будет ли автокад в пункте печати? А если будет, то нужная версия с нужными библиотеками? Именно для этого и нужен формат PDF - в нём точно будет показано верно вне зависимости от того, в какой программе создали PDF-файл и есть ли нужная программа для его просмотра.