大数据技术入门(第2版)
上QQ阅读APP看书,第一时间看更新

2.1 Hadoop框架

对于大数据而言,Hadoop就是用大量的廉价机器组成的集群去执行大规模运算,这包括大规模的计算和大规模的存储。近年来,Hadoop已经逐渐成为大数据分析领域最受欢迎的解决方案,像eBay这样大型的电子商务企业,一直在使用Hadoop技术从数据中挖掘价值,例如,通过大数据提高用户的搜索体验,识别和优化精准广告投放,以及通过点击流分析以理解用户如何使用它的在线市场平台等。目前,eBay的Hadoop集群总节点数超过10000多个,存储容量超过170PB。

Hadoop框架是用Java编写的,它的核心是HDFS(Hadoop分布式文件系统)和MapReduce。HDFS为大数据提供了存储,而MapReduce为大数据提供了计算。HDFS可以保存比一个机器的可用存储空间更大的文件,这是因为HDFS是一套具备可扩展能力的存储平台,能够将数据分发至成千上万分布式节点的低成本服务器之上,并让这些硬件设备以并行方式共同处理同一任务。Hadoop框架实现了名为MapReduce的编程范式,这个范式实现了大规模的计算:应用程序被分割成许多小部分,而每个部分在集群中的节点上并行执行(每个节点处理自己的数据)。MapReduce和分布式文件系统的设计,使得应用程序能够在成千上万台独立计算的电脑上运行并存取PB级的数据。

Hadoop框架包括Hadoop内核、MapReduce、HDFS和Hadoop YARN等。Hadoop也是一个生态系统,在这里面有很多的组件。除了HDFS和MapReduce,有NoSQL数据库的HBase,有数据仓库工具Hive,有Pig工作流语言,有在分布式系统中扮演重要角色的Zookeeper,有内存计算框架的Spark,有数据采集的Flume和Kafka。总之,用户可以在Hadoop平台上开发和部署任何的大数据应用程序。

总之,Hadoop是一个能够让用户轻松架构和使用的分布式计算平台。用户可以轻松地在Hadoop上开发和运行处理海量数据的应用程序。它主要有以下几个优点。

· 高扩展性:Hadoop可以扩展至数千个节点,对数据持续增长,数据量特别巨大的需求很合适。

· 高效:Hadoop能够在节点之间动态地移动数据,并保证各个节点的动态平衡,因此处理速度非常快。

· 高容错性:Hadoop能够自动保存数据的多个副本,并且能够自动将失败的任务重新分配。

· 低成本:Hadoop是开源项目,不仅从软件上节约成本,而且,Hadoop对硬件上的要求也不高,因此也从硬件上节约了一大笔成本。

2.1.1 HDFS(分布式文件系统)

HDFS是Hadoop Distribute File System(Hadoop分布式文件系统)的简称。HDFS是一个可运行在廉价机器上的可容错分布式文件系统。它既有分布式文件系统的共同点,又有自己的一些明显的特征。在海量数据的处理中,我们经常碰到一些大文件(几百GB甚至TB级别)。在常规的系统上,这些大文件的读和写需要花费大量的时间。HDFS优化了大文件的流式读取方式,它把一个大文件分割成一个或者多个数据块(默认的大小为64MB),分发到集群的节点上,从而实现了高吞吐量的数据访问,这个集群可有数百个节点,并支持千万级别的文件。因此,HDFS非常适合大规模数据集上的应用。

HDFS设计者认为硬件故障是经常发生的,所以采用了块复制的概念,让数据在集群的节点间进行复制(HDFS有一个复制因子参数,默认为3),从而实现了一个高度容错性的系统。当硬件出现故障(如硬盘坏了)的时候,复制的数据就可以保证数据的高可用性。正是因为这个容错的特点,HDFS适合部署在廉价的机器上。当然,一块数据和它的备份不能放在同一个机器上,否则这台机器挂了,备份也同样没办法找到。HDFS用一种机架位感知的办法,先把一份复制放入同机架上的机器,然后再复制一份到其他服务器,也许是不同数据中心的,这样如果某个数据点坏了,就从另一个机架上调用。除了机架位感知的办法,现在还有基于Erasure Code(纠删码)的方法。这本来是用在通信容错领域的办法,可以节约空间又达到容错的目的,感兴趣的读者可以去查询相关材料。

HDFS是一个主从结构。如图2-1所示,一个HDFS集群是由一个名字节点(NameNode)和多个数据节点(DataNode)组成,它们通常是在不同的机器上。HDFS将一个文件分割成一个或多个块,这些块被存储在一组数据节点中。名字节点用来操作文件命名空间的文件或目录操作,如:打开、关闭、重命名,等等。它同时确定块与数据节点的映射。数据节点负责来自文件系统客户的读写请求。数据节点同时还要执行块的创建、删除和来自名字节点的块复制指令。

一个名字节点保存着集群上所有文件的目录树,以及每个文件数据块的位置信息,它是一个管理文件命名空间和客户端访问文件的主服务器,但是它并不真正存储文件数据本身。数据节点通常是一个节点或一个机器,它真正的存放着文件数据(和复制数据)。它管理着从NameNode分配过来的数据块,是来管理对应节点的数据存储。HDFS对外开放文件命名空间并允许用户数据以文件形式存储。图2-1显示了一个HDFS的架构:

图2-1 HDFS架构

· 客户端应用:每当需要定位一个文件或添加/复制/移动/删除一个文件时,与名字节点交互,获取文件位置信息(返回相关的数据节点信息);与数据节点交互,读取和写入数据。

· 名字节点(NameNode):HDFS文件系统的核心节点,保存着集群中所有数据块位置的一个目录。它管理HDFS的名称空间和数据块映射信息,配置副本策略,处理客户端请求。

· 数据节点(DataNode):存储实际的数据,汇报存储信息给NameNode。启动后,DataNode连接到NameNode,响应NameNode的文件操作请求。一旦NameNode提供了文件数据的位置信息,客户端应用可以直接与DataNode联系。DataNode并不能感知集群中其他DataNode的存在。DataNode之间可以直接通信,数据复制就是在DataNode之间完成的。

名字节点和数据节点都是运行在普通的机器之上的软件,一般都用Linux操作系统。因为HDFS是用Java编写的,任何支持Java的机器都可以运行名字节点或数据节点,我们很容易将HDFS部署到大范围的机器上。典型的部署是由一个专门的机器来运行名字节点软件,集群中的其他每台机器运行一个数据节点实例。体系结构虽然不排斥在一个机器上运行多个数据节点的实例,但是实际的部署不会有这种情况。

集群中只有一个名字节点极大地简单化了系统的体系结构。名字节点是仲裁者和所有HDFS元数据的仓库,用户的实际数据不经过名字节点。在集群中,我们一般还会配置Secondary NameNode。这个Secondary NameNode下载NameNode的image文件和editlogs,并对它们做本地归并,最后再将归并完的image文件发回给NameNode。Secondary NameNode并不是NameNode的热备份,在NameNode出故障时并不能工作。

2.1.2 MapReduce(分布式计算框架)

MapReduce是一种编程模型(也称为计算模型),用于大数据量的批处理计算。读者需要注意的是,我们现在很少直接使用MapReduce来进行编程,但是理解MapReduce的思想是理解分布式计算的关键,新的分布式计算框架都是对MapReduce的改进和提升。如图2-2所示,MapReduce的思想是将批量处理的任务主要分成两个阶段(Map和Reduce阶段),所谓的Map阶段就是把数据生成“键-值”对(Key-Value Pair),按键排序。中间有一步操作叫Shuffle,把同样的Key(键)运输到同一个Reducer上面去。在Reducer上,因为都是同一个Key,就直接可以做聚合(算出总和),最后把结果输出到HDFS上。应用开发者只需编写map()函数和reduce()函数,中间的排序、Shuffling网络传输、容错处理,在MapReduce框架中都已经做好了。

图2-2 MapReduce处理示例

图2-2所示的例子是计算各个单词出现的次数。MapReduce通常将输入的数据集分割为一些独立的数据块(Splitting步骤),然后由一些Map任务在服务器集群上以完全并行的方式进行处理,这些Map任务的计算结果最后通过Reduce任务合并在一起来计算最终的结果。具体来说,Map对数据进行指定的操作,生成“键-值”对形式的中间结果(Mapping步骤,“Deer,1”就是一个“键-值”对,这个中间结果一般存放在文件系统上)。MapReduce框架有Sort(排序)操作会按照“键”来对map()函数所产生的“键-值”对进行排序(图2-2所示的排序结果),然后Shuffle(发送)操作将所有具有相同键的“键-值”对发送给同一个reduce()函数。MapReduce框架对中间结果按照“键-值”排序(Shuffling步骤),Reduce则对中间结果中相同“键”的所有“值”进行规约(Reduing步骤),以得到最终结果。最终结果一般也存放在文件系统上(如Final result步骤)。

MapReduce非常适合在大量计算机组成的分布式并行环境里进行数据处理。MapReduce框架负责任务的调度和监控,并重新执行这些失败的任务。图2-3显示了一个MapReduce的使用场景。

图2-3 MapReduce进程示例

· JobTracker:这是主节点,只有一个,它管理所有作业/任务的监控、错误处理等;它将任务分解成一系列子任务(Map任务、Reduce任务、Shuffle操作),并分派给TaskTracker。

· TaskTracker:这是从节点,可以有多个,它们接收来自JobTracker的Map Task、Reduce Task和Shuffle操作,并执行之;它们与JobTracker交互,汇报任务状态。

· Map Task:解析每条数据记录,传递给用户编写的map()函数并执行,将输出结果写入本地磁盘(如果为map-only作业,直接写入HDFS)。

· Reduce Task:从Map Task的执行结果中,对数据进行排序,将数据按照分组传递给用户编写的reduce()函数执行。

我们以一个实际的案例来说明MapReduce处理流程。如果要统计一下过去70年人民日报出现最多的几个词(未必是一个,可能有好几个词出现的次数一样多),那怎么使用MapReduce处理呢?我们首先想到的是,可以写一个程序,把所有人民日报按顺序遍历一遍,统计每一个遇到的词的出现次数,最后就可以知道哪几个词最热门了。但是,因为人民日报的数量很大,这个方法肯定耗时不少。既然MapReduce的本质就是把作业交给多个计算机去完成。那么,我们可以使用上述的程序,部署到N台机器上去,然后把70年的报纸分成N份,一台机器跑一个作业,然后把N个运行结果进行整合。MapReduce本质上就是如此,但是如何拆分70年的报纸文件,如何部署程序到N台机器上,如何整合结果,这都是MapReduce框架定义好的。我们只要定义好这个Map和Reduce任务,其他都交给MapReduce。

下面我们使用MapReduce伪代码来说明如何实现map()和reduce()这两个函数。map()函数和reduce()函数是需要我们自己实现的,这两个函数定义了Map任务和Reduce任务。MapReduce计算框架中输入和输出的基本数据结构是“键-值”对(再次提醒读者的是,现在很少直接使用MapReduce框架来编写程序了,而是使用基于MapReduce框架的工具来编写,或者直接用Spark等工具来编写。下面的map()函数和reduce()函数的伪代码是帮助读者来理解整个MapReduce框架的思想)。

1.map()函数

接受一个“键-值”对,产生一组中间“键-值”对。

在上面的代码中,map()函数接受的“键”是文件名(假定文件名是日期,如:20160601则表明是2016年6月1日的人民日报电子版文件),“值”是文件的内容,map()函数逐个遍历词语,每遇到一个词w,就产生一个“键-值”对<w, "1">,表示这个w词出现了一次。

2.reduce()函数

接受一个“键”(一个词),以及相关的一组“值”(这一组值是所有Map对于这个词计算出来的频数的一个集合),整个输入数据也是一个“键-值”对。将这组值进行合并产生一组规模更小的值(通常只有一个或零个值)。在我们这个例子中,是将值集合中的频数进行求和,然后记录每个词及其出现的总频数。

MapReduce将“键”相同(都是词w)的“键-值”对传给reduce()函数,这样reduce()函数接受的“键”就是单词w,“值”是字符串“1”的列表(“键”为w的“键-值”对的个数),然后将这些“1”累加就得到单词w出现的次数,最后把结果存储在HDFS上。

MapReduce支持C/C++、Java、Ruby、Perl和Python编程语言。开发人员可以使用MapReduce库来创建任务。至于节点之间的通信和协调,输入数据集的切割,在不同机器之间的程序执行调度,处理错误等,这些都由框架来完成,开发人员无需处理。map()和reduce()函数会自动在多个服务器节点上自动并行执行。即使开发人员完全没有并行和分布式系统的经验和知识,也能轻松地利用大型分布式系统的资源。MapReduce革新了海量数据计算的方式,为运行在成百上千台机器上的并行程序提供了简单的编程模型。MapReduce几乎可以做到线性扩展:随着数据量的增加,可以通过增加更多的计算机来保持作业时间不变。MapReduce容错性强,它将工作拆分成多个小任务后,能很好地处理任务失败。上面例子的完整代码如下:

2.1.3 YARN(集群资源管理器)

从Hadoop 2开始,MapReduce被一个改进的版本所替代,这个新版本叫作MapReduce 2.0 (MRv2)或YARN(Yet Another Resource Negotiator,中文为“另一种资源协调者”)。YARN是一种新的Hadoop资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。我们先来回顾一下老版本的MapReduce的流程和设计思路。从图2-3可以看出:

(1)首先客户端应用提交了一个作业(Job),作业的信息会发送到Job Tracker(作业追踪器)中,Job Tracker是MapReduce框架的中心,它与集群中的机器定时通信(通过心跳机制),确定哪些程序在哪些机器上执行,管理所有作业失败、重启等操作。

(2)TaskTracker(任务追踪器)在MapReduce集群中每台机器都有,主要是监视所在机器的资源情况。

(3)TaskTracker同时监视当前机器上的任务(Task)的运行状况。TaskTracker把这些信息通过心跳发送给JobTracker,JobTracker会搜集这些信息,以便确定新提交的作业运行在哪些机器上。

MapReduce架构简单明了,在最初推出的几年中就收获了众多的成功案例,得到业界广泛的支持和肯定,但随着分布式系统集群的规模和其工作负荷的增长,原框架的问题逐渐浮出水面,主要的问题集中如下:

(1)JobTracker是MapReduce的集中处理点,存在单点故障。

(2)JobTracker承担了太多的任务,造成了过多的资源消耗,当作业非常多的时候,会造成很大的内存开销,也增加了JobTracker崩溃的风险。业界的共识是老版本的MapReduce的上限只能支持4000个节点主机。

(3)在TaskTracker端,只以Map/Reduce task的数目作为资源的表示过于简单,没有考虑到CPU和内存的占用情况,如果两个需要消耗大量内存的任务被调度到了一块,就很容易出现Java的OOM(即内存耗尽)。

(4)在TaskTracker端,把资源强制划分为map task slot和reduce task slot。当系统中只有map task或者只有reduce task的时候,这会造成资源的浪费,也就是前面提到的集群资源利用的问题。

YARN最初是为了修复MapReduce实现里的明显不足,并对可伸缩性(支持一万个节点和二十万个内核的集群)、可靠性和集群利用率进行了提升。YARN把Job Tracker的两个主要功能(资源管理和作业调度/监控)分成了两个独立的服务程序——全局的资源管理(Resource Manager,简称为RM)和针对每个应用的App Master(AM),这里说的应用要么是传统意义上的MapReduce任务,要么是任务的有向无环图(DAG)。Resource Manager和每一台机器的节点管理服务器(Node Manager)能够管理用户在那台机器上的进程并能对计算进行组织。其架构图如图2-4所示。

图2-4 YARN架构

资源管理器(Resource Manager)支持分层级的应用队列,这些队列享有集群一定比例的资源。它是一个调度器,基于应用程序对资源的需求进行调度的。每一个应用程序需要不同类型的资源,因此就需要不同的容器(Container)。资源包括:内存、CPU、磁盘、网络,等等。可以看出,这同老版本MapReduce的固定类型的资源使用模型有显著区别。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。

图2-4中节点管理器(Node Manager)是每一台机器的代理,是执行应用程序的容器,它监控应用程序的资源使用情况(CPU、内存、硬盘、网络)并且向资源管理器汇报。每一个应用的应用程序主控器(Application Master)是一个框架库,它结合从资源管理器获得的资源和节点管理器协同工作来运行和监控任务。每一个应用的主控器向资源管理器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败。

资源管理器是一个中心的服务,它是调度和启动每一个作业(Job)所属的应用服务器,另外监控应用程序主控器的存在情况。资源管理器负责作业与资源的调度,接收Job Submitter提交的作业,按照作业的上下文(Context)信息,以及从节点管理器收集来的状态信息,启动调度过程,分配一个容器(Container)。节点管理器功能比较专一,就是负责容器状态的维护,并向资源管理器保持心跳。应用程序主控器负责一个作业(Job)生命周期内的所有工作。但注意每一个作业(不是每一种)都有一个应用程序主控器,它可以运行在资源管理器以外的机器上。而资源管理器中有一个模块叫作ApplicationsMasters(注意不是ApplicationMaster),它用于监测应用程序主控器的运行状况,如果出问题,就会在其他机器上重启。容器(Container)是YARN为了将来作资源隔离而提出的一个框架。这一点应该借鉴了Mesos的工作,虽然容器目前是一个框架,仅仅提供了Java虚拟机内存的隔离,但是未来可能会支持更多的资源调度和控制。

总之,YARN从某种意义上来说应该算是一个云操作系统,它负责集群的资源管理。在操作系统之上可以开发各类的应用程序。这些应用可以同时利用Hadoop集群的计算能力和丰富的数据存储模型,共享同一个Hadoop集群和驻留在集群上的数据。