3.1 为什么说RDD和DataSet是Spark的灵魂
Spark建立在抽象的RDD上,使得它可以用一致的方式处理大数据不同的应用场景,把所有需要处理的数据转化成为RDD,然后对RDD进行一系列的算子运算,从而得到结果。RDD是一个容错的、并行的数据结构,可以将数据存储到内存和磁盘中,并能控制数据分区,且提供了丰富的API来操作数据。Spark一体化、多元化的解决方案极大地减少了开发和维护的人力成本和部署平台的物力成本,并在性能方面有极大的优势,特别适合于迭代计算,如机器学习和图计算;同时,Spark对Scala和Python交互式shell的支持也极大地方便了通过shell直接使用Spark集群来验证解决问题的方法,这对于原型开发至关重要,对数据分析人员有着无法拒绝的吸引力。
3.1.1 RDD的定义及五大特性剖析
RDD是分布式内存的一个抽象概念,是一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,能横跨集群所有节点并行计算,是一种基于工作集的应用抽象。
RDD底层存储原理:其数据分布存储于多台机器上,事实上,每个RDD的数据都以Block的形式存储于多台机器上,每个Executor会启动一个BlockManagerSlave,并管理一部分Block;而Block的元数据由Driver节点上的BlockManagerMaster保存,BlockManagerSlave生成Block后向BlockManagerMaster注册该Block,BlockManagerMaster管理RDD与Block的关系,当RDD不再需要存储的时候,将向BlockManagerSlave发送指令删除相应的Block。
BlockManager管理RDD的物理分区,每个Block就是节点上对应的一个数据块,可以存储在内存或者磁盘上。而RDD中的Partition是一个逻辑数据块,对应相应的物理块Block。本质上,一个RDD在代码中相当于数据的一个元数据结构,存储着数据分区及其逻辑结构映射关系,存储着RDD之前的依赖转换关系。
BlockManager在每个节点上运行管理Block(Driver和Executors),它提供一个接口检索本地和远程的存储变量,如memory、disk、off-heap。使用BlockManager前必须先初始化。
BlockManager.scala的部分源码如下所示。
1. private[spark] class BlockManager( 2. executorId: String, 3. rpcEnv: RpcEnv, 4. val master: BlockManagerMaster, 5. val serializerManager: SerializerManager, 6. val conf: SparkConf, 7. memoryManager: MemoryManager, 8. mapOutputTracker: MapOutputTracker, 9. shuffleManager: ShuffleManager, 10. val blockTransferService: BlockTransferService, 11. securityManager: SecurityManager, 12. numUsableCores: Int) 13. extends BlockDataManager with BlockEvictionHandler with Logging {
BlockManagerMaster会持有整个Application的Block的位置、Block所占用的存储空间等元数据信息,在Spark的Driver的DAGScheduler中,就是通过这些信息来确认数据运行的本地性的。Spark支持重分区,数据通过Spark默认的或者用户自定义的分区器决定数据块分布在哪些节点。RDD的物理分区是由Block-Manager管理的,每个Block就是节点上对应的一个数据块,可以存储在内存或者磁盘。而RDD中的partition是一个逻辑数据块,对应相应的物理块Block。本质上,一个RDD在代码中相当于数据的一个元数据结构(一个RDD就是一组分区),存储着数据分区及Block、Node等的映射关系,以及其他元数据信息,存储着RDD之前的依赖转换关系。分区是一个逻辑概念,Transformation前后的新旧分区在物理上可能是同一块内存存储。
Spark通过读取外部数据创建RDD,或通过其他RDD执行确定的转换Transformation操作(如map、union和groubByKey)而创建,从而构成了线性依赖关系,或者说血统关系(Lineage),在数据分片丢失时可以从依赖关系中恢复自己独立的数据分片,对其他数据分片或计算机没有影响,基本没有检查点开销,使得实现容错的开销很低,失效时只需要重新计算RDD分区,就可以在不同节点上并行执行,而不需要回滚(Roll Back)整个程序。落后任务(即运行很慢的节点)是通过任务备份,重新调用执行进行处理的。
因为RDD本身支持基于工作集的运用,所以可以使Spark的RDD持久化(persist)到内存中,在并行计算中高效重用。多个查询时,我们就可以显性地将工作集中的数据缓存到内存中,为后续查询提供复用,这极大地提升了查询的速度。在Spark中,一个RDD就是一个分布式对象集合,每个RDD可分为多个片(Partitions),而分片可以在集群环境的不同节点上计算。
RDD作为泛型的抽象的数据结构,支持两种计算操作算子:Transformation(变换)与Action(行动)。且RDD的写操作是粗粒度的,读操作既可以是粗粒度的,也可以是细粒度的。
RDD.scala的源码如下。
1. /** 每个RDD都有5个主要特性 2. *-分区列表 3. *-每个分区都有一个计算函数 4. *-依赖于其他RDD的列表 5. *- 数据类型(Key-Value)的RDD分区器 6. *- 每个分区都有一个分区位置列表 7. */ 8. abstract class RDD[T: ClassTag]( 9. @transient private var _sc: SparkContext, 10. @transient private var deps: Seq[Dependency[_]] 11. ) extends Serializable with Logging {
其中,SparkContext是Spark功能的主要入口点,一个SparkContext代表一个集群连接,可以用其在集群中创建RDD、累加变量、广播变量等,在每一个可用的JVM中只有一个SparkContext,在创建一个新的SparkContext之前,必须先停止该JVM中可用的SparkContext,这种限制可能最终会被修改。SparkContext被实例化时需要一个SparkConf对象去描述应用的配置信息,在这个配置对象中设置的信息,会覆盖系统默认的配置。
RDD五大特性:
(1)分区列表(a list of partitions)。Spark RDD是被分区的,每一个分区都会被一个计算任务(Task)处理,分区数决定并行计算数量,RDD的并行度默认从父RDD传给子RDD。默认情况下,一个HDFS上的数据分片就是一个Partition,RDD分片数决定了并行计算的力度,可以在创建RDD时指定RDD分片个数,如果不指定分区数量,当RDD从集合创建时,则默认分区数量为该程序所分配到的资源的CPU核数(每个Core可以承载2~4个Partition),如果是从HDFS文件创建,默认为文件的Block数。
(2)每一个分区都有一个计算函数(a function for computing each split)。每个分区都会有计算函数,Spark的RDD的计算函数是以分片为基本单位的,每个RDD都会实现compute函数,对具体的分片进行计算,RDD中的分片是并行的,所以是分布式并行计算。有一点非常重要,就是由于RDD有前后依赖关系,遇到宽依赖关系,例如,遇到reduceBykey等宽依赖操作的算子,Spark将根据宽依赖划分Stage,Stage内部通过Pipeline操作,通过Block Manager获取相关的数据,因为具体的split要从外界读数据,也要把具体的计算结果写入外界,所以用了一个管理器,具体的split都会映射成BlockManager的Block,而具体split会被函数处理,函数处理的具体形式是以任务的形式进行的。
(3)依赖于其他RDD的列表(a list of dependencies on other RDDs)。RDD的依赖关系,由于RDD每次转换都会生成新的RDD,所以RDD会形成类似流水线的前后依赖关系,当然,宽依赖就不类似于流水线了,宽依赖后面的RDD具体的数据分片会依赖前面所有的RDD的所有的数据分片,这时数据分片就不进行内存中的Pipeline,这时一般是跨机器的。因为有前后的依赖关系,所以当有分区数据丢失的时候,Spark会通过依赖关系重新计算,算出丢失的数据,而不是对RDD所有的分区进行重新计算。RDD之间的依赖有两种:窄依赖(Narrow Dependency)、宽依赖(Wide Dependency)。RDD是Spark的核心数据结构,通过RDD的依赖关系形成调度关系。通过对RDD的操作形成整个Spark程序。
RDD有Narrow Dependency和Wide Dependency两种不同类型的依赖,其中的Narrow Dependency指的是每一个parent RDD的Partition最多被child RDD的一个Partition所使用,而Wide Dependency指的是多个child RDD的Partition会依赖于同一个parent RDD的Partition。可以从两个方面来理解RDD之间的依赖关系:一方面是该RDD的parent RDD是什么;另一方面是依赖于parent RDD的哪些Partitions;根据依赖于parent RDD的Partitions的不同情况,Spark将Dependency分为宽依赖和窄依赖两种。Spark中宽依赖指的是生成的RDD的每一个partition都依赖于父RDD的所有partition,宽依赖典型的操作有groupByKey、sortByKey等,宽依赖意味着shuffle操作,这是Spark划分Stage边界的依据,Spark中宽依赖支持两种Shuffle Manager,即HashShuffleManager和SortShuffleManager,前者是基于Hash的Shuffle机制,后者是基于排序的Shuffle机制。Spark 2.2现在的版本中已经没有Hash Shuffle的方式。
(4)key-value数据类型的RDD分区器(-Optionally,a Partitioner for key-value RDDS),控制分区策略和分区数。每个key-value形式的RDD都有Partitioner属性,它决定了RDD如何分区。当然,Partition的个数还决定每个Stage的Task个数。RDD的分片函数,想控制RDD的分片函数的时候可以分区(Partitioner)传入相关的参数,如HashPartitioner、RangePartitioner,它本身针对key-value的形式,如果不是key-value的形式,它就不会有具体的Partitioner。Partitioner本身决定了下一步会产生多少并行的分片,同时,它本身也决定了当前并行(parallelize)Shuffle输出的并行数据,从而使Spark具有能够控制数据在不同节点上分区的特性,用户可以自定义分区策略,如Hash分区等。Spark提供了“partitionBy”运算符,能通过集群对RDD进行数据再分配来创建一个新的RDD。
(5)每个分区都有一个优先位置列表(-Optionally,a list of preferred locations to compute each split on)。它会存储每个Partition的优先位置,对于一个HDFS文件来说,就是每个Partition块的位置。观察运行spark集群的控制台会发现Spark的具体计算,具体分片前,它已经清楚地知道任务发生在什么节点上,也就是说,任务本身是计算层面的、代码层面的,代码发生运算之前已经知道它要运算的数据在什么地方,有具体节点的信息。这就符合大数据中数据不动代码动的特点。数据不动代码动的最高境界是数据就在当前节点的内存中。这时有可能是memory级别或Alluxio级别的,Spark本身在进行任务调度时候,会尽可能将任务分配到处理数据的数据块所在的具体位置。据Spark的RDD.Scala源码函数getPreferredLocations可知,每次计算都符合完美的数据本地性。
RDD类源码文件中的4个方法和一个属性对应上述阐述的RDD的5大特性。
RDD.scala的源码如下。
1. 2. /** 3. * :: DeveloperApi :: 4. * 通过子类实现给定分区的计算 5. */ 6. @DeveloperApi 7. def compute(split: Partition, context: TaskContext): Iterator[T] 8. 9. /** * 通过子类实现,返回一个RDD分区列表,这个方法仅只被调用一次,它是安全地执行一次 * 耗时计算数组中的分区必须符合以下属性设置 10. * 'rdd.partitions.zipWithIndex.forall { case (partition, index) => * partition.index == index }' 11. */ 12. protected def getPartitions: Array[Partition] 13. 14. /** *返回对父RDD的依赖列表,这个方法仅只被调用一次,它是安全地执行一次耗时计算 15. */ 16. protected def getDependencies: Seq[Dependency[_]] = deps 17. 18. /** * 可选的,指定优先位置,输入参数是split分片,输出结果是一组优先的节点位置 19. */ 20. protected def getPreferredLocations(split: Partition): Seq[String] = Nil 21. 22. /** 可选的,通过子类来实现。指定如何分区 */ 23. @transient val partitioner: Option[Partitioner] = None
其中,TaskContext是读取或改变执行任务的环境,用org.apache.spark.TaskContext.get()可返回当前可用的TaskContext,可以调用内部的函数访问正在运行任务的环境信息。Partitioner是一个对象,定义了如何在key-Value类型的RDD元素中用Key分区,从0到numPartitions-1区间内映射每一个Key到Partition ID。Partition是一个RDD的分区标识符。Partition.scala的源码如下。
1. trait Partition extends Serializable { 2. /** * 获取父RDD的分区索引 3. */ 4. 5. def index: Int 6. 7. //最好默认实现HashCode 8. override def hashCode(): Int = index 9. override def equals(other: Any): Boolean = super.equals(other) 10. }
3.1.2 DataSet的定义及内部机制剖析
DataSet是可以并行使用函数或关系操作转换特定域对象的强类型集合。每个DataSet有一个非类型化的DataFrame。DataFrame是Dataset[Row]的别名。DataSet中可用的算子分为转换算子和行动算子。转换算子可以产生新的DataSet;行动算子将触发计算和返回结果。转换算子包括map、filter、select和聚集算子,如groupBy。行动算子包括count、show,或者将数据保存到文件系统中。
DataSet是“懒加载”的,即只有在行动算子被触发时,才进行计算操作。本质上,DataSet表示一个逻辑计划,它描述了生成数据所需的计算。当行动算子被触发时,Spark查询优化器将优化逻辑计划,生成一个并行、分布式有效执行的物理计划。使用explain函数可以查看逻辑计划以及优化的物理计划。
为了有效地支持特定领域的对象,编码器[Encoder]是必需的。编码器将特定类型T转换为Spark的内部类型。例如,给定一个类Person有两个属性,包括‘名字’(string)和‘年龄’(int),编码器告诉Spark在运行生成代码时将Person对象序列化成二进制数据。二进制数据通常占用更少的内存以及更优化的数据处理效率(如列存储格式)。可以使用schema函数来查看了解数据的内部二进制结构。
通常有两种创建数据集Dataset的方法。
方法一:最常见的方式是Spark在SparkSession中使用read功能读入存储系统中的文件。例如,Scala版本:可以使用spark.read.parquet方式读入parquet格式的文件,使用as方法转换为[Person]数据类型的DataSet。Java版本:使用spark.read.parquet方式读入parquet格式的文件,在as方法中使用编码器对Person.class数据类型进行编码,生成DataSet。
1. * {{{ 2. * val people = spark.read.parquet("...").as[Person] //Scala 3. * Dataset<Person> people = spark.read().parquet("...").as(Encoders .bean(Person.class)); //Java 4. * }}}
方法二:DataSet也可以通过现有DataSet进行转换创建。例如,在现有的DataSet中使用过滤算子,创建一个新的数据集。下面看一个生成新DataSet的例子。Scala版本:在已有的Dataset[Person]中使用map转换函数,获取Person中的姓名,将生成新的Dataset[String];Java版本:在已有的数据集Dataset<String>中使用map转换函数,通过(Person p) -> p.name获取Person中的姓名,编码器指定姓名属性的类型为String类型,生成新的姓名的数据集Dataset<String>。(Person p) -> p.name这种写法为Lambda表达式,这是Java 8之后才有的新特性。
1. ** {{{ 2. * val names = people.map(_.name) //在Scala中,names是一个String类型的DataSet 3. * Dataset<String> names = people.map((Person p) -> p.name, Encoders. STRING)); 4. //in Java 8 5. * }}}
通过各种特定领域的语言(DSL)定义的功能: Dataset(类), [Column]和[functions]等非类型化数据集的操作也可以。这些操作非常类似于R或Python语言在数据表中的抽象操作。在scala中,我们使用apply方法,从people的数据集中选择“年龄”这一列;在Java中使用"col"方法,通过people.col("age")获取到年龄列。
1. * 从DataSet中选择一列,在Scala中使用apply方法,在Java中使用col方法 2. * {{{ 3. * val ageCol = people("age") //在Scala中 4. * Column ageCol = people.col("age"); //在Java中 5. * }}}
注意,[Column]类型也可以通过它的各种函数来操作。例如,以下代码在人员数据集中创建一个新的列,每个人的年龄增加10。在Scala中使用的方法是people("age") + 10;在Java中使用plus方法。
1. * {{{ 2. * //下面创建一个新的列,每个人的年龄增加10 3. * people("age") + 10 //在Scala中 4. * people.col("age").plus(10); //在Java中 5. * }}}
下面是一个更具体的例子:使用spark.read.parquet分别读入parquet格式的人员数据及部门数据,过滤出年龄大于30岁的人员,根据部门ID和部门数据进行join,然后按照姓名、性别分组,再使用agg方法,调用内置函数avg计算出部门中的平均工资、人员的最大年龄。Scala版本代码如下。
1. * {{{ 2. * //使用SparkSession 创建 Dataset[Row] 3. * val people = spark.read.parquet("...") 4. * val department = spark.read.parquet("...") 5. * 6. * people.filter("age > 30") 7. * .join(department, people("deptId") === department("id")) 8. * .groupBy(department("name"), "gender") 9. * .agg(avg(people("salary")), max(people("age"))) 10. * }}}
以上例子的Java版本代码如下。
1. * {{{ 2. * //To create Dataset<Row> using SparkSession 3. * Dataset<Row> people = spark.read().parquet("..."); 4. * Dataset<Row> department = spark.read().parquet("..."); 5. * 6. * people.filter("age".gt(30)) 7. * .join(department, people.col("deptId").equalTo(department("id"))) 8. * .groupBy(department.col("name"), "gender") 9. * .agg(avg(people.col("salary")), max(people.col("age"))); 10. * }}}