PySpark大数据分析与应用
上QQ阅读APP看书,第一时间看更新

1.2 Spark大数据技术框架

Spark 是一个开源的、通用的并行计算框架,支持分布式数据处理。其最大的特点是基于内存进行计算,这样可以显著提高处理速度,尤其是对于那些需要多次访问同一数据集的迭代算法。Spark与Hadoop生态系统兼容,它可以运行在Hadoop的YARN资源管理器上,并且可以使用HDFS作为其文件系统。Spark由多个组件组成,包括Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图计算库)。这些组件使得Spark能够一站式解决多种业务和应用需求,如批处理、结构化数据查询、流式计算、机器学习和图计算等。Spark 的设计理念是灵活性和易用性,使其适用于多种应用场景,特别是那些需要对特定数据集进行多次操作或迭代计算的场景。

1.2.1 Spark简介

Spark 作为新一代大数据处理引擎,其设计理念基于内存存储式计算和高效的容错机制,以便于交互式查询和迭代计算。自推出以来,Spark 就迅速成为社区的热门项目,本书写作时,活跃度在Apache所有开源项目中排第3位。

2009年,Spark诞生,由加利福尼亚大学伯克利分校AMPLab开发,是一个研究性项目。

2010年,Spark通过BSD许可协议正式对外开源发布。

2012年,第一篇关于Spark的论文发布,第一个正式版本Spark 0.6.0发布。

2013年,Spark成为Apache软件基金会项目,Spark Streaming、Spark MLlib、Shark (Spark on Hadoop)发布。

2014年,Spark成为Apache的顶级项目,5月底Spark 1.0.0发布,同时Spark Graphx和Spark SQL取代了Shark。

2015年,Spark推出了适用于大数据分析的DataFrame编程模型,此后,Spark在IT行业变得越来越受欢迎,许多公司开始重点部署或使用Spark取代MapReduce、Hive、Storm等其他大数据计算框架。

2016年,Spark推出了更强的数据分析工具DataSet。

2017年,Structured Streaming发布。

2018年,Spark 2.4.0发布,成为全球最大的开源项目。

2019年,Spark 3.0发布,性能相比Spark 2.4提升了2倍,提供结构化流的新用户界面(User Interface,UI),对Python支持更加友好,并且兼容ANSI SQL。

1.2.2 Spark特点

尽管Hadoop已经成为大数据技术的事实标准,并且MapReduce适用于对大规模数据集进行批处理操作,但Hadoop并不适用于实时数据处理。根据MapReduce的工作流程,它存在表达能力有限、磁盘I/O开销大和延迟高的缺点。相比之下,Spark基于内存进行计算,其计算性能得到了极大的提升。Spark主要有以下4个特点。

(1)高效性。Spark采用内存存储中间计算结果,可减少迭代运算的磁盘I/O,并通过并行计算有向无环图(Directed Acyclic Graph,DAG)的优化,减少不同任务之间的依赖,降低延迟等待时间。在内存中,Spark的运行速度比MapReduce快100倍。

(2)易用性。与仅支持Map和Reduce两种编程类型的算子的MapReduce不同,Spark提供超过80种不同的转换和行动算子,并采用函数式编程风格,使相同功能需要的代码量大大缩小。

(3)通用性。Spark 提供一栈式解决方案,可以用于批处理、交互式查询、实时流处理、机器学习和图计算等多种不同类型的处理。这些处理可以在同一个应用中无缝使用。对企业应用来说,可以使用一个平台来进行不同的工程实现,从而减少人力开发和平台部署成本。

(4)兼容性。Spark能够与很多开源框架兼容使用。例如,Spark可以使用 Hadoop YARN 和Apache Mesos作为其资源管理和调度器,并且可以从多种数据源读取数据,如HDFS、HBase、MySQL等。

1.2.3 Spark运行架构与流程

Spark运行架构指Spark Core架构。Spark Core是Spark的核心,其功能包含内存计算、任务调度、模式部署、存储管理、故障恢复等。

1.Spark运行架构

Spark运行架构包括四个主要组件:集群管理器(Cluster Manager)、应用的任务驱动器(Driver)、工作节点(Worker Node)以及执行进程(Executor)。这四个组件共同构成了Spark的运行环境,如图1-1所示。在这个架构中,集群管理器位于中心位置,左侧是任务驱动器,右侧是工作节点。

图1-1 Spark运行架构

用户编写的Spark应用程序包括一个Driver功能的代码和分布在集群中多个节点上运行的Executor代码。Spark运行架构包括的四个主要组件的具体说明如下。

(1)Driver:任务驱动器,负责启动运行main()方法并创建SparkContext对象。

(2)Cluster Manager:集群管理器,在集群上获取资源的外部服务,支持Standalone、Mesos和YARN这3种类型。

(3)Worker Node:工作节点,集群中运行Application代码的任意一个节点。

(4)Executor:运行在工作节点中的进程,负责运行 Task,并为应用程序存储数据,在这个过程中,可能会将数据写入内存或磁盘,进行Cache(缓存)。

运行在某个Executor上的工作单元称为Task(任务),Task是Executor中的一个线程。一个并行计算的Job(作业),由一组Task组成,每组Task称为Stage(阶段)。

在Spark中,每个工作节点上的Executor服务于不同的Application,它们之间是不可以共享数据的。与MapReduce计算框架相比,Spark采用的Executor具有两大优势。

(1)Executor利用多线程来执行具体任务,相比MapReduce的进程模型,使用的资源和启动开销要小很多。

(2)Executor中有一个BlockManager存储模块,会将内存和磁盘共同作为存储设备,当需要多轮迭代计算时,可以将中间结果存储到这个存储模块里,供下次需要时直接使用,而不需要从磁盘中读取,从而有效减少I/O开销。在交互式查询场景下,可以预先将数据缓存到BlockManager存储模块上,从而提高I/O性能。

2.Spark运行流程

Spark运行基本流程如图1-2所示。

图1-2 Spark运行基本流程

Spark运行基本流程具体步骤如下。

(1)Driver 创建一个 SparkContext 对象来构建 Spark Application 的运行环境, SparkContext向Cluster Manager(集群管理器)注册并申请运行Executor资源。

(2)Cluster Manager为Executor分配资源并启动Executor进程。

(3)SparkContext根据RDD依赖关系构建DAG(有向无环图),然后DAG Scheduler将DAG分解成多个Stage,并将每个Stage的TaskSet(任务集,即多组任务)发送给Task Scheduler(任务调度器)。Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor。

(4)Task 在 Executor 上运行,将执行结果反馈给 Task Scheduler,再反馈给 DAG Scheduler。运行完毕后,数据被写入存储系统,并向Cluster Manager注销Task,释放所有Task Scheduler资源。

关于DAG Scheduler与Task Scheduler具体的作用如下。

(1)DAG Scheduler 决定运行 Task 的理想位置,并将这些信息传递给下层的 Task Scheduler。DAG Scheduler还会将DAG分解成多个Stage,然后将Stage以TaskSet的形式提交给Task Scheduler。此外,DAG Scheduler还处理可能在Shuffle阶段因数据丢失所导致的失败,这有可能需要重新提交运行之前的Stage。

(2)Task Scheduler维护所有TaskSet,当Executor向Driver发送“心跳”信息时,Task Scheduler会根据其资源剩余情况分配相应的Task。另外,Task Scheduler还维护着所有Task的运行状态,重试失败的Task。

1.2.4 Spark RDD

Spark 的核心建立在统一的抽象弹性分布式数据集(Resilient Distributed Datasets, RDD)之上,这使得Spark的各个组件可以无缝集成,能够在同一个应用程序中完成大数据处理。

1.RDD产生背景

RDD的设计理念源自AMP Lab发表的论文“Resilient Distributed Datasets: A FaultTolerant Abstraction for In-Memory Cluster Computing”。在实际应用中,存在许多诸如机器学习、图算法等迭代式算法和交互式数据挖掘工具,这些应用场景的共同之处是,不同计算阶段之间会重用中间结果,即一个阶段的输出会作为下一个阶段的输入。为了满足这种需求,Spark创造性地设计出RDD,RDD提供了一个抽象的数据结构,使得开发者无需关心底层数据的分布式特性。通过将具体的应用逻辑表达为一系列 RDD 的转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化。这样可以避免对中间结果的存储,从而大大降低数据复制、磁盘I/O和序列化开销。

2.RDD概念与特点

RDD 是分布式对象集合,本质上一个 RDD 是一个只读的分区记录集合,每个 RDD可以分成多个分区,这些分区是数据集的片段。不同 RDD 的分区可以保存在集群中的不同节点上,从而实现在不同节点上的并行计算。

RDD的主要特点如下。

(1)弹性计算。弹性计算包括如下内容。

① 存储弹性,指内存和磁盘之间的自动切换。当计算过程内存不足时,内存与磁盘进行数据交换。

② 容错弹性,指数据丢失可自动恢复。基于 RDD 之间的“血缘关系”(详见本小节第4点),数据丢失时可以自动恢复。

③ 计算弹性,指计算出错时的重试机制。当计算出错时,可以进行重试。

④ 分片弹性,指根据需要可重新分片。

(2)分布式数据存储计算。RDD数据被分割到不同服务器节点的内存上,以实现分布式计算的目的。RDD可以看作Spark中一个抽象的对象。

(3)延迟计算。RDD转换操作采用惰性机制,执行转换操作后,相应的计算并不会立即开始。只有当执行行动操作时;才会真正开始计算。延迟计算让Spark能够更全面地查看DAG后进行更多优化计算。

(4)不可变性。RDD 数据采用只读模式,不能直接修改,只能通过相关的转换操作生成新的数据来间接达到修改的目的。不可变性是高并发(多线程)系统的方法,如果同时读、写(更新),并发程序就更难实现,且不可变数据也更容易在内存中存储和操作。

(5)可分区。作为分布式计算框架,Spark 支持数据可分区,用户可以在任何现有的RDD上执行分区操作。

3.RDD基本操作

RDD的基本操作包括RDD构建操作、RDD转换(Transformation)操作和RDD行动(Action)操作。下面介绍RDD的基本操作。

(1)RDD构建操作

在 Spark 中,计算是通过一系列对 RDD 的操作来完成的,计算的第一步就是构建RDD。RDD主要有以下3种构建方式。

① 从集合中构建RDD。

② 在现有RDD的基础上构建新的RDD。

③ 从外部数据源(如本地文件、HDFS、数据库等)中读取数据来构建RDD。

(2)RDD转换操作

在介绍RDD转换操作前,要先了解算子的概念。分布式对象上的API称为算子,本地对象上的API称为方法或函数。RDD转换操作是指从一个RDD中生成一个新的RDD。需要注意的是,RDD中的所有转换都是延迟加载的,并不会直接计算结果。只有遇到行动算子时,这些转换操作才会真正进行。这种设计让Spark能更加高效地运行。RDD转换操作通过诸如map()、filter()、flatMap()、reduceByKey()等具有不同功能的转换算子实现。常用的RDD转换算子如表1-1所示。

表1-1 常用的RDD转换算子

(3)RDD行动操作

RDD行动操作用于执行计算并按指定方式输出结果。行动操作接收RDD作为输入,但返回非RDD类型的值或结果。在RDD执行过程中,真正的计算发生在RDD行动操作。RDD行动操作通过如reduce()、count()、take()、countByKey()等各具功能的行动算子实现, RDD常用的行动算子如表1-2所示。

表1-2 RDD常用的行动算子

4.RDD血缘关系

RDD 具有一个重要的特性——血缘关系(Lineage)。血缘关系用于记录一个 RDD 是如何从其父RDD计算得到的。当某个RDD丢失时,可以根据其血缘关系从其父RDD重新计算。

一个RDD执行过程的实例,如图1-3所示。在RDD执行过程中,系统从输入中逻辑上生成了A和C两个RDD,经过一系列转换操作,逻辑上生成了F这个RDD。

图1-3 一个RDD执行过程的实例

Spark记录了RDD之间的生成和依赖关系。当F进行行动操作时,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算。

上述一系列处理称为血缘关系,即 DAG 拓扑排序的结果。在血缘关系中,下一代的RDD依赖于上一代的RDD。在图1-3所示实例中,B依赖于A,D依赖于C,而E依赖于B和D。

5.RDD之间的依赖关系

RDD之间存在依赖关系,用户可以通过已有的RDD转换生成新的RDD。新、旧RDD之间的联系称为依赖关系,分为窄依赖和宽依赖两种。

窄依赖表现为父RDD的一个分区对应子RDD的一个分区,或父RDD的多个分区对应子RDD的一个分区。RDD之间的窄依赖如图1-4所示。典型的窄依赖操作包括map()、filter()、union()、join()等。

图1-4 RDD之间的窄依赖

宽依赖表现为父RDD的一个分区对应子RDD的多个分区。RDD之间的宽依赖如图1-5所示。宽依赖典型的操作包括groupByKey()、sortByKey()等。

图1-5 RDD之间的宽依赖

Spark 的依赖关系设计使其具有良好的容错性,并大大提升了执行速度。RDD通过血缘关系记录了它是如何从其他RDD中演变过来的。当某个RDD的部分分区数据丢失时,它可以通过血缘关系获取足够的信息,重新计算和恢复丢失的数据分区。

相对而言,窄依赖的失败恢复更为高效,只需要根据父 RDD 的分区重新计算丢失的分区即可,而不需要重新计算父 RDD 的所有分区。而对于宽依赖来说,即使只是单个节点失效导致 RDD 的一个分区失效,也需要重新计算父RDD的所有分区,开销较大。

宽依赖操作类似于将父 RDD 中所有分区的记录进行“洗牌”,数据被打散后在子RDD中进行重组。

6.阶段划分

用户提交的计算任务是一个由RDD构成的DAG,如果RDD的转换是宽依赖,那么宽依赖转换就将这个 DAG 分为不同的阶段。由于宽依赖会带来洗牌,因此不同阶段不能并行计算,后面阶段的RDD计算需要等待前面阶段RDD的所有分区全部计算完毕后才能进行。这类似于在MapReduce中,默认情况下,Reduce阶段的计算必须等待所有Map任务完成后才能开始。

在对作业中的所有操作划分阶段时,一般会按照倒序进行,即从行动操作开始,遇到窄依赖操作则划分到同一个执行阶段;遇到宽依赖操作则划分一个新的执行阶段。后面的阶段需要等待前面的所有阶段执行完毕后才可以执行,这样阶段之间根据依赖关系就构成了一个大粒度的DAG。

DAG阶段划分的详细过程,如图1-6所示。

图1-6 DAG阶段划分的详细过程

假设根据HDFS中读入数据生成3个不同的RDD,分别为A、C和E,经过一系列转换操作后得到新的 RDD G,并将结果保存到 HDFS 中。在图1-6所示的 DAG 中,只有groupByKey()、join()操作是宽依赖,Spark会以此为边界将其前后划分成不同的阶段。

同时可以注意到,在Stage 2中,从map()到union()都是窄依赖,这两步操作可以形成流水线操作。即通过map()操作生成的分区可以不用等待整个RDD计算结束,而是继续进行union()操作,这样大大提高了计算的效率。

7.持久化

RDD是惰性求值的,每次对某个RDD调用行动操作时都会重新计算该RDD及其依赖。如果需要多次使用同一个RDD,那么消耗会非常大。为了避免多次计算同一个RDD,可以对RDD数据进行持久化。

persist()和cache()是用于将任意RDD缓存到内存或磁盘文件系统中的方法。缓存是容错的,如果一个RDD分片丢失,可以通过构建RDD的转换操作自动重构。已缓存的RDD被使用时,存取速度会大大提升。一般情况下,Executor 60%的内存用于缓存RDD数据,剩下的40%用于执行任务。

需要注意的是cache()只能将RDD缓存到内存中,是persist()的特例方法。而persist()可以让用户根据需求指定一个持久化级别,如表1-3所示。

表1-3 持久化级别

对于MEMORY_AND_DISK和MEMORY_AND_DISK_SER级别,系统会首先将数据保存在内存中,如果内存不够,那么将溢出部分写入磁盘中。另外,为了提高缓存的容错性,可以在持久化级别名称的后面加上“_2”,将持久化数据存为两份,如MEMORY_ONLY_2。不同持久化级别的目的是满足内存使用和CPU效率权衡上的不同需求。可以通过如下步骤选择合适的持久化级别。

(1)如果RDD可以很好地与默认的存储级别(MEMORY_ONLY)契合,那么不需要做任何修改。MEMORY_ONLY是CPU使用效率最高的选项,该存储级别使得RDD的操作尽可能快。

(2)如果RDD不能与默认的存储级别较好契合,那么可以尝试使用MEMORY_ONLY_SER,并选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。

(3)除非数据集的计算量特别大或需要过滤大量数据,否则应尽量避免将数据存储至硬盘上。重新计算一个分区的速度与从硬盘中读取的速度基本差不多。

(4)如果希望拥有较强的故障恢复能力,可以使用复制存储级别(MEMORY_ONLY_2)。所有的存储级别都有通过重新计算丢失数据来恢复错误的容错机制。复制存储级别可以让任务在RDD上持续运行,而不需要等待丢失的分区被重新计算。

在不需要缓存RDD时,应及时使用unpersist()算子来释放缓存的RDD数据。

Spark RDD的设计原理体现了在数据和计算资源已知的前提下,各个RDD有效地分工协作以最大化提高计算效率的思想。这与人们在生活、工作中“团结协作”和“互帮互助”非常相似。当团队面对重大任务时,只有科学地协作,每个人才能发挥更大的作用,效率也会大大提高。

1.2.5 Spark生态圈

Spark生态圈也称为伯克利数据分析软件栈(Berkeley Data Analytics Stack,BDAS), Spark 的设计遵循“一个软件栈满足不同应用场景”的理念,逐渐形成了一套完整的生态系统。Spark生态圈如图1-7所示。

图1-7 Spark生态圈

Spark 生态圈的组件可以应用于机器学习、数据挖掘、数据库、信息检索、自然语言处理和语音识别等多个领域。Spark生态圈以Spark Core为核心,从HDFS、Amazon S3、Tachyon(分布式内存文件系统)等数据源中读取数据,可以使用Mesos、YARN或自身携带的Standalone作为资源管理器调度作业,完成应用程序的计算。这些应用程序可以来自不同的组件,如Spark Streaming的实时处理应用、采样近似查询引擎BlinkDB的权衡查询、Spark SQL的即时查询、MLBase/Spark MLlib的机器学习和Spark GraphX的图处理等。Spark 这种大一统的软件栈也能给人们的生活和工作带来启示,一个人掌握的技能越多,才能在竞争中立于不败之地。

需要说明的是,无论是Spark Streaming、Spark SQL、Spark MLlib,还是Spark GraphX,都可以使用Spark Core的API处理问题。它们的方法几乎是通用的,处理的数据也可以共享,因此Spark可以完成不同应用之间数据的无缝集成。

1.Spark Core

Spark Core作为Spark引擎的核心,提供基于内存的分布式计算,在Hadoop原生的MapReduce引擎的基础上继承其优势、弥补其不足,减少计算过程当中的迭代运算,大大提升计算效率。Spark Core强大功能体现在其包含Spark基础和核心的功能,如内存计算、任务调度、部署模式、故障恢复、存储管理等,主要面向数据批处理。Spark Core建立在统一的抽象RDD之上,因此能够以基本一致的方式应对不同的大数据处理场景。Spark Core通常简称为Spark。

2.Spark SQL

Spark SQL是用于处理结构化数据的组件,允许开发人员直接处理RDD,以及查询存储在 Hive、HBase 上的外部数据。Spark SQL 的一个重要特点是能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,并进行更复杂的数据分析。本书将在第3章中对Spark SQL框架及其使用做进一步介绍。

3.Spark Streaming

Spark Streaming是对实时数据流进行高吞吐量、容错处理的流式处理系统,其核心思想是将数据分解成一系列短小的批处理作业,每个短小的批处理作业都可以使用 Spark Core进行快速处理。Spark Streaming可以对多种数据源,如Kafka、Flume和传输控制协议(Transmission Control Protocol,TCP)套接字等进行类似map()、reduce()和join()等操作,并将结果保存到外部文件系统或数据库中,或应用到实时仪表盘上。本书将在第4章中对Spark Streaming框架及其使用做进一步介绍。

4.Spark MLlib

Spark MLlib机器学习库实现一些常见的机器学习算法和实用程序。Spark MLlib降低了机器学习的门槛,开发人员只要具备一定的理论知识就能进行机器学习相关的工作。本书将在第5章中对Spark MLlib库及其使用做进一步介绍。

5.Spark GraphX

Spark GraphX是Spark中用于图计算的API,可以认为是GraphLab和Pregel在Spark上的重写及优化。与其他分布式图计算框架相比,Spark GraphX最大的贡献是在Spark之上提供一栈式数据解决方案,可以方便且高效地完成图计算的一整套流水作业。