2.2 Spark(内存计算框架)
随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐、用户行为分析等。因此,Hadoop生态系统又发展出以Spark为代表的新计算框架。相比MapReduce,Spark速度快,开发简单,并且能够同时兼顾批处理和实时数据分析。
Apache Spark是加州大学伯克利分校的AMPLabs开发的开源分布式轻量级通用计算框架,并于2014年2月成为Apache的顶级项目。由于Spark基于内存设计,使得它拥有比Hadoop更高的性能,并且对多语言(Scala、Java、Python)提供支持。Spark有点类似Hadoop MapReduce框架。Spark拥有Hadoop MapReduce所具有的优点,但不同于MapReduce的是,作业中间输出的结果可以保存在内存中,从而不再需要读写HDFS(MapReduce的中间结果要保存在文件系统上),因此,在性能上,Spark能比MapReduce框架快100倍左右(见图2-5),排序100TB的数据只需要20分钟左右。正是因为Spark主要是在内存中执行,所以Spark对内存的要求非常高,一个节点通常需要配置24GB的内存。在业界,我们有时把MapReduce称为批处理计算框架,把Spark称为实时计算框架、内存计算框架或流式计算框架。
Hadoop使用数据复制来实现容错性(I/O高,即输入/输出高),而Spark使用RDD(Resilient Distributed Datasets,弹性分布式数据集)数据存储模型来实现数据的容错性。RDD是只读的、分区记录的集合。如果一个RDD的一个分区丢失,RDD含有如何重建这个分区的相关信息。这就避免了使用数据复制来保证容错性的要求,从而减少了对磁盘的访问。通过RDD,后续步骤如果需要相同数据集时就不必重新计算或从磁盘加载,这个特性使得Spark非常适合流水线式的处理。
虽然Spark可以独立于Hadoop来运行,但是Spark还是需要一个集群管理器和一个分布式存储系统。对于集群管理,Spark支持Hadoop YARN、Apache Mesos和Spark原生集群。对于分布式存储,Spark可以使用HDFS、Cassandra、OpenStack Swift和Amazon S3。Spark支持Java、Python和Scala(Scala是Spark最推荐的编程语言,Spark和Scala能够紧密集成,Scala程序可以在Spark控制台上执行)。应该说,Spark和Hadoop生态系统中的上述工具紧密集成。Spark可以与Hadoop上的常用数据格式(如:Avro和Parquet)进行交互,能读写HBase等NoSQL数据库,它的流处理组件Spark Streaming能连续从Flume和Kafka之类的系统上读取数据,它的SQL库Spark SQL能和Hive MetaStore交互。
Spark可用来构建大型的、低延迟的数据分析应用程序。如图2-6所示,Spark包含了如下的库:Spark SQL,Spark Streaming, MLlib(用于机器学习)和GraphX。其中Spark SQL和Spark Streaming最受欢迎,大概60%左右的用户在使用这两个中的一个。而且Spark还能替代MapReduce成为Hive的底层执行引擎。
Spark的内存缓存使它适合于迭代计算。机器学习算法需要多次遍历训练集,可以将训练集缓存在内存里。在对数据集进行探索时,数据科学家可以在运行查询的时候将数据集放在内存,这样就节省了访问磁盘的开销。
图2-5 性能比较(数据来源:http://spark.apache.org/)
图2-6 Spark组件
虽然Spark目前被广泛认为是下一代Hadoop,但是Spark本身的复杂性也困扰着开发人员。Spark的批处理能力仍然比不过MapReduce,Spark SQL和Hive的SQL功能相比还有一定的差距,Spark的统计功能与R语言相比还没有可比性。
2.2.1 Spark SQL
Spark的存在是为了以快于MapReduce的速度进行分布式计算。Spark的设计者很快就了解到,大家还是想要用SQL来访问数据。Spark SQL就出现了,它是基于Spark引擎对HDFS上的数据集或已有的RDD执行SQL查询。有了Spark SQL就能在Spark程序里用SQL语句操作数据了。例如:
val sqlContext = new org.apache.spark.sql.SQLContext(sc) val persons = sqlContext.sql("SELECT name FROM people WHERE age >= 18 AND age <= 29")
上述两行代码是Scala的语法。这两行都声明了两个新变量。与Java不同的是,Scala在变量声明时指定变量类型。这个功能在Scala编程语言中称为类型推断。Scala会从上下文中分析出变量类型。只要在Scala中定义新变量,必须在变量名称之前加上val或var。带有val的变量是不可变变量,一旦给不可变变量赋值,就不能改变。而以var开头的变量则可以改变值。
Spark SQL在Spark圈中非常流行。Spark SQL的前身是Shark。我们简短回顾一下Shark的整个发展历史。对于熟悉RDBMS但又不理解MapReduce的技术人员来说,Hive提供快速上手的工具,它是第一个运行在Hadoop上的SQL工具。Hive基于MapReduce,但是MapReduce的中间过程消耗了大量的I/O(输入/输出),影响了运行效率。为了提高在Hadoop上的SQL的效率,一些工具开始产生,其中表现较为突出的是:MapR的Drill、Cloudera的Impala、Shark。其中Shark是伯克利实验室Spark生态环境的组件之一,它修改了内存管理、物理计划、执行三个模块,并使之能运行在Spark引擎上,从而使得SQL查询的速度得到10~100倍的提升。Shark依赖于Hive,例如,Shark采用Hive的语法解析器和查询优化器,这制约了Spark各个组件的相互集成,所以提出了Spark SQL项目。2014年6月1日,Shark项目组宣布停止对Shark的开发,将所有资源放在Spark SQL项目上。Spark SQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive,Spark SQL体系架构如图2-7所示。
图2-7 Spark SQL体系架构
Spark SQL抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了Spark SQL代码。由于摆脱了对Hive的依赖性,Spark SQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便。在数据兼容方面,Spark不但兼容Hive,还可以从RDD、parquet文件、JSON文件中获取数据,未来版本甚至支持获取RDBMS数据以及Cassandra等NOSQL数据。在性能优化方面,除了采取内存列存储、字节码生成技术(Bytecode Generation)等优化技术外,将会引进Cost Model对查询进行动态评估、获取最佳物理计划,等等。在组件扩展方面,无论是SQL的语法解析器还是优化器都可以重新定义并进行扩展。
2.2.2 Spark Streaming
Spark Streaming是基于Spark引擎对数据流进行不间断处理。只要有新的数据出现,Spark Streaming就能对其进行准实时(数百毫秒级别的延时)的转换和处理。Spark Streaming的工作原理是在小间隔里对数据进行汇集从而形成小批量,然后在小批量数据上运行作业。
使用Spark Streaming编写的程序与编写Spark程序非常相似,在Spark程序中,主要通过操作RDD提供的接口,如Map、Reduce、Filter等,实现数据的批处理。而在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。下面我们来看一个应用案例。
假定有一个电商网站,它买了几个搜索引擎(如百度)的很多关键词。当用户在各大搜索引擎上搜索数据时,搜索引擎会根据购买的关键字导流到电商网站的相关产品页面上,用户可能会购买这些产品。现在需要分析的是哪些搜索词带来的订单比较多,然后根据分析结果多投放这些转化率比较高的关键词,从而为电商网站带来更多的收益。
原先的做法是每天凌晨分析前一天的日志数据,这种方式实时性不高,而且由于日志量比较大,单台机器处理已经达到了瓶颈。现在选择了使用Spark Streaming + Kafka+Flume来处理这些日志,并且运行在YARN上以应对遇到的问题。
如图2-8所示,业务日志是分布到各台服务器上。由于业务量比较大,所以日志都是按小时切分的,我们采用Flume实时收集这些日志(图中步骤1),然后发送到Kafka集群(图中步骤2)。这里为什么不直接将原始日志直接发送到Spark Streaming呢?这是因为,如果Spark Streaming挂掉了,也不会影响到日志的实时收集。
图2-8 Spark Streaming应用案例
日志实时到达Kafka集群后,我们再通过Spark Streaming实时地从Kafka拉数据(图中步骤3),然后解析日志,并根据一定的逻辑过滤数据和分析订单和搜索词的关联性。我们使用Spark的KafkaUtils.createDirectStream API从Kafka中拉数据,代码片段如下:
val sparkConf = new SparkConf().setAppName("OrderSpark") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(2)) val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerAddress,"group.id" -> groupId) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](ssc, kafkaParams, Set(topic))
在上述代码中返回的messages是一个刚刚创建DStream,它是对RDD的封装,其上的很多操作都类似于RDD。 createDirectStream函数是Spark 1.3.0开始引入的,其内部实现是调用Kafka的低层次API,Spark本身维护Kafka偏移量等信息,所以可以保证数据零丢失。
为了能够在Spark Streaming程序挂掉后又能从断点处恢复,我们每隔2秒进行一次Checkpoint,这些Checkpoint文件存储在HDFS上(图中步骤4)的checkpoint目录中。我们可以在程序里面设置checkpoint目录(注意目录名的首字母是小写的):
ssc.checkpoint(checkpointDirectory)
如果需要从checkpoint目录中恢复,可以使用StreamingContext中的getOrCreate函数。为了让分析结果共享给其他系统使用,可以将分析后的数据重新发送到Kafka(图中步骤5)。最后,再单独启动了一个程序从Kafka中实时地将分析好的数据存放到MySQL中用于持久化存储(图中步骤6)。
Apache Spark是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象。Spark最大的特点就是快(Lightning-fast),比Hadoop MapReduce的处理速度快100倍。此外,Spark提供了简单易用的API,几行代码就能实现WordCount。本章介绍Spark的框架,包括Spark shell、RDD、Spark SQL、Spark Streaming等的基本使用方法。