这里有一些令人难以置信的数字供你参考。有超过5亿条推文、900亿封电子邮件、6500万条WhatsApp,以上这些都是在一天之内发送的!Facebook在24小时内能天生4PB的数据。这是难以置信的!
当然,这也带来了寻衅。一个数据科学团队如何捕获这么多的数据?你如何处理它并从中建立机器学习模型?如果你是一名数据科学家或数据工程师,这些都是令人愉快的问题。
Spark正能应对这些问题。Spark是用Scala编写的,它供应了Scala、JAVA、Python和R的接口. PySpark一起事情的API。PySpark是用Python编写的Python API用来支持Spark的。
处理大数据的一种传统办法是利用像Hadoop这样的分布式框架,但这些框架须要在硬盘上实行大量的读写操作。事实上韶光和速率都非常昂贵。打算能力同样是一个主要的障碍。
PySpark以一种高效且易于理解的办法处理这一问题。因此,在本文中,我们将开始学习有关它的所有内容。我们将理解什么是Spark,如何在你的机器上安装它,然后我们将深入研究不同的Spark组件。本文附有代码。
目录Spark是什么?在你的打算机上安装Apache Spark什么是Spark运用程序?什么是Spark会话?Spark的分区转换惰性打算Spark中的数据类型1.Spark是什么?Apache Spark是一个开源的分布式集群打算框架,用于快速处理、查询和剖析大数据。
它是当今企业中最有效的数据处理框架。利用Spark的本钱很高,由于它须要大量的内存进行打算,但它仍旧是数据科学家和大数据工程师的最爱。在本文中,你将看到为什么会涌现这种情形。
常日依赖于Map-Reduce的框架的组织现在正在转向Apache Spark框架。Spark实行内存打算,比Hadoop等Map Reduce框架快100倍。Spark在数据科学家中很受欢迎,由于它将数据分布和缓存放入了内存中,并且帮助他们优化大数据上的机器学习算法。
我建议查看Spark的官方页面,理解更多细节。它有大量的文档,是Spark很好参考教程:https://spark.apache.org/
2.安装Apache Spark2.1. 下载Apache Spark
安装Spark的一个大略方法是通过pip。但是,根据Spark的官方文档,这不是推举的方法,由于Spark的Python包并不打算取代所有其他情形。
在实现基本功能时,你很可能会碰着很多缺点。它只适用于与现有集群(独立的Spark、YARN或Mesos)进行交互。
因此,第一步是从这里下载Apache Spark的最新版本。解压并移动压缩文件:
tar xzvf spark-2.4.4-bin-hadoop2.7.tgz mv spark-2.4.4-bin-hadoop2.7 sparksudo mv spark/ /usr/lib/
2.2. 安装JAVA
确保在系统中安装了JAVA。我强烈推举JAVA 8,由于众所周知,Spark2在JAVA 9和其他方面存在问题:
sudo apt install default-jresudo apt install openjdk-8-jdk
2.3. 安装Scala构建工具(SBT)
当你处理一个包含很少源代码文件的小型项目时,手动编译它们会更随意马虎。但是,如果你正在处理一个包含数百个源代码文件的大型项目呢?在这种情形下,你须要利用构建工具。
SBT是Scala构建工具的缩写,它管理你的Spark项目以及你在代码中利用的库的依赖关系。
请记住,如果你利用的是PySpark,就不须要安装它。但是如果你利用JAVA或Scala构建Spark运用程序,那么你须要在你的机器上安装SBT。运行以下命令安装SBT:
echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.listcurl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key addsudo apt-get updatesudo apt-get install sbt
2.4. 配置SPARK
接下来,打开Spark的配置目录,复制默认的Spark环境模板。它已经以spark-env.sh.template的形式涌现了。利用编辑器打开:
cd /usr/lib/spark/conf/ cp spark-env.sh.template spark-env.sh sudo gedit spark-env.sh
现在,在文件spark-env.sh中。添加JAVA_HOME,并将内存限定SPARKWORKERMEMORY进行赋值。这里,我把它分配为4GB:
## 添加变量JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64SPARK_WORKER_MEMORY=4g
2.5. 设置Spark环境变量
利用下面的命令打开并编辑bashrc文件。这个bashrc文件是一个脚本,每当你开始一个新的终端会话就会实行:
## 打开bashrcsudo gedit ~/bashrc
文件中添加以下环境变量:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 export SBT_HOME=/usr/share/sbt/bin/sbt-launch.jar export SPARK_HOME=/usr/lib/sparkexport PATH=$PATH:$JAVA_HOME/binexport PATH=$PATH:$SBT_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbinexport PYSPARK_DRIVER_PYTHON=jupyterexport PYSPARK_DRIVER_PYTHON_OPTS='notebook'export PYSPARK_PYTHON=python3export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
现在,更新bashrc文件。这将在更新脚本的情形下重新启动终端会话:
source ~/.bashrc
现在,在终端中输入pyspark,它将在默认浏览器中打开Jupyter和一个自动初始化变量名为sc的Spark环境(它是Spark做事的入口点):
3.什么是Spark运用程序?
Spark运用程序是Spark高下文的一个实例。它由一个驱动进程和一组实行程序进程组成。
驱动进程卖力掩护关于Spark运用程序的信息、相应代码、分发和调度实行器中的事情。驱动进程是非常主要的,它是Spark运用程序的核心,并在运用程序的生命周期内掩护所有干系信息。
实行器卖力实际实行驱动程序分配给他们的事情。因此,每个实行器只卖力两件事:
实行由驱动程序分配给它的任务将实行程序上的打算状态报告回驱动程序节点4.什么是Spark会话?我们知道一个驱动进程掌握着Spark运用程序。驱动程序进程将自己作为一个称为Spark会话的工具供应给用户。
Spark会话实例可以利用Spark在集群中实行用户自定义操作。在Scala和Python中,当你启动掌握台时,Spark会话变量便是可用的:
5.Spark的分区
分区意味着完全的数据不会涌如今一个地方。它被分成多个块,这些块被放置在不同的节点上。
如果只有一个分区,纵然有数千个实行器,Spark的并行度也只有一个。其余,如果有多个分区,但只有一个实行器,Spark的并行度仍旧只有一个,由于只有一个打算资源。
在Spark中,较低级别的api许可我们定义分区的数量。
让我们举一个大略的例子来理解分区是如何帮助我们得到更快的结果的。我们将在10到1000之间创建一个包含2000万个随机数的列表,并对大于200的数字进行计数。
让我们看看我们能多快做到这只一个分区:
from random import randint # 创建一个随机数字的列表在10到1000之间my_large_list = [randint(10,1000) for x in range(0,20000000)]# 创建一个分区的列表my_large_list_one_partition = sc.parallelize(my_large_list,numSlices=1)# 检讨分区数量print(my_large_list_one_partition.getNumPartitions())# >> 1# 筛选数量大于即是200的数字my_large_list_one_partition = my_large_list_one_partition.filter(lambda x : x >= 200)# 在jupyter中运行代码 # 实行以下命令来打算韶光%%time# 列表中元素的数量print(my_large_list_one_partition.count())# >> 16162207
利用一个分区时,花了34.5毫秒来筛选数字:
现在,让我们将分区的数量增加到5和检讨实行韶光:
# 创建五个分区my_large_list_with_five_partition = sc.parallelize(my_large_list, numSlices=5)# 筛选数量大于即是200的数字my_large_list_with_five_partition = my_large_list_with_five_partition.filter(lambda x : x >= 200)%%time # 列表中元素的数量print(my_large_list_with_five_partition.count())# >> 16162207
利用5个分区时,花了11.1毫秒来筛选数字:
6.转换
在Spark中,数据构造是不可变的。这意味着一旦创建它们就不能变动。但是如果我们不能改变它,我们该如何利用它呢?
因此,为了进行变动,我们须要指示Spark如何修正数据。这些指令称为转换。
回忆一下我们在上面看到的例子。我们哀求Spark过滤大于200的数字——这实质上是一种转换。Spark有两种类型的转换:
窄转换:在窄转换中,打算单个分区结果所需的所有元素都位于父RDD的单个分区中。例如,如果希望过滤小于100的数字,可以在每个分区上分别实行此操作。转换后的新分区仅依赖于一个分区来打算结果宽转换:在宽转换中,打算单个分区的结果所需的所有元素可能位于父RDD的多个分区中。例如,如果你想打算数字个数,那么你的转换依赖于所有的分区来打算终极的结果7.惰性打算假设你有一个包含数百万行的非常大的数据文件。你须要通过一些操作来进行剖析,比如映射、过滤、随机分割,乃至是最基本的加减法。
现在,对付大型数据集,纵然是一个基本的转换也须要实行数百万个操作。
在处理大数据时,优化这些操作至关主要,Spark以一种非常有创意的办法处理它。你所须要做的便是见告Spark你想要对数据集进行哪些转换,Spark将掩护一系列转换。当你向Spark要求结果时,它将找出最佳路径并实行所需的转换并给出结果。
现在,让我们举个例子。你有一个1gb的文本文件,并创建了10个分区。你还实行了一些转换,末了哀求查看第一行。在这种情形下,Spark将只从第一个分区读取文件,在不须要读取全体文件的情形下供应结果。
让我们举几个实际的例子来看看Spark是如何实行惰性打算的。在第一步中,我们创建了一个包含1000万个数字的列表,并创建了一个包含3个分区的RDD:
# 创建一个样本列表my_list = [i for i in range(1,10000000)]# 并行处理数据rdd_0 = sc.parallelize(my_list,3)rdd_0
接下来,我们将实行一个非常基本的转换,比如每个数字加4。请把稳,Spark此时还没有启动任何转换。它只记录了一系列RDD运算图形式的转换。你可以看到,利用函数toDebugString查看RDD运算图:
# 每个数增加4rdd_1 = rdd_0.map(lambda x : x+4)# RDD工具print(rdd_1)#获取RDD运算图print(rdd_1.toDebugString())
我们可以看到,PythonRDD[1]与ParallelCollectionRDD[0]是连接的。现在,让我们连续添加转换,将列表的所有元素加20。
你可能会认为直接增加24会先增加4后增加20一步更好。但是在这一步之后检讨RDD运算图:
# 每个数增加20rdd_2 = rdd_1.map(lambda x : x+20)# RDD 工具print(rdd_2)#获取RDD运算图print(rdd_2.toDebugString())
我们可以看到,它自动跳过了冗余步骤,并将在单个步骤中添加24。因此,Spark会自动定义实行操作的最佳路径,并且只在须要时实行转换。
让我们再举一个例子来理解惰性打算过程。
假设我们有一个文本文件,并创建了一个包含4个分区的RDD。现在,我们定义一些转换,如将文本数据转换为小写、将单词分割、为单词添加一些前缀等。
但是,当我们实行一个动作,比如获取转换数据的第一个元素时,这种情形下不须要查看完全的数据来实行要求的结果,以是Spark只在第一个分区上实行转换
# 创建一个文本文件的RDD,分区数量= 4my_text_file = sc.textFile('tokens_spark.txt',minPartitions=4)# RDD工具print(my_text_file)# 转换小写my_text_file = my_text_file.map(lambda x : x.lower())# 更新RDD工具print(my_text_file)print(my_text_file.toDebugString())
在这里,我们把单词小写,取得每个单词的前两个字符。
# 分割单词my_text_file = my_text_file.map(lambda x : x[:2])# RDD工具print(my_text_file)print(my_text_file.toDebugString())# 在所有的转换后得到第一个元素print(my_text_file.first())
我们创建了4个分区的文本文件。但是根据我们须要的结果,不须要在所有分区上读取和实行转换,因此Spack只在第一个分区实行。
如果我们想打算涌现了多少个单词呢?这种情形下我们须要读取所有的分区:
print(my_text_file.countApproxDistinct())
8.Spark MLlib的数据类型
MLlib是Spark的可扩展机器学习库。它包括一些常用的机器学习算法,如回归、分类、降维,以及一些对数据实行基本统计操作的工具。
在本文中,我们将详细谈论MLlib供应的一些数据类型。在往后的文章中,我们将谈论诸如特色提取和构建机器学习管道之类的主题。
8.1. 局部向量
MLlib支持两种类型确当地向量:稠密和稀疏。昔时夜多数数字为零时利用稀疏向量。要创建一个稀疏向量,你须要供应向量的长度——非零值的索引,这些值该当严格递增且非零值。
from pyspark.mllib.linalg import Vectors## 稠密向量print(Vectors.dense([1,2,3,4,5,6,0]))# >> DenseVector([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 0.0])### 稠密向量### Vectors.sparse( length, index_of_non_zero_values, non_zero_values)### 索引该当严格递增且非零值print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]))# >> SparseVector(10, {0: 1.0, 1: 5.0, 2: 3.0, 4: 5.0, 5: 7.0})print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]).toArray())# >> array([1., 5., 3., 0., 5., 7., 0., 0., 0., 0.])
8.2. 标签点
标签点(Labeled Point)是一个局部向量,个中每个向量都有一个标签。这可以用在监督学习中,你有一些目标的特色与这些特色对应的标签。
from pyspark.mllib.regression import LabeledPoint# 设置一个标签与一个稠密向量point_1 = LabeledPoint(1,Vectors.dense([1,2,3,4,5]))# 特色 print(point_1.features)# 标签print(point_1.label)
8.3. 局部矩阵
局部矩阵存储在一台机器上。MLlib同时支持稠密矩阵和稀疏矩阵。在稀疏矩阵中,非零项值按列为主顺序存储在压缩的稀疏列格式(CSC格式)中。
# 导入矩阵from pyspark.mllib.linalg import Matrices# 创建一个3行2列的稠密矩阵matrix_1 = Matrices.dense(3, 2, [1,2,3,4,5,6])print(matrix_1)# >> DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False)print(matrix_1.toArray())""">> array([[1., 4.], [2., 5.], [3., 6.]])"""# 创建一个稀疏矩阵matrix_2 = Matrices.sparse(3, 3, [0, 1, 2, 3], [0, 0, 2], [9, 6, 8])print(matrix_2)# SparseMatrix(3, 3, [0, 1, 2, 3], [0, 0, 2], [9.0, 6.0, 8.0], False)print(matrix_2.toArray())""">> array([[9., 6., 0.], [0., 0., 0.], [0., 0., 8.]])"""
8.4. 分布式矩阵
分布式矩阵存储在一个或多个rds中。选择得当的分布式矩阵格式是非常主要的。目前已经实现了四种类型的分布式矩阵:
行矩阵 每一行都是一个局部向量。可以在多个分区上存储行 像随机森林这样的算法可以利用行矩阵来实现,由于该算法将行划分为多个树。一棵树的结果不依赖于其他树。因此,我们可以利用分布式架构,对大数据的随机森林等算法进行并行处理# 分布式数据类型——行矩阵from pyspark.mllib.linalg.distributed import RowMatrix# 创建RDDrows = sc.parallelize([[1,2,3], [4,5,6], [7,8,9], [10,11,12]])# 创建一个分布式行矩阵row_matrix = RowMatrix(rows)print(row_matrix)# >> <pyspark.mllib.linalg.distributed.RowMatrix at 0x7f425884d7f0> print(row_matrix.numRows())# >> 4print(row_matrix.numCols())# >> 3索引行矩阵 它类似于行矩阵,个中行以有序的办法存储在多个分区中。为每行分配一个索引值。它用于序列很主要的算法,比如韶光序列数据 它可以从IndexedRow的RDD创建
# 索引行矩阵from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix#创建RDDindexed_rows = sc.parallelize([ IndexedRow(0, [0,1,2]), IndexedRow(1, [1,2,3]), IndexedRow(2, [3,4,5]), IndexedRow(3, [4,2,3]), IndexedRow(4, [2,2,5]), IndexedRow(5, [4,5,5])])# 创建IndexedRowMatrixindexed_rows_matrix = IndexedRowMatrix(indexed_rows)print(indexed_rows_matrix.numRows())# >> 6print(indexed_rows_matrix.numCols())# >> 3坐标矩阵 可以从MatrixEntry的RDD创建坐标矩阵 只有当矩阵的维数都很大时,我们才利用坐标矩阵
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry# 用MatrixEntry创建matrix_entries = sc.parallelize([MatrixEntry(0, 5, 2), MatrixEntry(1, 1, 1), MatrixEntry(1, 5, 4)])# 创建坐标矩阵c_matrix = CoordinateMatrix(matrix_entries)# 列数print(c_matrix.numCols())# >> 6# 行数print(c_matrix.numRows())# >> 2块矩阵 在一个块矩阵中,我们可以在不同的机器上存储一个大矩阵的不同子矩阵 我们须要指定块的尺寸。就像下面的例子,我们有3X3,对付每一个方块,我们可以通过供应坐标来指定一个矩阵
# 导入库from pyspark.mllib.linalg import Matricesfrom pyspark.mllib.linalg.distributed import BlockMatrix# 创建子矩阵块的RDDblocks = sc.parallelize([((0, 0), Matrices.dense(3, 3, [1, 2, 1, 2, 1, 2, 1, 2, 1])), ((1, 1), Matrices.dense(3, 3, [3, 4, 5, 3, 4, 5, 3, 4, 5])), ((2, 0), Matrices.dense(3, 3, [1, 1, 1, 1, 1, 1, 1, 1, 1]))])# 从子矩阵块的RDD中创建矩阵块,大小为3X3b_matrix = BlockMatrix(blocks, 3, 3) #每一块的列数print(b_matrix.colsPerBlock)# >> 3#每一块的行数print(b_matrix.rowsPerBlock)# >> 3# 把块矩阵转换为局部矩阵local_mat = b_matrix.toLocalMatrix()# 打印局部矩阵print(local_mat.toArray())""">> array([[1., 2., 1., 0., 0., 0.], [2., 1., 2., 0., 0., 0.], [1., 2., 1., 0., 0., 0.], [0., 0., 0., 3., 3., 3.], [0., 0., 0., 4., 4., 4.], [0., 0., 0., 5., 5., 5.], [1., 1., 1., 0., 0., 0.], [1., 1., 1., 0., 0., 0.], [1., 1., 1., 0., 0., 0.]])"""结尾
本日我们已经讲了很多了。Spark是数据科学中最迷人的措辞之一,我以为至少该当熟习它。
这只是我们PySpark学习旅程的开始!我操持在本系列中涵盖更多的内容,包括不同机器学习任务的多篇文章。
在即将揭橥的PySpark文章中,我们将看到如何进行特色提取、创建机器学习管道和构建模型。