3.5 资源管理器ResourceManager
如前所述,在一个Hadoop集群中,有且只有一个实际有效的“资源管理者”,即ResourceManager类的对象,起着Hadoop平台核心,其实是YARN框架核心的作用,管理着整个平台(集群)的计算资源,可以说相当于“中央政府”。对于宿主机而言,执行着ResourceManager的Java虚拟机是个独立的进程。
除ResourceManager(缩写为RM)这个“主节点”以外,集群中凡启动了YARN子系统的所有节点都是NodeManager(缩写为NM)节点,都是“从节点”,唯一例外的是后面会讲到的备份RM,或者说不活跃的RM。RM与NM之间是主从关系,相当于中央与地方的关系。注意,这里说的是“主从”而不是“主次”,更不是主副,因为主次之间不一定是从属的关系、领导与被领导的关系。
用户每向这个Hadoop平台提交一个应用程序作业,即App,资源管理器就会设法在某个NM节点上为其另起一个Java虚拟机以运行App管理者(AppMaster, AM)。YARN的AM不止一种,什么样的App就用什么样的AM。对于MapReduce应用,那就是MRAppMaster。MRAppMaster在独立的Java虚拟机上运行,与当地的NodeManager不在同一个Java虚拟机上。
可想而知,ResourceManager类的体积一定不小,我们得要分层次做它的摘要。当然,既然是摘要就不会是全面的。
class ResourceManager extends CompositeService implements Recoverable {} ]List<Service> serviceList=new ArrayList<Service>() //这是在CompositeService中 ]Dispatcher rmDispatcher //这是一个AsyncDispatcher ]AdminService adminService ]RMActiveServices activeServices //见后
]ResourceScheduler scheduler //资源调度器,有三种选择 ]ClientRMService clientRM ]ApplicationMasterService masterService //为现有的AM提供服务和管理 ]NodesListManager nodesListManager ]RMAppManager rmAppManager //管理已提交而尚未完成的App ]WebApp webApp //提供作为webApp的网站服务 ]AppReportFetcher fetcher ]ResourceTrackerService resourceTracker ]String webAppAddress ]main(String argv[]) > resourceManager=new ResourceManager() > resourceManager.init(conf)//这会调用super.init(),然后回过来调用这里的serviceInit() > resourceManager.start() ]serviceInit(Configuration conf) > doSecureLogin() //如果启用了安全机制,用户就得登录,此后便用实际用户的身份 > rmDispatcher=setupDispatcher()//创建一个Dispatcher并将其设置成RM的Dispatcher > adminService=createAdminService() //创建系统管理服务 > createAndInitActiveServices() //创建许多的 service并放进上面的 serviceList中 > super.serviceInit(this.conf) ]serviceStart() > rmStore.start() > super.serviceStart() ]class RMActiveServices ]class ApplicationEventDispatcher implements EventHandler<RMAppEvent> {} ]class RMContainerPreemptEventDispatcher{} ]class ApplicationAttemptEventDispatcher implements EventHandler<…> {} ]class NodeEventDispatcher implements EventHandler<RMNodeEvent> {} ]class SchedulerEventDispatcher{} ]]class EventProcessor implements Runnable {} ]]handle(SchedulerEvent event)
在这份摘要里,除webAppAddress以外,在main()之前的都是ResourceManager内部第一层的结构成分,相当于中央一级的部门,其中有些是特别重要的,像rmDispatcher、scheduler等都是。
ResourceManager对象带有main()函数,说明一般而言这个类的对象都是独占一台JVM的,同一台JVM上的其余对象都由它直接或间接创建。注意,像这一类的对象是JVM装载并调用其main()在先,此时尚未创建具体的对象,对象的创建一般是由main()完成的。
RM对象的初始化由serviceInit()完成。Hadoop上的安全机制启用与否是可配置的,如果启用了安全机制,用户此时就得登录,登录以后便使用实际用户的身份;否则就直接使用启动该JVM的用户,通常是特权用户的身份了。RM并没有状态机,但也需要有个通用的Dispatcher,因为在运行过程中它也会产生事件而需要分发。当然,想要采用这个通用Dispatcher就得向其登记,告知其什么样的事件应该发送给谁。细看一下setupDispatcher()的代码,就可发现RM所用的其实也是AsyncDispatcher。下面的createAdminService()所创建的是个AdminService,即管理服务对象,这与系统的容错机制也有关系,但是现在我们还不关心这个。
再下面的createAndInitActiveServices()就厉害了,它创建一个RMActiveServices类对象,在这个对象的初始化过程中为RM创建了许多“活跃”服务。前面讲过,一个集群中只有一个有效的,也即活跃的资源管理者,但是这并不意味着不可以有后备的、不活跃的ResourceManager,二者的区别就在于活跃的RM上有着一些活跃服务,从而能提供并被允许提供这些服务。而RMActiveServices,就是这些活跃服务的管理者。那么有些什么样的活跃服务呢?我们可以看一下RMActiveServices的摘要,特别是其serviceInit()方法的摘要:
class ResourceManager{} ]class RMActiveServices{} ]]EventHandler<SchedulerEvent> schedulerDispatcher ]]ApplicationMasterLauncher applicationMasterLauncher ]]serviceInit() //注意这是RMActiveServices的 serviceInit() > containerAllocationExpirer=new ContainerAllocationExpirer(rmDispatcher); > addService(containerAllocationExpirer); > rmContext.setContainerAllocationExpirer(containerAllocationExpirer) >//为减小篇幅,以下各项均略去addService()和对rmContext的设置 > amLivelinessMonitor=createAMLivelinessMonitor() //用来监视各AM是否存活 >> new AMLivelinessMonitor(this.rmDispatcher) > amFinishingMonitor=createAMLivelinessMonitor()//用来监视各AM是否已完成使命 > nlm=createNodeLabelManager() //创建RMNodeLabelsManager > rmStore=RMStateStoreFactory.getStore(conf) //创建RMStateStore,以保存各种副本 > rmStore.setRMDispatcher(rmDispatcher) > rmApplicationHistoryWriter=createRMApplicationHistoryWriter()//用于运行日志Log > systemMetricsPublisher=createSystemMetricsPublisher() //用来发布统计信息 > nodesListManager=new NodesListManager(rmContext) //用来管理NM节点状态清单 > scheduler=createScheduler() //创建资源调度器 >> schedulerClazz=Class.forName(schedulerClassName)//从conf文件获取调度器类名 >> ReflectionUtils.newInstance(schedulerClazz, this.conf) //创建调度器 > schedulerDispatcher=createSchedulerEventDispatcher() //创建调度器专用的Dispatcher >> new SchedulerEventDispatcher(this.scheduler) >//创建三种Dispatch目标对象(接受来自RM的事件),并向RM的Dispatcher登记 > rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(rmContext)) > rmDispatcher.register(RMAppAttemptEventType.class, new ApplicationAttemptEventDispatcher(rmContext)) > rmDispatcher.register(RMNodeEventType.class,
new NodeEventDispatcher(rmContext)) > nmLivelinessMonitor=createNMLivelinessMonitor()//用来监视NodeManager的存活 > resourceTracker=createResourceTrackerService() //用来跟踪资源的使用 > JvmMetrics.initSingleton("ResourceManager", null) //保证统计信息中只有一个R > reservationSystem=createReservationSystem() //如有要求就创建资源预订机制 >> reservationClazz=Class.forName(reservationClassName) >> ReflectionUtils.newInstance(reservationClazz, this.conf) > masterService=createApplicationMasterService() //用来为NM节点上的AM提供服务 >> new ApplicationMasterService(this.rmContext, scheduler) > rmAppManager=createRMAppManager() //用来管理AM >> new RMAppManager(this.rmContext, …) > clientRM=createClientRMService() //用来为客户提供资源服务 > applicationMasterLauncher=createAMLauncher() //用来在NM节点上启动运行AM >> new ApplicationMasterLauncher(this.rmContext) ]]serviceStart()
可见RM上“活跃服务”的种类真是不少,用来提供这些服务的对象也真不少。创建了每一个这样的“服务”以后,都要通过addService()把它加入到RM的服务列表serviceList中,这里为了减小摘要的篇幅就把这些操作略去了。
这里需要说明一下Dispatcher。RM有个顶层的rmDispatcher,用来帮助向ApplicationEventDispatcher、ApplicationAttemptEventDispatcher、NodeEventDispatcher等对象分发事件。但是这三者的命名容易令人误解,因为它们其实并非Dispatcher,而是Dispatch的目标;它们实现的是EventHandler界面,而不是Dispatcher界面。这里在RMActiveServices中又有个schedulerDispatcher,这显然与调度器有关,其类型是SchedulerEventDispatcher。但是它实现的也是EventHandler界面,而不是Dispatcher界面;从形式上看,它是用来处理SchedulerEvent类事件的,而不是帮助分发事件的路由器。
不过称它们为Dispatcher也并非全无道理,因为它们其实起着中介的作用,对于rmDispatcher来说,它们确实是事件的处理者而不是分发者,所实现的界面也是EventHandler而不是Dispatcher。但是,对于站在它们身后的真正意义上的事件处理者而言,它们却相当于事件的分发者,相当于Dispatcher。
这里的ApplicationEventDispatcher和ApplicationAttemptEventDispatcher,前者有关Application,用来驱动代表着具体App的ApplicationImpl对象中的状态机;后者有关ApplicationAttempt,用来驱动代表着一次具体运行尝试的RMAppAttemptImpl对象中的状态机。二者的区别在于:前者是宏观层面的,代表着一个App;而后者则代表运行该App的一次具体尝试,相对而言是微观层面的。如果App的一次ApplicationAttempt失败,AM可以要求再来一次,要到屡战屡败之后才能宣告Application的失败。
至于NodeEventDispatcher,则与ResourceTrackerService和RMNodeImpl有关。RM为NodeEventDispatcher登记要接收的事件类型是RMNodeEventType,这种事件主要来自ResourceTrackerService,反映着集群中各节点的状态变化,例如失去连接然后又恢复了,或者机器重启了,等等。NodeEventDispatcher用这些事件驱动相应的RMNodeImpl对象中与此节点相对应的状态机。NodeEventDispatcher的handle()函数说明了这一点:
[ResourceManager.NodeEventDispatcher.handle()] public void handle(RMNodeEvent event){ NodeId nodeId=event.getNodeId(); RMNode node=this.rmContext.getRMNodes().get(nodeId); //RMNode是界面,RMNodeImpl 实现了RMNode和EventHandler两个界面 if (node! =null){ try{ ((EventHandler<RMNodeEvent>)node).handle(event); //操作RMNodeImpl 的状态机,调用其doTransition() }catch(Throwable t){ LOG.error("Error in handling event type"+event.getType()+"for node"+nodeId, t); } } }
对于集群中的每一个节点,RM维持着一个与之对应的RMNodeImpl对象,在这个对象中有一个状态机,代表着该节点的当前状态。当然,这样的RMNodeImpl对象有很多,而NodeEventDispatcher就是这些状态机的驱动者。它根据到来的事件event内容中的NodeId确定这是要分发到哪一个节点的状态机,然后找到代表着那个节点的RMNode对象node,即RMNodeImpl,就以event为参数直接调用其handle()函数。从这个意义上说,称之为Dispatcher确实也有道理,并且这是个同步的Dispatcher。
回到前面对ResourceManager数据部分的摘要,其实还有更多,下面先择要简单介绍几个,其余的会在后面结合具体的情景加以介绍,这里就不一一列举介绍了。