第6章 作业的调度与指派
6.1 作业的受理
我们在上一章的结尾处看到,当一个作业,也就是一个App,通过RPC被提交到RM节点时,“上岸”后经由ClientRMService被交到了RMAppManager对象的手里。RM节点上的ClientRMService对象相当于接待站,而RMAppManager对象则专门管理与应用/作业的申请和运行有关的事务。两个对象均由ResourceManager创建,都在同一个JVM上。
ClientRMService是通过调用rmAppManager.submitApplication()把作业申请交到RMAppManager手里的,我们就从这里继续往下看。
[RPC > ApplicationClientProtocolPBServiceImpl.submitApplication()> ClientRMService.submitApplication()> RMAppManager.submitApplication()] RMAppManager.submitApplication(ApplicationSubmissionContext submissionContext, long submitTime, String user)throws YarnException { ApplicationId applicationId=submissionContext.getApplicationId(); RMAppImpl application= createAndPopulateNewRMApp(submissionContext, submitTime, user); ApplicationId appId=submissionContext.getApplicationId(); if (UserGroupInformation.isSecurityEnabled()){ //如果开启了安全机制 try{ this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId, parseCredentials(submissionContext), submissionContext.getCancelTokensWhenComplete(), application.getUser()); }catch(Exception e){ //异常,addApplicationAsync()失败 LOG.warn("Unable to parse credentials.", e); //Sending APP_REJECTED is fine, since we assume that the //RMApp is in NEW state and thus we haven't yet informed the //scheduler about the existence of the application
assert application.getState()==RMAppState.NEW; this.rmContext.getDispatcher().getEventHandler() .handle(new RMAppRejectedEvent(applicationId, e.getMessage())); throw RPCUtil.getRemoteException(e); } }else { //未开启安全机制 //Dispatcher is not yet started at this time, so these START events //enqueued should be guaranteed to be first processed when dispatcher //gets started this.rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.START)); //从rmContext获取所用的Dispatcher,进而获取其EventHandler //将START 事件交给它处理。后者会用其触发RMAppImpl对象的状态机 } }
首先是createAndPopulateNewRMApp()。顾名思义这个函数做的是两件事,即为所提交的作业创建一个RMApp对象,并Populate。不过RMApp只是个interface,实际创建的是个RMAppImpl对象。这是根据所提交的submissionContext,即前一章中所讲的ASC和用户名创建的,代表着具体的App,也包含着有关该App的全部信息。至于Populate,则是说把所创建的RMAppImpl对象连同其ApplicationId放到一个类似于名单一样的便查表中,即MAP<ApplicationId, RMApp>中,这样以后凭ID就可以从中找到它的RMAppImpl对象。
[RPC > ApplicationClientProtocolPBServiceImpl.submitApplication()> ClientRMService.submitApplication()> RMAppManager.submitApplication()> createAndPopulateNewRMApp()] createAndPopulateNewRMApp(ApplicationSubmissionContext submissionContext, long submitTime, String user) > ApplicationId applicationId=submissionContext.getApplicationId() > ResourceRequest amReq=validateAndCreateResourceRequest(submissionContext) //检验资源请求是否合理,不合理就发 InvalidResourceRequestException异常 > RMAppImpl application=new RMAppImpl(applicationId, rmContext, this.conf, submissionContext.getApplicationName(), user, submissionContext.getQueue(), submissionContext, this.scheduler, this.masterService, submitTime, submissionContext.getApplicationType(), //默认为“YARN” submissionContext.getApplicationTags(), amReq) > rmContext.getRMApps().putIfAbsent(applicationId, application) //将App加入RMContextImpl 内部的一个MAP //以后凭applicationId就可找到其RMAppImpl 对象
> this.applicationACLsManager.addApplication(applicationId, submissionContext.getAMContainerSpec().getApplicationACLs()) //将App加入到访问控制名单ACL管理器中 > String appViewACLs=submissionContext.getAMContainerSpec() .getApplicationACLs().get(ApplicationAccessType.VIEW_APP) > rmContext.getSystemMetricsPublisher().appACLsUpdated(application, appViewACLs, System.currentTimeMillis())//与统计有关 > return application
创建RMAppImpl时所传递的参数主要来自用户,特别是来自其提交的ApplicationSubmissionContext,但也有些信息是由RM节点上的RMAppManager提供的,如rmContext、this.conf、this.scheduler和this.masterService。其中this.scheduler决定了将采用哪一种调度器;this.masterService则是个ApplicationMasterService对象,是RM节点上专门管理那些为具体应用而建立的ApplicationMaster的。如前所述,ApplicationMaster类似于“项目组长”。下面就要为当前的这个App找个节点在上面创建ApplicationMaster了,创建后它就得向ApplicationMasterService登记报到。
回到前面RMAppManager.submitApplication()的代码。
Hadoop实现了基于身份认证的安全机制,但是用户可以启用也可以不启用这种安全机制。启用了安全机制后的流程当然要稍为复杂一些,但是由此而来的复杂性并不影响Hadoop与YARN的主流。在这里,如果一头钻进安全机制不免会偏离我们的大方向,会冲淡我们对主流的分析理解,所以我们现在走不开启安全机制的这条路。
作业的提交引起一个RMAppEventType.START事件,这就涉及我们在前面介绍过的状态机了。如前所述,用户每向RM提交一个作业,RMAppManager就为其创建一个RMAppImpl对象,作为该作业在RM中的代表。可想而知,随着作业的进展,其RMAppImpl对象将处于各种不同的状态。所以,每个RMAppImpl对象,或者说每个RMApp,都有个自己的状态机。所有RMAppImpl对象的状态机都是一样的,只不过各自处于不同的状态;所以stateMachineFactory是RMAppImpl类的静态成分。但是,具体的状态机stateMachine则是在RMAppImpl对象的构造函数中创建的。创建后的初始状态为RMAppState.NEW。而事件RMAppEventType.START就被用来触发该App的状态机。
不过,这里也简单提一下启用安全机制时的流程。代码中的getDelegationTokenRenewer()返回一个DelegationTokenRenewer对象,这个对象提供一个方法addApplicationAsync(),这显然就是异步增加一个App的意思。这个方法将App连同用户信息及其各种证件一起挂入DelegationTokenRenewer的队列,然后它的一个DelegationTokenRenewerRunnable线程会来检验队列中这些App的资质。如果检验合格,就同样会发起一次RMAppEventType.START事件,此后就跟不开启安全机制时的流程一样了。
这里创建的事件是RMAppEventType.START,并通过Dispatcher把这个事件派送给RMApp的状态机。前面讲状态机时说过,Dispatcher就好像是按事件类型导向的路由器,想要让某个受主接受某类事件,就得向Dispatcher登记,这就好比在路由器中配置一条路由。实际情况也正是如此。ResourceManager在其serviceInit()中向其内部的rmDispatcher登记,将RMAppEventType类的事件绑定到一个ApplicationEventDispatcher类对象,并将后者记录在rmContext中。注意,后者虽然名为ApplicationEventDispatcher,但实际上却只是RMApp的EventHandler。所以,上面的this.rmContext.getDispatcher().getEventHandler()所返回的其实是作为事件处理器的ApplicationEventDispatcher对象,然后调用其handle()方法就会触发RMApp状态机的跳变:
[RPC > ApplicationClientProtocolPBServiceImpl.submitApplication()> ClientRMService.submitApplication()> RMAppManager.submitApplication()> ApplicationEventDispatcher.handle()] ApplicationEventDispatcher.handle(RMAppEvent event) > appID=event.getApplicationId() //获取该事件的AppId > rmApp=this.rmContext.getRMApps().get(appID) //根据AppId找到其RMAppImpl 对象 > if (rmApp! =null)rmApp.handle(event)==RMAppImpl.handle(event)//调用其handle()方法 >> appID=event.getApplicationId() >> this.stateMachine.doTransition(event.getType(), event) //触发状态机跳变
这里rmContext是RM的Context,实际上是个RMContextImpl对象。前述的那个MAP就在这个对象内部,所以根据具体应用的AppID可以得到其RMApp,实际上是代表着这个应用的RMAppImpl对象。
看一下RMAppImpl类的代码,就可以找到其状态机的跳变规则,其中以RMAppEventType.START为触发条件的只有一条,那就是:
addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
我们这个应用的RMAppImpl对象是刚刚创建的,它的状态机正处于初始状态RMAppState.NEW,所以这条规则正好适用。注意,这条规则还规定要为状态机提供一个RMAppNewlySavingTransition类的对象,这个类提供了一个transition()函数供状态机调用,作为这个跳变所需的伴随操作,这也是状态机对事件的反应之一。类似于RMAppNewlySavingTransition那样,用在这个地方的类,都实现了SingleArcTransition或MultipleArcTransition界面,所以都提供transition()操作。值得注意的是,名曰transition(),实际上却不是用来实现状态变迁的,而只是用来提供伴随着状态变迁、除状态跳变本身以外的操作。
所以,接着发生的事,是RMAppNewlySavingTransition.transition()得到调用,而这个状态机的当前状态则变成RMAppState.NEW_SAVING。
[RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()]
RMAppNewlySavingTransition.transition(RMAppImpl app, RMAppEvent event){
//If recovery is enabled then store the application information in a
//non-blocking call so make sure that RMhas stored the information
//needed to restart the AMafter RMrestart without further client communication
LOG.info("Storing application with id"+app.applicationId);
app.rmContext.getStateStore().storeNewApplication(app);
}
这里在程序的调用路径中用“=>”表示前者导致了后者,而不是(或不一定是)前者直接调用了后者。这是因为,如果Dispatcher是异步的,那么前者只是把一个事件对象放进一个事件队列,而另有一个线程在处理这些事件对象的派发,这才调用到了后者,后者实际上并非由前者直接调用,而且后者是在另一个线程的上下文中得到执行。
显然,这个函数要做的事情是通过storeNewApplication()把具体应用的信息都保存起来。这是因为,这个应用是否能马上被调度运行还不得而知,也许需要等上很长时间,而且随后对这个应用的处理和执行有可能中途失败,将要为其创建的AM甚至RM都有可能重启,那时候这个App应该要有个副本作为恢复起点,这相当于数据库处理中的checkpoint。我们在前面看到所创建的RMAppImpl对象被放进了RM的app集合,但是那个正本的RMAppImpl对象在随后的处理中是会改变,甚至有可能丢失的。
[RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()> RMStateStore.storeNewApplication()] /** * Non-Blocking API * ResourceManager services use this to store the application's state * This does not block the dispatcher threads * RMAppStoredEvent will be sent on completion to notify the RMApp */ storeNewApplication(RMApp app){ ApplicationSubmissionContext context=app.getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; ApplicationState appState=new ApplicationState(app.getSubmitTime(), app.getStartTime(), context, app.getUser()); dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); }
显然,这里要保存的主体是这个App的ApplicationSubmissionContext,用户所提交的信息其实都在这里面。把它保存起来,就把所提交的作业信息保存了一份副本,所以这里创建的是一个包含着这些信息的ApplicationState对象。此外,这里还以此为参数创建了一个RMStateStoreAppEvent事件,并让Dispatcher把该事件分发给相应的事件处理器。不过,这个事件的类型属于RMStateStoreEvent,而不是前述的RMAppState,所以针对的是不同的事件处理器或状态机。事实上,这一方面固然是为了保存所创建的ApplicationState对象appState,另一方面这也会引起RMStateStore状态机的跳变和反应。这个RMStateStoreAppEvent的具体类型是RMStateStoreEventType.STORE_APP, appState则相当于这个事件的附件。
那保存在哪里呢?RM节点上有个RMStateStore对象,我们可以在ResourceManager的serviceInit()中看到RMStateStore的创建。但RMStateStore是个抽象类,由此扩充派生出来的就有FileSystemRMStateStore、MemoryRMStateStore、NullRMStateStore以及ZKRMStateStore,具体选用哪一种可以在配置文件yarn-default.xml中加以指定。
可想而知,如果采用的是FileSystemRMStateStore,那就要把副本保存在文件系统中,这里面有个过程,而且可能要应对许多不同的情况。所以,其实RMStateStore类自己就有个StateMachineFactory,具体的FileSystemRMStateStore对象则各有自己的状态机。
RMStateStore对STORE_APP事件的反应是在StoreAppTransition.transition()中调用storeApplicationStateInternal()以保存appState中的数据,操作完成以后就发出一个RMAppEventType.APP_NEW_SAVED事件,用来触发RMAppImpl对象的状态机。
在RMAppImpl对象的状态机中,以RMAppEventType.APP_NEW_SAVED为触发条件的跳变规则是:
addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
就是说,如果状态机的当前状态是NEW_SAVING,到来的事件是APP_NEW_SAVED,则状态变成SUBMITTED,伴随的操作是AddApplicationToSchedulerTransition.transition()。状态变成RMAppState.SUBMITTED,就表示提交已经完成。不过状态的变化发生在伴随操作之后,所以先要执行下面这个操作:
[RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()=> AddApplicationToSchedulerTransition.transition()] AddApplicationToSchedulerTransition.transition(RMAppImpl app, RMAppEvent event){ app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user, app.submissionContext.getReservationID())); }
这里创建并作为参数传给app.handler.handle()的AppAddedSchedulerEvent对象把有关该app的一些信息打包在一起,其中applicationId和用户名user不言自明;而getQueue()所返回的队列名表示要把该app请求挂到哪一个资源调度队列中。最后的ReservationID则用于事先已有预订的情况。我们来看一下AppAddedSchedulerEvent对象的构造函数:
class AppAddedSchedulerEvent extends SchedulerEvent { public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, String user, boolean isAppRecovering, ReservationId reservationID){ super(SchedulerEventType.APP_ADDED); this.applicationId=applicationId; this.queue=queue; this.user=user; this.reservationID=reservationID; this.isAppRecovering=isAppRecovering;
} }
这里我们关心的是,这个事件的类型是SchedulerEventType.APP_ADDED。
另外,这个事件是通过app.handler.handle()派发的;而RMAppImpl的handler,到源码中看一下就可知道,是通过dispatcher.getEventHandler()得来的。那么这个EventHandler究竟是什么呢?这就要看当初登记与SchedulerEventType绑定的是什么。事实上,前面我们看到在ResourceManager的初始化阶段,在其所创建的RMActiveServices的serviceInit()中创建了一个SchedulerEventDispatcher对象,并登记与SchedulerEventType类的事件绑定,这个SchedulerEventDispatcher实现了EventHandler界面,起着EventHandler的作用。所以这里调用的实际上是SchedulerEventDispatcher.handle(SchedulerEventType.APP_ADDED)。
注意SchedulerEventDispatcher是定义于ResourceManager内部的一个类,它实现了EventHandler界面,并且内部有一个EventProcessor线程。现在调用的是EventHandler界面上的handle()方法:
[RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition() => AddApplicationToSchedulerTransition.transition()> SchedulerEventDispatcher.handle()] ResourceManager.SchedulerEventDispatcher.handle(SchedulerEvent event){ try{ int qSize=eventQueue.size(); if (qSize! =0&&qSize %1000==0){ LOG.info("Size of scheduler event-queue is"+qSize); //每1000个事件LOG一次 } int remCapacity=eventQueue.remainingCapacity(); if (remCapacity < 1000){ //事件队列的容量已感紧张 LOG.info("Very low remaining capacity on scheduler event queue:"+remCapacity); } this.eventQueue.put(event); //挂入调度器的事件队列 }catch(InterruptedException e){ LOG.info("Interrupted.Trying to exit gracefully."); } }
像AsyncDispatcher中的情况一样,这个handle()只是把作为参数传下来的事件挂在SchedulerEventDispatcher的事件队列中就返回了。但是SchedulerEventDispatcher内部的EventProcessor线程会从这事件队列中取出事件做进一步的处理,它的run()函数如下:
[SchedulerEventDispatcher.EventProcessor.run()]
public void run(){
SchedulerEvent event;
while (! stopped&&! Thread.currentThread().isInterrupted()){ try{ event=eventQueue.take(); //从队列中取下一个事件 }catch(InterruptedException e){ LOG.error("Returning, interrupted:"+e); return; //TODO:Kill RM. } try{ scheduler.handle(event); //例如FifoScheduler.handle() }catch(Throwable t){ //An error occurred, but we are shutting down anyway //If it was an InterruptedException, the very act of //shutdown could have caused it and is probably harmless if (stopped){ LOG.warn("Exception during shutdown:", t); break; } LOG.fatal("Error in handling event type"+event.getType()+"to the scheduler", t); if (shouldExitOnError&&! ShutdownHookManager.get().isShutdownInProgress()){ LOG.info("Exiting, bbye…"); System.exit(-1); } }//end catch } //end while }
SchedulerEventDispatcher的scheduler,就是ResourceManager的scheduler,如前所述可有三种选择,这里假定其为FifoScheduler,所以这里所调用的就是FifoScheduler.handle()。我们看一下它的摘要:
FifoScheduler.handle(SchedulerEvent event) > switch(event.getType()){ > case NODE_ADDED: //增加了一个节点 >> … > case NODE_REMOVED: //撤去了一个节点 >> … > case NODE_RESOURCE_UPDATE: //节点的资源发生变化 >> … > case NODE_UPDATE: //节点的状态发生变化 >> …
> case APP_ADDED: //增加了一个App >> appAddedEvent=(AppAddedSchedulerEvent)event >> addApplication(appAddedEvent.getApplicationId(), appAddedEvent.getQueue(), … ) >>> application=new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user ) >>> applications.put(applicationId, application) //将创建的 SchedulerApplication放在applications集合中 >>> metrics.submitApp(user) //metrics是一个QueueMetrics对象,用于统计目的 >>> e=new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)//创建新事件 >>> rmContext.getDispatcher().getEventHandler().handle(e) //派发新创建的事件 > case APP_REMOVED: //撤销了一个App >> … > case APP_ATTEMPT_ADDED: //增加了一个App企图 >> … > case APP_ATTEMPT_REMOVED: //撤销了一个App企图 >> … > case CONTAINER_EXPIRED: //容器使用超时 >> … > } //end switch
同属于SchedulerEventType的事件有不少,这个handle()方法通过一个switch语句分情形加以处理。由此可见,作为实现了EventHandler界面的对象,其handle()函数对于事件的反应并不一定是通过状态机做出的,也可以通过switch语句做出反应。我们现在这个事件是APP_ADDED,暂不关心别的事件。
对于APP_ADDED,具体的反应是addApplication()。这里把这个函数展开了:首先是为这个App创建一个SchedulerApplication类的对象application,这是具体App在调度器中的代表,这个SchedulerApplication和applicationId一起被放入一个集合applications中,以后凭applicationId就可以在这个集合中找到其SchedulerApplication对象。请注意在创建application时的参数DEFAULT_QUEUE,这说明在ApplicationSubmissionContext中指定的队列对于FifoScheduler不起作用,FifoScheduler总是采用其DEFAULT_QUEUE。
同时,出于统计的目的,这里还以用户名user为参数向一个QueueMetrics类对象metrics调用其submitApp(),表示这个用户增加了一个App。
然后就创建一个新的事件RMAppEventType.APP_ACCEPTED,表示这个App已经被提交给调度器并为其接受。这个事件既然属于RMAppEventType,事件中又包含着原先的applicationId,那就是回过头来针对原先那个RMAppImpl状态机的,相应的跳变规则为:
addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
此时RMAppImpl状态机对触发事件RMAppEventType.APP_ACCEPTED的反应是:调用伴随操作StartAppAttemptTransition.transition(),然后使这个状态机的当前状态变成
RMAppState.ACCEPTED。
[RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition() =>AddApplicationToSchedulerTransition.transition()=>StartAppAttemptTransition.transition()] StartAppAttemptTransition.transition(RMAppImpl app, RMAppEvent event){ app.createAndStartNewAttempt(false); //开始一次启动运行的尝试 }
既然App申请已为调度器所接受,下一步就是为此应用(作业)开始一次运行尝试,即AppAttempt。这里的参数app是个RMAppImpl对象,所以这里所调用的操作就是RMAppImpl.createAndStartNewAttempt(),其摘要如下:
[RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()=> AddApplicationToSchedulerTransition.transition()=> StartAppAttemptTransition.transition()> RMAppImpl.createAndStartNewAttempt()] RMAppImpl.createAndStartNewAttempt() > createNewAttempt() //创建一个新的RMAppAttemptImpl,并将其设置成currentAttempt >> appAttemptId=ApplicationAttemptId.newInstance(applicationId, attempts.size()+1) >> attempt=new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, …) >> attempts.put(appAttemptId, attempt)//将attempt连同其 ID一起放入attempts集合 >> currentAttempt=attempt //将新创建的attempt设置成currentAttempt > e=new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(), …) //生成一个RMAppAttemptEventType.START事件 >> super(appAttemptId, RMAppAttemptEventType.START) > handler.handle(e) //handler取决于事件类型,在这里是RMAppAttemptImpl.handle()
顺便看一下,作为对照,RMAppImpl.createAndStartNewAttempt()的源码是这样:
RMAppImpl.createAndStartNewAttempt(boolean transferStateFromPreviousAttempt){ createNewAttempt(); handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(), transferStateFromPreviousAttempt)); }
对于用户提交的每个App,试图启动其运行的每次尝试(试运行)都称为一次应用尝试,即RMAppAttempt,由一个RMAppAttemptImpl类的对象作为代表。之所以加上前缀RM,显然是因为这是在RM节点上。对于具体App的一次尝试是个逻辑上独立又颇为复杂的过程,中间有许多状态变迁,所以RMAppAttemptImpl对象中也有状态机。
像ApplicationId一样,每个RMAppAttemptImpl都有个唯一的ApplicationAttemptId,所以在创建RMAppAttemptImpl之前都要为其分配一个号码,每次递增。
在RMAppImpl对象内部有个attempts集合,实际上是个Map<ApplicationAttemptId, RMAppAttempt>,凡属于这个App的所有attempt都在这个集合中,根据具体AppAttempt的ID号就可在这个集合中找到其RMAppAttemptImpl对象。所以,每创建了一个RMAppAttemptImpl对象之后,就要把它连同其ID号放到这个集合中,并使其成为这个RMAppImpl的“当前尝试”currentAttempt。
然后就创建一个RMAppAttemptEventType.START事件并加以派发。如前所述,派发的目标取决于事件类型。事件类型与事件处理器的绑定是事先向Dispatcher登记好的,所以这里的handler.handle()实际上是ApplicationAttemptEventDispatcher.handle():
[RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()=> AddApplicationToSchedulerTransition.transition()=> StartAppAttemptTransition.transition()> RMAppImpl.createAndStartNewAttempt()> ApplicationAttemptEventDispatcher.handle()] ApplicationAttemptEventDispatcher.handle(RMAppAttemptEvent event) > ApplicationAttemptId appAttemptID=event.getApplicationAttemptId() > ApplicationId appAttemptId=appAttemptID.getApplicationId()//注意,这是ApplicationId > rmApp=this.rmContext.getRMApps().get(appAttemptId) //根据ApplicationId获取其RMAppImpl 对象 > if (rmApp! =null){ >+ RMAppAttempt rmAppAttempt=rmApp.getRMAppAttempt(appAttemptID) //根据ApplicationAttemptId获取其RMAppAttemptImpl 对象 >+ if (rmAppAttempt! =null)rmAppAttempt.handle(event)==RMAppAttemptImpl.handle() >+> appAttemptID=event.getApplicationAttemptId() //从 event中提取 >+> this.stateMachine.doTransition(event.getType(), event) //这是具体RMAppAttemptImpl 对象的状态机 > }
可见,事件处理器ApplicationAttemptEventDispatcher对此事件的处理就是驱动具体RMAppAttemptImpl对象的状态机,而RMAppAttemptImpl状态机中相应的跳变规则如下:
addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED, RMAppAttemptEventType.START, new AttemptStartedTransition()) 显然,RMAppAttemptImpl.handle()会调用AttemptStartedTransition.transition(): [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition() =>AddApplicationToSchedulerTransition.transition()=>StartAppAttemptTransition.transition() > RMAppImpl.createAndStartNewAttempt()=> AttemptStartedTransition.transition()] AttemptStartedTransition.transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event){ boolean transferStateFromPreviousAttempt=false; if (event instanceof RMAppStartAttemptEvent){ transferStateFromPreviousAttempt= ((RMAppStartAttemptEvent)event).getTransferStateFromPreviousAttempt();
} appAttempt.startTime=System.currentTimeMillis(); //Register with the ApplicationMasterService appAttempt.masterService.registerAppAttempt(appAttempt.applicationAttemptId); //向ApplicationMasterService登记一次RMAppAttempt if (UserGroupInformation.isSecurityEnabled()){ //如果启用了安全机制 appAttempt.cl ientTokenMasterKey= appAttempt.rmContext.getCl ientToAMTokenSecretManager() .createMasterKey(appAttempt.applicationAttemptId); } //Add the applicationAttempt to the scheduler and inform the scheduler //whether to transfer the state from previous attempt. appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( appAttempt.applicationAttemptId, transferStateFromPreviousAttempt)); }
我们为这个函数做一个摘要,并略作展开,就是这样:
AttemptStartedTransition.transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) > transferStateFromPreviousAttempt=event.getTransferStateFromPreviousAttempt() > appAttempt.masterService.registerAppAttempt(appAttempt.applicationAttemptId) > e=new AppAttemptAddedSchedulerEvent(appAttempt.applicationAttemptId, …) >> super(SchedulerEventType.APP_ATTEMPT_ADDED) > appAttempt.eventHandler.handle(e) //这就是FifoScheduler.handle()
先说说参数。这里的参数已经从RMAppImpl变成了RMAppAttemptImpl。前者代表一个作业、一个应用;后者则代表运行这个应用的一次尝试、一次试运行。一个应用不一定能一次运行成功,也许需要运行多次才能完成。这里的这个RMAppAttemptImpl对象appAttempt,就是前面在createAndStartNewAttempt()中创建的,这是RMAppImpl状态机受事件RMAppEventType.APP_ACCEPTED的触发而做出的反应。从现在开始,下面的活动就都是针对App的一次具体的运行尝试,而不是针对整个App的了。
这里的布尔量transferStateFromPreviousAttempt最初是从事件对象event里带进来的,目的在于供调度器使用,用以确定在为一个App发起一次新的Attempt时要不要把先前的Attempt的状态转移过来。这是因为,在这种情况下先前的Attempt多半已经失败,将其状态转移继承过来很可能可以利用其已经获得的部分成果。
另一个事情,是通过registerAppAttempt()向RM的ApplicationMasterService登记,向其报告新创建RMAppAttemptImpl对象的ApplicationAttemptId。因为一旦为此应用在某个NodeManager节点上创建起一个ApplicationMaster,它就要来与ApplicationMasterService联系,所以这里先要登记一下,挂个号备个案。ApplicationMasterService是专门管理正在集群中运行的那些App的ApplicationMaster的。
然后就是创建AppAttemptAddedSchedulerEvent对象并加以派发。这个事件的类型实际上是SchedulerEventType.APP_ATTEMPT_ADDED。在我们的情景中,因为我们假定采用FifoScheduler,这个事件对象又会被派发给FifoScheduler.handle()。我们在前面见过这个函数的摘要,但是那时我们只关心其switch语句中的“case APP_ADDED”,现在要关心一下“case APP_ATTEMPT_ADDED”了:
FifoScheduler.handle(SchedulerEvent event) > switch(event.getType()){ > case APP_ATTEMPT_ADDED: //增加了一个App运行尝试 >> addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), …) > } //end switch
这当然是要把我们的AppAttempt加入到调度器中。下面是addApplicationAttempt()的源码。
[RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition() =>AddApplicationToSchedulerTransition.transition()=>StartAppAttemptTransition.transition() => AttemptStartedTransition.transition()=> FifoScheduler.addApplicationAttempt()] FifoScheduler.addApplicationAttempt(ApplicationAttemptId appAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering){ SchedulerApplication<FiCaSchedulerApp> application= applications.get(appAttemptId.getApplicationId()); //参考前面FifoScheduler.handle()中case APP_ADDED下的addApplication() String user=application.getUser(); //TODO:Fix store FiCaSchedulerApp schedulerApp= new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, this.rmContext); //这个类型既用于FifoScheduler,又用于CapacityScheduler,故得此名 if (transferStateFromPreviousAttempt){ schedulerApp.transferStateFromPreviousAttempt( application.getCurrentAppAttempt()); } //如果要转移继承先前尝试的状态,那么所谓先前尝试就是此刻的当前尝试 application.setCurrentAppAttempt(schedulerApp); //新的尝试变成了当前尝试 metrics.submitAppAttempt(user); //将AppAttempt纳入统计 LOG.info("Added Application Attempt"+appAttemptId +"to scheduler from user"+application.getUser()); if (isAttemptRecovering){ //见调用参数,true表示Attempt并非新创 if (LOG.isDebugEnabled()){
LOG.debug(appAttemptId+"is recovering.Skipping notifying ATTEMPT_ADDED"); } }else { //如果是新加入的应用尝试 rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); } }
前面,在将一个App提交给调度器的时候(当时的事件是APP_ADDED),我们为其创建了一个SchedulerApplication对象。其实这个SchedulerApplication只是个简略的模板名,对于FifoScheduler而言更确切的类名是SchedulerApplication<FiCaSchedulerApp>。前缀“FiCa”说明这个类既用于FifoScheduler,也用于CapacityScheduler。创建了这个对象之后,就把它放入了调度器的applications集合中。需要的时候,用ApplicationId就可从这个集合中找到这个SchedulerApplication对象。这里的第一个语句正是这样。之所以需要这个对象,是因为这里要为其创建一个专供调度器使用的FiCaSchedulerApp对象,这是对SchedulerApplicationAttempt的扩展,实际上代表的是应用尝试(而不是应用本身),这个尝试应该成为App的“当前尝试(CurrentAppAttempt)”。而如果要转移继承先前尝试的状态,则此刻之前的当前尝试就是此后的先前尝试了。
注意,这个函数有个调用参数isAttemptRecovering,这是个布尔量,为“真”即表示此次调用addApplicationAttempt()并非要将一个新创的Attempt加入调度,而只是要恢复原来已经存在的Attempt。
一般而言,调用addApplicationAttempt()总是为新创的应用尝试居多,此时就得创建和派发RMAppAttemptEventType.ATTEMPT_ADDED事件。其实这就是调度器对于具体RMAppAttemptImpl对象的响应。那个RMAppAttemptImpl状态机的状态在前面已经变成SUBMITTED,现在又受到ATTEMPT_ADDED事件的触发,其跳变规则为:
addTransition(RMAppAttemptState.SUBMITTED,
EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.ATTEMPT_ADDED, new ScheduleTransition())
这条规则与我们以前所见有些不同,它的跳变目标有两个,一个是SCHEDULED,另一个是LAUNCHED_UNMANAGED_SAVING。这说明,在同一事件的触发下,状态机的下一个状态是什么还得看某些别的条件才能确定,这就是所谓“多弧(MultipleArc)”跳变。状态机的宿主即其所属对象的handle()函数对于单弧和多弧的处理有所不同,但是不管是单弧还是多弧,最后的状态改变总是发生在从transition()函数返回的时候,而且transition()的返回值决定了多弧跳变的目标状态究竟是这个集合中的哪一种。
我们看一下ScheduleTransition.transition()的代码:
[RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition() => AddApplicationToSchedulerTransition.transition()=>StartAppAttemptTransition.transition()
=> AttemptStartedTransition.transition()=> FifoScheduler.addApplicationAttempt() => RMAppAttemptImpl.ScheduleTransition.transition()] ScheduleTransition.transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event){ ApplicationSubmissionContext subCtx=appAttempt.submissionContext; if (! subCtx.getUnmanagedAM()){ //如果不是UnmanagedAM,就得为其创建AM //Need reset #containers before create new attempt, because this request //will be passed to scheduler, and scheduler will deduct the number after //AMcontainer allocated //Currently, following fields are all hard code, //TODO:change these fields when we want to support //priority/resource-name/relax-locality specification for AMcontainers //allocation. appAttempt.amReq.setNumContainers(1); //只要求一个容器,用于创建AM appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY); appAttempt.amReq.setResourceName(ResourceRequest.ANY); appAttempt.amReq.setRelaxLocality(true); //SchedulerUtils.validateResourceRequests is not necessary because //AMresource has been checked when submission Allocation amContainerAllocation= appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, Collections.singletonList(appAttempt.amReq), EMPTY_CONTAINER_RELEASE_LIST, null, null); //分配资源 //无需要释放的容器,在我们这个情景中 scheduler是FifoScheduler if (amContainerAllocation! =null &&amContainerAllocation.getContainers()! =null){ assert (amContainerAllocation.getContainers().size()==0); } return RMAppAttemptState.SCHEDULED; //RMAppAttemptImpl 状态机的新状态 }else { //save state and then go to LAUNCHED state appAttempt.storeAttempt(); return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING; } }
可见,ScheduleTransition.transition()的返回值因subCtx.getUnmanagedAM()的结果而异。这个subCtx就是用户提交的ApplicationSubmissionContext。如果subCtx.getUnmanagedAM()的结果为false,表示这不是一个UnmanagedAM,最后就会返回SCHEDULED;否则就会返回LAUNCHED_UNMANAGED_SAVING。总之,ScheduleTransition.transition()返回什么,状态机的下一个状态就是什么。
所谓“不受管理的AM(UnmanagedAM)”是指不由RM发起和管理,而直接由使用者在RM节点上通过命令行发起的AM。Hadoop的代码中有个类,叫UnmanagedAMLauncher,这个类是有main()函数的,所以使用者可以直接通过命令行发起一个JVM来执行这个类,这样就绕过了常规的作业提交和调度,直接就进入(在某个NodeManager节点上)发起AppMaster的阶段。不过这样的AppMaster一经发起后仍得向RM登记,因为所需的资源总归还得由RM分配。我们在这里只关心常规的情景和流程,有兴趣的读者可以自己阅读和分析UnmanagedAMLauncher的代码。
但是,为什么在这个地方会涉及UnmanagedAM呢?这是因为,对于正常提交的App,更准确地说是AppAttempt,下一步要做的也正是要指派在某个NodeManager节点上发起一个起着“课题组长”作用的AppMaster,这一点上二者的走向是共同的。虽然AppMaster并不直接执行具体App的程序,它也是一个独立运行的任务,是要启动一个独立的JVM加以执行的,它也需要耗用一定的资源。所以,要指派发起AppMaster,就得为其分配一个“容器(Container)”。这里所做的,就是先填写好一个ResourceRequest,就是代码中的amReq,然后通过调度器的allocate()方法分配容器。注意,这里要求分配的容器数量是1,那就是AppMaster本身所需的容器。至于AppMaster发起执行具体AppAttempt的也许是大量的任务(例如Mapper和Reducer)时所需的(也许是大量的)容器,那是以后的事,得由AppMaster出面向调度器申请了。
这个transition()函数与我们在前面看到的那些有所不同。一方面,是它返回一个状态,不是SCHEDULED就是LAUNCHED_UNMANAGED_SAVING,因为这是“多弧跳变”,具体跳变到什么状态要视情况而定。另一方面,transition()函数通常会在末尾发出一个别的什么事件,这个事件会被用来触发另一个过程,往往回过头来又使这个状态机发生变化,这就构成驱动状态机运转的动力,但是这里却没有。这是为什么呢?其实原因也很简单,因为这个状态机之是否还可前行,要看通过调度器分配资源能否成功。这里的scheduler.allocate(),对于我们来说就是FifoScheduler.allocate()。下面我们就将看到,在那里,每拿到一个容器就会发出一个RMContainerEventType.ACQUIRED事件。这个事件最终会导致一个RMAppAttemptEventType.CONTAINER_ALLOCATED事件,使这状态机跳出SCHEDULED状态,经由ALLOCATED_SAVING而进入ALLOCATED状态。但是,如果一时分配不到容器,则这个RMAppAttemptImpl的容器就会暂时停滞在SCHEDULED状态。
前面看到,当一个App被提交到RMAppManager手里时,RMAppManager.submitApplication()通过createAndPopulateNewRMApp()创建了一个RMAppImpl对象,此后又创建一个RMAppAttemptImpl对象作为运行这个App的一次尝试,后者则在调度器中创建一个FiCaSchedulerApp对象,作为其在调度器中的代表。
现在,由于受ATTEMPT_ADDED事件的触发,这个具体的RMAppAttemptImpl对象appAttempt要求调度器分配资源,这样就进入了FifoScheduler.allocate()。
[RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition() => AddApplicationToSchedulerTransition.transition()=>StartAppAttemptTransition.transition()
=> AttemptStartedTransition.transition()=> FifoScheduler.addApplicationAttempt() => RMAppAttemptImpl.ScheduleTransition.transition()> FifoScheduler.allocate()] FifoScheduler.allocate(ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask, List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) > FiCaSchedulerApp application=getApplicationAttempt(applicationAttemptId) //代表着要求分配资源的AppAttempt > SchedulerUtils.normalizeRequests(ask, resourceCalculator, clusterResource, minimumAllocation, maximumAllocation)//资源要求的合理性检测和规格化 > releaseContainers(release, application) //释放该释放的容器 > if (! ask.isEmpty()){ //要求分配的资源集合非空 >+ application.updateResourceRequests(ask)//Update application requests > } > application.updateBlacklist(blacklistAdditions, blacklistRemovals) //修改黑名单 > ContainersAndNMTokensAllocation allocation= application.pullNewlyAllocatedContainersAndNMTokens() //把新分配的容器都收揽过来并为之创建Token >> for(Iterator<RMContainer> i=newlyAllocatedContainers.iterator(); i.hasNext(); ){ //对于newlyAllocatedContainers列表中的每一个容器 >>+ RMContainer rmContainer=i.next() >>+ Container container=rmContainer.getContainer() >>+> return this.container >>+ container.setContainerToken( rmContext.getContainerTokenSecretManager().createContainerToken( container.getId(), container.getNodeId(), getUser(), container.getResource(), container.getPriority(), rmContainer.getCreationTime(), this.logAggregationContext)) >>+ NMToken nmToken=rmContext.getNMTokenSecretManager().createAndGetNMToken( getUser(), getApplicationAttemptId(), container) >>+ if (nmToken! =null)nmTokens.add(nmToken) >>+ returnContainerList.add(container) //揽到了一个容器 >>+ i.remove() //从newlyAllocatedContainers列表中摘除 >>+ rmContainer.handle(new RMContainerEvent( rmContainer.getContainerId(), RMContainerEventType.ACQUIRED)) >> } //end for //已从该App的newlyAllocatedContainers中把容器都揽到了returnContainerList中 >> return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens) //所返回的ContainersAndNMTokensAllocation对象成为前面的allocation > return new Allocation(allocation.getContainerList(), application.getHeadroom(), null, null, null, allocation.getNMTokenList())
这个函数有5个参数,第一个是applicationAttemptId,其意义自明。此外就是4个List,其中ask是资源请求列表;release是资源释放列表;blacklistAdditions和blacklistRemovals则分别是需要加入黑名单或从中移除的资源列表。如果一个节点或机架在一个App的黑名单中,调度器就不会在这个节点或机架上为这个App分配资源。这样,如果需要,就可以让一个App有意避开在某些节点或机架上运行。不过,我们在前面看到,从ScheduleTransition.transition()中调用allocate()函数时这两个参数都是null。另外,调用时具体的要求是只要一个容器。
所以,这个函数既可用于分配资源,也可用于释放资源。所谓资源的分配和释放,实际上就是容器的分配与释放,因为具体的资源,例如存储空间和VCore,是包裹(记载)在容器里面的。
我们只关心容器的分配,故而直接看对于pullNewlyAllocatedContainersAndNMTokens()的调用。这个函数定义于SchedulerApplicationAttempt类内部,而FiCaSchedulerApp是对于SchedulerApplicationAttempt的扩充,所以这也是由FiCaSchedulerApp提供的操作方法。另外,SchedulerApplicationAttempt即FiCaSchedulerApp内部还有个RMContainer的列表newlyAllocatedContainers,意为新分配的容器。在这个列表中,如果有的话,是根据FiCaSchedulerApp的要求为其分配的容器(可以有多个),这些容器可以来自不同的节点(更确切地说是可以由不同的节点提供)。我们暂且不问这些容器的来历,只要知道这个列表中可能有分配好的容器就行了。而所谓allocate(),其实只是从这个列表中收揽已经分配的容器(RMContainer对象),并为这些容器办理使用证件(NMToken对象),然后一并打包成一个Allocation对象。其中收揽容器和办理证件的操作就是由pullNewlyAllocatedContainersAndNMTokens()完成的。
特别地,每当从newlyAllocatedContainers列表中收揽到一个容器,就向这容器(更确切地说是该容器的状态机)发出一个RMContainerEventType.ACQUIRED事件。
既然这些容器已经存在于newlyAllocatedContainers这个List中,当然是已经被创建的了。每个RMContainer对象都有自己的状态机,因为容器有它的生存周期,有状态变迁。
但是,也有可能在newlyAllocatedContainers中还没有所需的容器,那样就不会有ACQUIRED事件发出,RMAppAttemptImpl对象的状态机就会停滞在SCHEDULED状态。可想而知,系统中一定有一种机制,使得当有容器可供分配时再走一遍上述的过程。后面我们将会看到这一点。现在得说一下这些容器的来源了。