大数据处理系统:Hadoop源代码情景分析
上QQ阅读APP看书,第一时间看更新

3.5 资源管理器ResourceManager

如前所述,在一个Hadoop集群中,有且只有一个实际有效的“资源管理者”,即ResourceManager类的对象,起着Hadoop平台核心,其实是YARN框架核心的作用,管理着整个平台(集群)的计算资源,可以说相当于“中央政府”。对于宿主机而言,执行着ResourceManager的Java虚拟机是个独立的进程。

除ResourceManager(缩写为RM)这个“主节点”以外,集群中凡启动了YARN子系统的所有节点都是NodeManager(缩写为NM)节点,都是“从节点”,唯一例外的是后面会讲到的备份RM,或者说不活跃的RM。RM与NM之间是主从关系,相当于中央与地方的关系。注意,这里说的是“主从”而不是“主次”,更不是主副,因为主次之间不一定是从属的关系、领导与被领导的关系。

用户每向这个Hadoop平台提交一个应用程序作业,即App,资源管理器就会设法在某个NM节点上为其另起一个Java虚拟机以运行App管理者(AppMaster, AM)。YARN的AM不止一种,什么样的App就用什么样的AM。对于MapReduce应用,那就是MRAppMaster。MRAppMaster在独立的Java虚拟机上运行,与当地的NodeManager不在同一个Java虚拟机上。

可想而知,ResourceManager类的体积一定不小,我们得要分层次做它的摘要。当然,既然是摘要就不会是全面的。

      class ResourceManager extends CompositeService implements Recoverable {}
      ]List<Service> serviceList=new ArrayList<Service>() //这是在CompositeService
      ]Dispatcher rmDispatcher         //这是一个AsyncDispatcher
      ]AdminService adminService
      ]RMActiveServices activeServices   //见后
      ]ResourceScheduler scheduler       //资源调度器,有三种选择
      ]ClientRMService clientRM
      ]ApplicationMasterService masterService //为现有的AM提供服务和管理
      ]NodesListManager nodesListManager
      ]RMAppManager rmAppManager        //管理已提交而尚未完成的App
      ]WebApp webApp                   //提供作为webApp的网站服务
      ]AppReportFetcher fetcher
      ]ResourceTrackerService resourceTracker
      ]String webAppAddress
      ]main(String argv[])
        > resourceManager=new ResourceManager()
        > resourceManager.init(conf)//这会调用super.init(),然后回过来调用这里的serviceInit()
        > resourceManager.start()
      ]serviceInit(Configuration conf)
        > doSecureLogin() //如果启用了安全机制,用户就得登录,此后便用实际用户的身份
        > rmDispatcher=setupDispatcher()//创建一个Dispatcher并将其设置成RMDispatcher
        > adminService=createAdminService() //创建系统管理服务createAndInitActiveServices() //创建许多的 service并放进上面的 serviceList
        > super.serviceInit(this.conf)
      ]serviceStart()
        > rmStore.start()
        > super.serviceStart()
      ]class RMActiveServices
      ]class ApplicationEventDispatcher implements EventHandler<RMAppEvent> {}
      ]class RMContainerPreemptEventDispatcher{}
      ]class ApplicationAttemptEventDispatcher implements EventHandler<…> {}
      ]class NodeEventDispatcher implements EventHandler<RMNodeEvent> {}
      ]class SchedulerEventDispatcher{}
      ]]class EventProcessor implements Runnable {}
      ]]handle(SchedulerEvent event)

在这份摘要里,除webAppAddress以外,在main()之前的都是ResourceManager内部第一层的结构成分,相当于中央一级的部门,其中有些是特别重要的,像rmDispatcher、scheduler等都是。

ResourceManager对象带有main()函数,说明一般而言这个类的对象都是独占一台JVM的,同一台JVM上的其余对象都由它直接或间接创建。注意,像这一类的对象是JVM装载并调用其main()在先,此时尚未创建具体的对象,对象的创建一般是由main()完成的。

RM对象的初始化由serviceInit()完成。Hadoop上的安全机制启用与否是可配置的,如果启用了安全机制,用户此时就得登录,登录以后便使用实际用户的身份;否则就直接使用启动该JVM的用户,通常是特权用户的身份了。RM并没有状态机,但也需要有个通用的Dispatcher,因为在运行过程中它也会产生事件而需要分发。当然,想要采用这个通用Dispatcher就得向其登记,告知其什么样的事件应该发送给谁。细看一下setupDispatcher()的代码,就可发现RM所用的其实也是AsyncDispatcher。下面的createAdminService()所创建的是个AdminService,即管理服务对象,这与系统的容错机制也有关系,但是现在我们还不关心这个。

再下面的createAndInitActiveServices()就厉害了,它创建一个RMActiveServices类对象,在这个对象的初始化过程中为RM创建了许多“活跃”服务。前面讲过,一个集群中只有一个有效的,也即活跃的资源管理者,但是这并不意味着不可以有后备的、不活跃的ResourceManager,二者的区别就在于活跃的RM上有着一些活跃服务,从而能提供并被允许提供这些服务。而RMActiveServices,就是这些活跃服务的管理者。那么有些什么样的活跃服务呢?我们可以看一下RMActiveServices的摘要,特别是其serviceInit()方法的摘要:

      class ResourceManager{}
      ]class RMActiveServices{}
      ]]EventHandler<SchedulerEvent> schedulerDispatcher
      ]]ApplicationMasterLauncher applicationMasterLauncher
      ]]serviceInit() //注意这是RMActiveServices serviceInit()
          > containerAllocationExpirer=new ContainerAllocationExpirer(rmDispatcher);
          > addService(containerAllocationExpirer);
          > rmContext.setContainerAllocationExpirer(containerAllocationExpirer)
          >//为减小篇幅,以下各项均略去addService()和对rmContext的设置
          > amLivelinessMonitor=createAMLivelinessMonitor() //用来监视各AM是否存活
          >> new AMLivelinessMonitor(this.rmDispatcher)
          > amFinishingMonitor=createAMLivelinessMonitor()//用来监视各AM是否已完成使命
          > nlm=createNodeLabelManager() //创建RMNodeLabelsManager
          > rmStore=RMStateStoreFactory.getStore(conf) //创建RMStateStore,以保存各种副本
          > rmStore.setRMDispatcher(rmDispatcher)
          > rmApplicationHistoryWriter=createRMApplicationHistoryWriter()//用于运行日志Log
          > systemMetricsPublisher=createSystemMetricsPublisher()    //用来发布统计信息
          > nodesListManager=new NodesListManager(rmContext) //用来管理NM节点状态清单
          > scheduler=createScheduler() //创建资源调度器
          >> schedulerClazz=Class.forName(schedulerClassName)//conf文件获取调度器类名
          >> ReflectionUtils.newInstance(schedulerClazz, this.conf) //创建调度器
          > schedulerDispatcher=createSchedulerEventDispatcher()
                                                        //创建调度器专用的Dispatcher
          >> new SchedulerEventDispatcher(this.scheduler)
          >//创建三种Dispatch目标对象(接受来自RM的事件),并向RMDispatcher登记
          > rmDispatcher.register(RMAppEventType.class,
                                    new ApplicationEventDispatcher(rmContext))
          > rmDispatcher.register(RMAppAttemptEventType.class,
                                    new ApplicationAttemptEventDispatcher(rmContext))
          > rmDispatcher.register(RMNodeEventType.class,
                                    new NodeEventDispatcher(rmContext))
          > nmLivelinessMonitor=createNMLivelinessMonitor()//用来监视NodeManager的存活
          > resourceTracker=createResourceTrackerService() //用来跟踪资源的使用
          > JvmMetrics.initSingleton("ResourceManager", null) //保证统计信息中只有一个R
          > reservationSystem=createReservationSystem()  //如有要求就创建资源预订机制
          >> reservationClazz=Class.forName(reservationClassName)
          >> ReflectionUtils.newInstance(reservationClazz, this.conf)
          > masterService=createApplicationMasterService() //用来为NM节点上的AM提供服务
          >> new ApplicationMasterService(this.rmContext, scheduler)
          > rmAppManager=createRMAppManager() //用来管理AM
          >> new RMAppManager(this.rmContext, …)
          > clientRM=createClientRMService()     //用来为客户提供资源服务
          > applicationMasterLauncher=createAMLauncher() //用来在NM节点上启动运行AM
          >> new ApplicationMasterLauncher(this.rmContext)
      ]]serviceStart()

可见RM上“活跃服务”的种类真是不少,用来提供这些服务的对象也真不少。创建了每一个这样的“服务”以后,都要通过addService()把它加入到RM的服务列表serviceList中,这里为了减小摘要的篇幅就把这些操作略去了。

这里需要说明一下Dispatcher。RM有个顶层的rmDispatcher,用来帮助向ApplicationEventDispatcher、ApplicationAttemptEventDispatcher、NodeEventDispatcher等对象分发事件。但是这三者的命名容易令人误解,因为它们其实并非Dispatcher,而是Dispatch的目标;它们实现的是EventHandler界面,而不是Dispatcher界面。这里在RMActiveServices中又有个schedulerDispatcher,这显然与调度器有关,其类型是SchedulerEventDispatcher。但是它实现的也是EventHandler界面,而不是Dispatcher界面;从形式上看,它是用来处理SchedulerEvent类事件的,而不是帮助分发事件的路由器。

不过称它们为Dispatcher也并非全无道理,因为它们其实起着中介的作用,对于rmDispatcher来说,它们确实是事件的处理者而不是分发者,所实现的界面也是EventHandler而不是Dispatcher。但是,对于站在它们身后的真正意义上的事件处理者而言,它们却相当于事件的分发者,相当于Dispatcher。

这里的ApplicationEventDispatcher和ApplicationAttemptEventDispatcher,前者有关Application,用来驱动代表着具体App的ApplicationImpl对象中的状态机;后者有关ApplicationAttempt,用来驱动代表着一次具体运行尝试的RMAppAttemptImpl对象中的状态机。二者的区别在于:前者是宏观层面的,代表着一个App;而后者则代表运行该App的一次具体尝试,相对而言是微观层面的。如果App的一次ApplicationAttempt失败,AM可以要求再来一次,要到屡战屡败之后才能宣告Application的失败。

至于NodeEventDispatcher,则与ResourceTrackerService和RMNodeImpl有关。RM为NodeEventDispatcher登记要接收的事件类型是RMNodeEventType,这种事件主要来自ResourceTrackerService,反映着集群中各节点的状态变化,例如失去连接然后又恢复了,或者机器重启了,等等。NodeEventDispatcher用这些事件驱动相应的RMNodeImpl对象中与此节点相对应的状态机。NodeEventDispatcher的handle()函数说明了这一点:

      [ResourceManager.NodeEventDispatcher.handle()]


          public void handle(RMNodeEvent event){
            NodeId nodeId=event.getNodeId();
            RMNode node=this.rmContext.getRMNodes().get(nodeId);
                        //RMNode是界面,RMNodeImpl 实现了RMNodeEventHandler两个界面
            if (node! =null){
              try{
                ((EventHandler<RMNodeEvent>)node).handle(event);
                                      //操作RMNodeImpl 的状态机,调用其doTransition()
              }catch(Throwable t){
                LOG.error("Error in handling event type"+event.getType()+"for node"+nodeId, t);
              }
            }
          }

对于集群中的每一个节点,RM维持着一个与之对应的RMNodeImpl对象,在这个对象中有一个状态机,代表着该节点的当前状态。当然,这样的RMNodeImpl对象有很多,而NodeEventDispatcher就是这些状态机的驱动者。它根据到来的事件event内容中的NodeId确定这是要分发到哪一个节点的状态机,然后找到代表着那个节点的RMNode对象node,即RMNodeImpl,就以event为参数直接调用其handle()函数。从这个意义上说,称之为Dispatcher确实也有道理,并且这是个同步的Dispatcher。

回到前面对ResourceManager数据部分的摘要,其实还有更多,下面先择要简单介绍几个,其余的会在后面结合具体的情景加以介绍,这里就不一一列举介绍了。