大数据处理系统:Hadoop源代码情景分析
上QQ阅读APP看书,第一时间看更新

第6章 作业的调度与指派

6.1 作业的受理

我们在上一章的结尾处看到,当一个作业,也就是一个App,通过RPC被提交到RM节点时,“上岸”后经由ClientRMService被交到了RMAppManager对象的手里。RM节点上的ClientRMService对象相当于接待站,而RMAppManager对象则专门管理与应用/作业的申请和运行有关的事务。两个对象均由ResourceManager创建,都在同一个JVM上。

ClientRMService是通过调用rmAppManager.submitApplication()把作业申请交到RMAppManager手里的,我们就从这里继续往下看。


      [RPC > ApplicationClientProtocolPBServiceImpl.submitApplication()>

      ClientRMService.submitApplication()> RMAppManager.submitApplication()]




      RMAppManager.submitApplication(ApplicationSubmissionContext submissionContext,

                                long submitTime, String user)throws YarnException {

          ApplicationId applicationId=submissionContext.getApplicationId();




          RMAppImpl application=

                        createAndPopulateNewRMApp(submissionContext, submitTime, user);

          ApplicationId appId=submissionContext.getApplicationId();

          if (UserGroupInformation.isSecurityEnabled()){ //如果开启了安全机制

            try{

              this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,

                    parseCredentials(submissionContext),

                    submissionContext.getCancelTokensWhenComplete(),

                    application.getUser());

            }catch(Exception e){ //异常,addApplicationAsync()失败

              LOG.warn("Unable to parse credentials.", e);

              //Sending APP_REJECTED is fine, since we assume that the

              //RMApp is in NEW state and thus we haven't yet informed the

              //scheduler about the existence of the application


              assert application.getState()==RMAppState.NEW;

              this.rmContext.getDispatcher().getEventHandler()

                .handle(new RMAppRejectedEvent(applicationId, e.getMessage()));

              throw RPCUtil.getRemoteException(e);

            }

          }else { //未开启安全机制

            //Dispatcher is not yet started at this time, so these START events

            //enqueued should be guaranteed to be first processed when dispatcher

            //gets started

            this.rmContext.getDispatcher().getEventHandler()

              .handle(new RMAppEvent(applicationId, RMAppEventType.START));

                        //rmContext获取所用的Dispatcher,进而获取其EventHandler

                        //START 事件交给它处理后者会用其触发RMAppImpl对象的状态机

          }

        }

首先是createAndPopulateNewRMApp()。顾名思义这个函数做的是两件事,即为所提交的作业创建一个RMApp对象,并Populate。不过RMApp只是个interface,实际创建的是个RMAppImpl对象。这是根据所提交的submissionContext,即前一章中所讲的ASC和用户名创建的,代表着具体的App,也包含着有关该App的全部信息。至于Populate,则是说把所创建的RMAppImpl对象连同其ApplicationId放到一个类似于名单一样的便查表中,即MAP<ApplicationId, RMApp>中,这样以后凭ID就可以从中找到它的RMAppImpl对象。


      [RPC > ApplicationClientProtocolPBServiceImpl.submitApplication()>

      ClientRMService.submitApplication()> RMAppManager.submitApplication()>

      createAndPopulateNewRMApp()]




      createAndPopulateNewRMApp(ApplicationSubmissionContext submissionContext,

                                                        long submitTime, String user)

      > ApplicationId applicationId=submissionContext.getApplicationId()

      > ResourceRequest amReq=validateAndCreateResourceRequest(submissionContext)

                  //检验资源请求是否合理,不合理就发 InvalidResourceRequestException异常

      > RMAppImpl application=new RMAppImpl(applicationId, rmContext,

                            this.conf, submissionContext.getApplicationName(), user,

                            submissionContext.getQueue(), submissionContext,

                            this.scheduler, this.masterService, submitTime,

                            submissionContext.getApplicationType(),  //默认为“YARN”

                            submissionContext.getApplicationTags(), amReq)

      > rmContext.getRMApps().putIfAbsent(applicationId, application)

                            //App加入RMContextImpl 内部的一个MAP

                            //以后凭applicationId就可找到其RMAppImpl 对象


      > this.applicationACLsManager.addApplication(applicationId,

                            submissionContext.getAMContainerSpec().getApplicationACLs())

                                              //App加入到访问控制名单ACL管理器中

      > String appViewACLs=submissionContext.getAMContainerSpec()

                            .getApplicationACLs().get(ApplicationAccessType.VIEW_APP)

      > rmContext.getSystemMetricsPublisher().appACLsUpdated(application,

                                  appViewACLs, System.currentTimeMillis())//与统计有关

      > return application

创建RMAppImpl时所传递的参数主要来自用户,特别是来自其提交的ApplicationSubmissionContext,但也有些信息是由RM节点上的RMAppManager提供的,如rmContext、this.conf、this.scheduler和this.masterService。其中this.scheduler决定了将采用哪一种调度器;this.masterService则是个ApplicationMasterService对象,是RM节点上专门管理那些为具体应用而建立的ApplicationMaster的。如前所述,ApplicationMaster类似于“项目组长”。下面就要为当前的这个App找个节点在上面创建ApplicationMaster了,创建后它就得向ApplicationMasterService登记报到。

回到前面RMAppManager.submitApplication()的代码。

Hadoop实现了基于身份认证的安全机制,但是用户可以启用也可以不启用这种安全机制。启用了安全机制后的流程当然要稍为复杂一些,但是由此而来的复杂性并不影响Hadoop与YARN的主流。在这里,如果一头钻进安全机制不免会偏离我们的大方向,会冲淡我们对主流的分析理解,所以我们现在走不开启安全机制的这条路。

作业的提交引起一个RMAppEventType.START事件,这就涉及我们在前面介绍过的状态机了。如前所述,用户每向RM提交一个作业,RMAppManager就为其创建一个RMAppImpl对象,作为该作业在RM中的代表。可想而知,随着作业的进展,其RMAppImpl对象将处于各种不同的状态。所以,每个RMAppImpl对象,或者说每个RMApp,都有个自己的状态机。所有RMAppImpl对象的状态机都是一样的,只不过各自处于不同的状态;所以stateMachineFactory是RMAppImpl类的静态成分。但是,具体的状态机stateMachine则是在RMAppImpl对象的构造函数中创建的。创建后的初始状态为RMAppState.NEW。而事件RMAppEventType.START就被用来触发该App的状态机。

不过,这里也简单提一下启用安全机制时的流程。代码中的getDelegationTokenRenewer()返回一个DelegationTokenRenewer对象,这个对象提供一个方法addApplicationAsync(),这显然就是异步增加一个App的意思。这个方法将App连同用户信息及其各种证件一起挂入DelegationTokenRenewer的队列,然后它的一个DelegationTokenRenewerRunnable线程会来检验队列中这些App的资质。如果检验合格,就同样会发起一次RMAppEventType.START事件,此后就跟不开启安全机制时的流程一样了。

这里创建的事件是RMAppEventType.START,并通过Dispatcher把这个事件派送给RMApp的状态机。前面讲状态机时说过,Dispatcher就好像是按事件类型导向的路由器,想要让某个受主接受某类事件,就得向Dispatcher登记,这就好比在路由器中配置一条路由。实际情况也正是如此。ResourceManager在其serviceInit()中向其内部的rmDispatcher登记,将RMAppEventType类的事件绑定到一个ApplicationEventDispatcher类对象,并将后者记录在rmContext中。注意,后者虽然名为ApplicationEventDispatcher,但实际上却只是RMApp的EventHandler。所以,上面的this.rmContext.getDispatcher().getEventHandler()所返回的其实是作为事件处理器的ApplicationEventDispatcher对象,然后调用其handle()方法就会触发RMApp状态机的跳变:


      [RPC > ApplicationClientProtocolPBServiceImpl.submitApplication()>

      ClientRMService.submitApplication()> RMAppManager.submitApplication()>

      ApplicationEventDispatcher.handle()]




      ApplicationEventDispatcher.handle(RMAppEvent event)

      > appID=event.getApplicationId() //获取该事件的AppId

      > rmApp=this.rmContext.getRMApps().get(appID) //根据AppId找到其RMAppImpl 对象

      > if (rmApp! =null)rmApp.handle(event)==RMAppImpl.handle(event)//调用其handle()方法

      >> appID=event.getApplicationId()

      >> this.stateMachine.doTransition(event.getType(), event) //触发状态机跳变

这里rmContext是RM的Context,实际上是个RMContextImpl对象。前述的那个MAP就在这个对象内部,所以根据具体应用的AppID可以得到其RMApp,实际上是代表着这个应用的RMAppImpl对象。

看一下RMAppImpl类的代码,就可以找到其状态机的跳变规则,其中以RMAppEventType.START为触发条件的只有一条,那就是:


      addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,

                  RMAppEventType.START, new RMAppNewlySavingTransition())

我们这个应用的RMAppImpl对象是刚刚创建的,它的状态机正处于初始状态RMAppState.NEW,所以这条规则正好适用。注意,这条规则还规定要为状态机提供一个RMAppNewlySavingTransition类的对象,这个类提供了一个transition()函数供状态机调用,作为这个跳变所需的伴随操作,这也是状态机对事件的反应之一。类似于RMAppNewlySavingTransition那样,用在这个地方的类,都实现了SingleArcTransition或MultipleArcTransition界面,所以都提供transition()操作。值得注意的是,名曰transition(),实际上却不是用来实现状态变迁的,而只是用来提供伴随着状态变迁、除状态跳变本身以外的操作。

所以,接着发生的事,是RMAppNewlySavingTransition.transition()得到调用,而这个状态机的当前状态则变成RMAppState.NEW_SAVING。


      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()]




        RMAppNewlySavingTransition.transition(RMAppImpl app, RMAppEvent event){

          //If recovery is enabled then store the application information in a

          //non-blocking call so make sure that RMhas stored the information

          //needed to restart the AMafter RMrestart without further client communication

          LOG.info("Storing application with id"+app.applicationId);


          app.rmContext.getStateStore().storeNewApplication(app);

        }

这里在程序的调用路径中用“=>”表示前者导致了后者,而不是(或不一定是)前者直接调用了后者。这是因为,如果Dispatcher是异步的,那么前者只是把一个事件对象放进一个事件队列,而另有一个线程在处理这些事件对象的派发,这才调用到了后者,后者实际上并非由前者直接调用,而且后者是在另一个线程的上下文中得到执行。

显然,这个函数要做的事情是通过storeNewApplication()把具体应用的信息都保存起来。这是因为,这个应用是否能马上被调度运行还不得而知,也许需要等上很长时间,而且随后对这个应用的处理和执行有可能中途失败,将要为其创建的AM甚至RM都有可能重启,那时候这个App应该要有个副本作为恢复起点,这相当于数据库处理中的checkpoint。我们在前面看到所创建的RMAppImpl对象被放进了RM的app集合,但是那个正本的RMAppImpl对象在随后的处理中是会改变,甚至有可能丢失的。


      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()>

      RMStateStore.storeNewApplication()]




        /**

        * Non-Blocking API

        * ResourceManager services use this to store the application's state

        * This does not block the dispatcher threads

        * RMAppStoredEvent will be sent on completion to notify the RMApp

        */




      storeNewApplication(RMApp app){

          ApplicationSubmissionContext context=app.getApplicationSubmissionContext();

          assert context instanceof ApplicationSubmissionContextPBImpl;

          ApplicationState appState=new ApplicationState(app.getSubmitTime(),

                                            app.getStartTime(), context, app.getUser());

          dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));

      }

显然,这里要保存的主体是这个App的ApplicationSubmissionContext,用户所提交的信息其实都在这里面。把它保存起来,就把所提交的作业信息保存了一份副本,所以这里创建的是一个包含着这些信息的ApplicationState对象。此外,这里还以此为参数创建了一个RMStateStoreAppEvent事件,并让Dispatcher把该事件分发给相应的事件处理器。不过,这个事件的类型属于RMStateStoreEvent,而不是前述的RMAppState,所以针对的是不同的事件处理器或状态机。事实上,这一方面固然是为了保存所创建的ApplicationState对象appState,另一方面这也会引起RMStateStore状态机的跳变和反应。这个RMStateStoreAppEvent的具体类型是RMStateStoreEventType.STORE_APP, appState则相当于这个事件的附件。

那保存在哪里呢?RM节点上有个RMStateStore对象,我们可以在ResourceManager的serviceInit()中看到RMStateStore的创建。但RMStateStore是个抽象类,由此扩充派生出来的就有FileSystemRMStateStore、MemoryRMStateStore、NullRMStateStore以及ZKRMStateStore,具体选用哪一种可以在配置文件yarn-default.xml中加以指定。

可想而知,如果采用的是FileSystemRMStateStore,那就要把副本保存在文件系统中,这里面有个过程,而且可能要应对许多不同的情况。所以,其实RMStateStore类自己就有个StateMachineFactory,具体的FileSystemRMStateStore对象则各有自己的状态机。

RMStateStore对STORE_APP事件的反应是在StoreAppTransition.transition()中调用storeApplicationStateInternal()以保存appState中的数据,操作完成以后就发出一个RMAppEventType.APP_NEW_SAVED事件,用来触发RMAppImpl对象的状态机。

在RMAppImpl对象的状态机中,以RMAppEventType.APP_NEW_SAVED为触发条件的跳变规则是:


      addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,

        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())

就是说,如果状态机的当前状态是NEW_SAVING,到来的事件是APP_NEW_SAVED,则状态变成SUBMITTED,伴随的操作是AddApplicationToSchedulerTransition.transition()。状态变成RMAppState.SUBMITTED,就表示提交已经完成。不过状态的变化发生在伴随操作之后,所以先要执行下面这个操作:


      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()=>

      AddApplicationToSchedulerTransition.transition()]




      AddApplicationToSchedulerTransition.transition(RMAppImpl app, RMAppEvent event){

        app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,

                                    app.submissionContext.getQueue(), app.user,

                                    app.submissionContext.getReservationID()));

      }

这里创建并作为参数传给app.handler.handle()的AppAddedSchedulerEvent对象把有关该app的一些信息打包在一起,其中applicationId和用户名user不言自明;而getQueue()所返回的队列名表示要把该app请求挂到哪一个资源调度队列中。最后的ReservationID则用于事先已有预订的情况。我们来看一下AppAddedSchedulerEvent对象的构造函数:


      class AppAddedSchedulerEvent extends SchedulerEvent {

        public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,

                      String user, boolean isAppRecovering, ReservationId reservationID){

          super(SchedulerEventType.APP_ADDED);

          this.applicationId=applicationId;

          this.queue=queue;

          this.user=user;

          this.reservationID=reservationID;

          this.isAppRecovering=isAppRecovering;


        }

      }

这里我们关心的是,这个事件的类型是SchedulerEventType.APP_ADDED。

另外,这个事件是通过app.handler.handle()派发的;而RMAppImpl的handler,到源码中看一下就可知道,是通过dispatcher.getEventHandler()得来的。那么这个EventHandler究竟是什么呢?这就要看当初登记与SchedulerEventType绑定的是什么。事实上,前面我们看到在ResourceManager的初始化阶段,在其所创建的RMActiveServices的serviceInit()中创建了一个SchedulerEventDispatcher对象,并登记与SchedulerEventType类的事件绑定,这个SchedulerEventDispatcher实现了EventHandler界面,起着EventHandler的作用。所以这里调用的实际上是SchedulerEventDispatcher.handle(SchedulerEventType.APP_ADDED)。

注意SchedulerEventDispatcher是定义于ResourceManager内部的一个类,它实现了EventHandler界面,并且内部有一个EventProcessor线程。现在调用的是EventHandler界面上的handle()方法:


      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()

      => AddApplicationToSchedulerTransition.transition()> SchedulerEventDispatcher.handle()]




          ResourceManager.SchedulerEventDispatcher.handle(SchedulerEvent event){

            try{

            int qSize=eventQueue.size();

            if (qSize! =0&&qSize %1000==0){

              LOG.info("Size of scheduler event-queue is"+qSize); //1000个事件LOG一次

            }

            int remCapacity=eventQueue.remainingCapacity();

            if (remCapacity < 1000){ //事件队列的容量已感紧张

              LOG.info("Very low remaining capacity on scheduler event queue:"+remCapacity);

            }

            this.eventQueue.put(event); //挂入调度器的事件队列

          }catch(InterruptedException e){

            LOG.info("Interrupted.Trying to exit gracefully.");

          }

        }

像AsyncDispatcher中的情况一样,这个handle()只是把作为参数传下来的事件挂在SchedulerEventDispatcher的事件队列中就返回了。但是SchedulerEventDispatcher内部的EventProcessor线程会从这事件队列中取出事件做进一步的处理,它的run()函数如下:


      [SchedulerEventDispatcher.EventProcessor.run()]




      public void run(){

          SchedulerEvent event;


          while (! stopped&&! Thread.currentThread().isInterrupted()){

            try{

              event=eventQueue.take(); //从队列中取下一个事件

            }catch(InterruptedException e){

              LOG.error("Returning, interrupted:"+e);

              return; //TODO:Kill RM.

            }




            try{

              scheduler.handle(event); //例如FifoScheduler.handle()

            }catch(Throwable t){

              //An error occurred, but we are shutting down anyway

              //If it was an InterruptedException, the very act of

              //shutdown could have caused it and is probably harmless

              if (stopped){

                LOG.warn("Exception during shutdown:", t);

                break;

              }

              LOG.fatal("Error in handling event type"+event.getType()+"to the scheduler", t);

              if (shouldExitOnError&&! ShutdownHookManager.get().isShutdownInProgress()){

                LOG.info("Exiting, bbye…");

                System.exit(-1);

              }

            }//end catch

          } //end while

        }

SchedulerEventDispatcher的scheduler,就是ResourceManager的scheduler,如前所述可有三种选择,这里假定其为FifoScheduler,所以这里所调用的就是FifoScheduler.handle()。我们看一下它的摘要:


      FifoScheduler.handle(SchedulerEvent event)

      > switch(event.getType()){

      > case NODE_ADDED:  //增加了一个节点

      >> …

      > case NODE_REMOVED: //撤去了一个节点

      >> …

      > case NODE_RESOURCE_UPDATE: //节点的资源发生变化

      >> …

      > case NODE_UPDATE: //节点的状态发生变化

      >> …


      > case APP_ADDED:   //增加了一个App

      >> appAddedEvent=(AppAddedSchedulerEvent)event

      >> addApplication(appAddedEvent.getApplicationId(), appAddedEvent.getQueue(), … )

      >>> application=new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user )

      >>> applications.put(applicationId, application)

                              //将创建的 SchedulerApplication放在applications集合中

      >>> metrics.submitApp(user) //metrics是一个QueueMetrics对象,用于统计目的

      >>> e=new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)//创建新事件

      >>> rmContext.getDispatcher().getEventHandler().handle(e) //派发新创建的事件

      > case APP_REMOVED: //撤销了一个App

      >> …

      > case APP_ATTEMPT_ADDED:  //增加了一个App企图

      >> …

      > case APP_ATTEMPT_REMOVED: //撤销了一个App企图

      >> …

      > case CONTAINER_EXPIRED:  //容器使用超时

      >> …

      > } //end switch

同属于SchedulerEventType的事件有不少,这个handle()方法通过一个switch语句分情形加以处理。由此可见,作为实现了EventHandler界面的对象,其handle()函数对于事件的反应并不一定是通过状态机做出的,也可以通过switch语句做出反应。我们现在这个事件是APP_ADDED,暂不关心别的事件。

对于APP_ADDED,具体的反应是addApplication()。这里把这个函数展开了:首先是为这个App创建一个SchedulerApplication类的对象application,这是具体App在调度器中的代表,这个SchedulerApplication和applicationId一起被放入一个集合applications中,以后凭applicationId就可以在这个集合中找到其SchedulerApplication对象。请注意在创建application时的参数DEFAULT_QUEUE,这说明在ApplicationSubmissionContext中指定的队列对于FifoScheduler不起作用,FifoScheduler总是采用其DEFAULT_QUEUE。

同时,出于统计的目的,这里还以用户名user为参数向一个QueueMetrics类对象metrics调用其submitApp(),表示这个用户增加了一个App。

然后就创建一个新的事件RMAppEventType.APP_ACCEPTED,表示这个App已经被提交给调度器并为其接受。这个事件既然属于RMAppEventType,事件中又包含着原先的applicationId,那就是回过头来针对原先那个RMAppImpl状态机的,相应的跳变规则为:


      addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,

            RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())

此时RMAppImpl状态机对触发事件RMAppEventType.APP_ACCEPTED的反应是:调用伴随操作StartAppAttemptTransition.transition(),然后使这个状态机的当前状态变成

RMAppState.ACCEPTED。


      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()

      =>AddApplicationToSchedulerTransition.transition()=>StartAppAttemptTransition.transition()]




        StartAppAttemptTransition.transition(RMAppImpl app, RMAppEvent event){

          app.createAndStartNewAttempt(false); //开始一次启动运行的尝试

        }

既然App申请已为调度器所接受,下一步就是为此应用(作业)开始一次运行尝试,即AppAttempt。这里的参数app是个RMAppImpl对象,所以这里所调用的操作就是RMAppImpl.createAndStartNewAttempt(),其摘要如下:


      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()=>

      AddApplicationToSchedulerTransition.transition()=> StartAppAttemptTransition.transition()>

      RMAppImpl.createAndStartNewAttempt()]




      RMAppImpl.createAndStartNewAttempt()

      > createNewAttempt() //创建一个新的RMAppAttemptImpl,并将其设置成currentAttempt

      >> appAttemptId=ApplicationAttemptId.newInstance(applicationId, attempts.size()+1)

      >> attempt=new RMAppAttemptImpl(appAttemptId, rmContext,

                            scheduler, masterService, …)

      >> attempts.put(appAttemptId, attempt)//attempt连同其 ID一起放入attempts集合

      >> currentAttempt=attempt //将新创建的attempt设置成currentAttempt

      > e=new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(), …)

              //生成一个RMAppAttemptEventType.START事件

      >> super(appAttemptId, RMAppAttemptEventType.START)

      > handler.handle(e) //handler取决于事件类型,在这里是RMAppAttemptImpl.handle()

顺便看一下,作为对照,RMAppImpl.createAndStartNewAttempt()的源码是这样:


      RMAppImpl.createAndStartNewAttempt(boolean transferStateFromPreviousAttempt){

        createNewAttempt();

        handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),

                                                  transferStateFromPreviousAttempt));

      }

对于用户提交的每个App,试图启动其运行的每次尝试(试运行)都称为一次应用尝试,即RMAppAttempt,由一个RMAppAttemptImpl类的对象作为代表。之所以加上前缀RM,显然是因为这是在RM节点上。对于具体App的一次尝试是个逻辑上独立又颇为复杂的过程,中间有许多状态变迁,所以RMAppAttemptImpl对象中也有状态机。

像ApplicationId一样,每个RMAppAttemptImpl都有个唯一的ApplicationAttemptId,所以在创建RMAppAttemptImpl之前都要为其分配一个号码,每次递增。

在RMAppImpl对象内部有个attempts集合,实际上是个Map<ApplicationAttemptId, RMAppAttempt>,凡属于这个App的所有attempt都在这个集合中,根据具体AppAttempt的ID号就可在这个集合中找到其RMAppAttemptImpl对象。所以,每创建了一个RMAppAttemptImpl对象之后,就要把它连同其ID号放到这个集合中,并使其成为这个RMAppImpl的“当前尝试”currentAttempt。

然后就创建一个RMAppAttemptEventType.START事件并加以派发。如前所述,派发的目标取决于事件类型。事件类型与事件处理器的绑定是事先向Dispatcher登记好的,所以这里的handler.handle()实际上是ApplicationAttemptEventDispatcher.handle():


      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()=>

      AddApplicationToSchedulerTransition.transition()=> StartAppAttemptTransition.transition()>

      RMAppImpl.createAndStartNewAttempt()> ApplicationAttemptEventDispatcher.handle()]




      ApplicationAttemptEventDispatcher.handle(RMAppAttemptEvent event)

      > ApplicationAttemptId appAttemptID=event.getApplicationAttemptId()

      > ApplicationId appAttemptId=appAttemptID.getApplicationId()//注意,这是ApplicationId

      > rmApp=this.rmContext.getRMApps().get(appAttemptId)

                                            //根据ApplicationId获取其RMAppImpl 对象

      > if (rmApp! =null){

      >+ RMAppAttempt rmAppAttempt=rmApp.getRMAppAttempt(appAttemptID)

                                //根据ApplicationAttemptId获取其RMAppAttemptImpl 对象

      >+ if (rmAppAttempt! =null)rmAppAttempt.handle(event)==RMAppAttemptImpl.handle()

      >+> appAttemptID=event.getApplicationAttemptId() // event中提取

      >+> this.stateMachine.doTransition(event.getType(), event)

                                      //这是具体RMAppAttemptImpl 对象的状态机

      > }

可见,事件处理器ApplicationAttemptEventDispatcher对此事件的处理就是驱动具体RMAppAttemptImpl对象的状态机,而RMAppAttemptImpl状态机中相应的跳变规则如下:


      addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,

                        RMAppAttemptEventType.START, new AttemptStartedTransition())

      显然,RMAppAttemptImpl.handle()会调用AttemptStartedTransition.transition():

      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()

      =>AddApplicationToSchedulerTransition.transition()=>StartAppAttemptTransition.transition()

      > RMAppImpl.createAndStartNewAttempt()=> AttemptStartedTransition.transition()]




          AttemptStartedTransition.transition(RMAppAttemptImpl appAttempt,

                                                            RMAppAttemptEvent event){

          boolean transferStateFromPreviousAttempt=false;

          if (event instanceof RMAppStartAttemptEvent){

            transferStateFromPreviousAttempt=

                ((RMAppStartAttemptEvent)event).getTransferStateFromPreviousAttempt();


          }

          appAttempt.startTime=System.currentTimeMillis();




          //Register with the ApplicationMasterService

          appAttempt.masterService.registerAppAttempt(appAttempt.applicationAttemptId);

                                    //ApplicationMasterService登记一次RMAppAttempt

          if (UserGroupInformation.isSecurityEnabled()){ //如果启用了安全机制

            appAttempt.cl ientTokenMasterKey=

                appAttempt.rmContext.getCl ientToAMTokenSecretManager()

                  .createMasterKey(appAttempt.applicationAttemptId);

          }




          //Add the applicationAttempt to the scheduler and inform the scheduler

          //whether to transfer the state from previous attempt.

          appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(

                  appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));

        }

我们为这个函数做一个摘要,并略作展开,就是这样:


      AttemptStartedTransition.transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event)

      > transferStateFromPreviousAttempt=event.getTransferStateFromPreviousAttempt()

      > appAttempt.masterService.registerAppAttempt(appAttempt.applicationAttemptId)

      > e=new AppAttemptAddedSchedulerEvent(appAttempt.applicationAttemptId, …)

      >> super(SchedulerEventType.APP_ATTEMPT_ADDED)

      > appAttempt.eventHandler.handle(e) //这就是FifoScheduler.handle()

先说说参数。这里的参数已经从RMAppImpl变成了RMAppAttemptImpl。前者代表一个作业、一个应用;后者则代表运行这个应用的一次尝试、一次试运行。一个应用不一定能一次运行成功,也许需要运行多次才能完成。这里的这个RMAppAttemptImpl对象appAttempt,就是前面在createAndStartNewAttempt()中创建的,这是RMAppImpl状态机受事件RMAppEventType.APP_ACCEPTED的触发而做出的反应。从现在开始,下面的活动就都是针对App的一次具体的运行尝试,而不是针对整个App的了。

这里的布尔量transferStateFromPreviousAttempt最初是从事件对象event里带进来的,目的在于供调度器使用,用以确定在为一个App发起一次新的Attempt时要不要把先前的Attempt的状态转移过来。这是因为,在这种情况下先前的Attempt多半已经失败,将其状态转移继承过来很可能可以利用其已经获得的部分成果。

另一个事情,是通过registerAppAttempt()向RM的ApplicationMasterService登记,向其报告新创建RMAppAttemptImpl对象的ApplicationAttemptId。因为一旦为此应用在某个NodeManager节点上创建起一个ApplicationMaster,它就要来与ApplicationMasterService联系,所以这里先要登记一下,挂个号备个案。ApplicationMasterService是专门管理正在集群中运行的那些App的ApplicationMaster的。

然后就是创建AppAttemptAddedSchedulerEvent对象并加以派发。这个事件的类型实际上是SchedulerEventType.APP_ATTEMPT_ADDED。在我们的情景中,因为我们假定采用FifoScheduler,这个事件对象又会被派发给FifoScheduler.handle()。我们在前面见过这个函数的摘要,但是那时我们只关心其switch语句中的“case APP_ADDED”,现在要关心一下“case APP_ATTEMPT_ADDED”了:


      FifoScheduler.handle(SchedulerEvent event)

      > switch(event.getType()){

      > case APP_ATTEMPT_ADDED:   //增加了一个App运行尝试

      >> addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), …)

      > } //end switch

这当然是要把我们的AppAttempt加入到调度器中。下面是addApplicationAttempt()的源码。


      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()

      =>AddApplicationToSchedulerTransition.transition()=>StartAppAttemptTransition.transition()

      => AttemptStartedTransition.transition()=> FifoScheduler.addApplicationAttempt()]




          FifoScheduler.addApplicationAttempt(ApplicationAttemptId appAttemptId,

                  boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering){

          SchedulerApplication<FiCaSchedulerApp> application=

                                    applications.get(appAttemptId.getApplicationId());

                  //参考前面FifoScheduler.handle()case APP_ADDED下的addApplication()

          String user=application.getUser();

          //TODO:Fix store

          FiCaSchedulerApp schedulerApp=

                    new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,

                                                  activeUsersManager, this.rmContext);

                    //这个类型既用于FifoScheduler,又用于CapacityScheduler,故得此名

          if (transferStateFromPreviousAttempt){

            schedulerApp.transferStateFromPreviousAttempt(

                                      application.getCurrentAppAttempt());

          } //如果要转移继承先前尝试的状态,那么所谓先前尝试就是此刻的当前尝试

          application.setCurrentAppAttempt(schedulerApp); //新的尝试变成了当前尝试




          metrics.submitAppAttempt(user);    //AppAttempt纳入统计

          LOG.info("Added Application Attempt"+appAttemptId

                                    +"to scheduler from user"+application.getUser());

          if (isAttemptRecovering){ //见调用参数,true表示Attempt并非新创

            if (LOG.isDebugEnabled()){


            LOG.debug(appAttemptId+"is recovering.Skipping notifying ATTEMPT_ADDED");

          }

        }else { //如果是新加入的应用尝试

          rmContext.getDispatcher().getEventHandler().handle(

            new RMAppAttemptEvent(appAttemptId,

                                  RMAppAttemptEventType.ATTEMPT_ADDED));

        }

      }

前面,在将一个App提交给调度器的时候(当时的事件是APP_ADDED),我们为其创建了一个SchedulerApplication对象。其实这个SchedulerApplication只是个简略的模板名,对于FifoScheduler而言更确切的类名是SchedulerApplication<FiCaSchedulerApp>。前缀“FiCa”说明这个类既用于FifoScheduler,也用于CapacityScheduler。创建了这个对象之后,就把它放入了调度器的applications集合中。需要的时候,用ApplicationId就可从这个集合中找到这个SchedulerApplication对象。这里的第一个语句正是这样。之所以需要这个对象,是因为这里要为其创建一个专供调度器使用的FiCaSchedulerApp对象,这是对SchedulerApplicationAttempt的扩展,实际上代表的是应用尝试(而不是应用本身),这个尝试应该成为App的“当前尝试(CurrentAppAttempt)”。而如果要转移继承先前尝试的状态,则此刻之前的当前尝试就是此后的先前尝试了。

注意,这个函数有个调用参数isAttemptRecovering,这是个布尔量,为“真”即表示此次调用addApplicationAttempt()并非要将一个新创的Attempt加入调度,而只是要恢复原来已经存在的Attempt。

一般而言,调用addApplicationAttempt()总是为新创的应用尝试居多,此时就得创建和派发RMAppAttemptEventType.ATTEMPT_ADDED事件。其实这就是调度器对于具体RMAppAttemptImpl对象的响应。那个RMAppAttemptImpl状态机的状态在前面已经变成SUBMITTED,现在又受到ATTEMPT_ADDED事件的触发,其跳变规则为:


      addTransition(RMAppAttemptState.SUBMITTED,

                EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,

                          RMAppAttemptState.SCHEDULED),

                RMAppAttemptEventType.ATTEMPT_ADDED, new ScheduleTransition())

这条规则与我们以前所见有些不同,它的跳变目标有两个,一个是SCHEDULED,另一个是LAUNCHED_UNMANAGED_SAVING。这说明,在同一事件的触发下,状态机的下一个状态是什么还得看某些别的条件才能确定,这就是所谓“多弧(MultipleArc)”跳变。状态机的宿主即其所属对象的handle()函数对于单弧和多弧的处理有所不同,但是不管是单弧还是多弧,最后的状态改变总是发生在从transition()函数返回的时候,而且transition()的返回值决定了多弧跳变的目标状态究竟是这个集合中的哪一种。

我们看一下ScheduleTransition.transition()的代码:


      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()

      => AddApplicationToSchedulerTransition.transition()=>StartAppAttemptTransition.transition()


      => AttemptStartedTransition.transition()=> FifoScheduler.addApplicationAttempt()

      => RMAppAttemptImpl.ScheduleTransition.transition()]




          ScheduleTransition.transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event){

          ApplicationSubmissionContext subCtx=appAttempt.submissionContext;

          if (! subCtx.getUnmanagedAM()){ //如果不是UnmanagedAM,就得为其创建AM

            //Need reset #containers before create new attempt, because this request

            //will be passed to scheduler, and scheduler will deduct the number after

            //AMcontainer allocated




            //Currently, following fields are all hard code,

            //TODO:change these fields when we want to support

            //priority/resource-name/relax-locality specification for AMcontainers

            //allocation.

            appAttempt.amReq.setNumContainers(1);  //只要求一个容器,用于创建AM

            appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);

            appAttempt.amReq.setResourceName(ResourceRequest.ANY);

            appAttempt.amReq.setRelaxLocality(true);




            //SchedulerUtils.validateResourceRequests is not necessary because

            //AMresource has been checked when submission

            Allocation amContainerAllocation=

                appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,

                    Collections.singletonList(appAttempt.amReq),

                    EMPTY_CONTAINER_RELEASE_LIST, null, null); //分配资源

                          //无需要释放的容器,在我们这个情景中 schedulerFifoScheduler

            if (amContainerAllocation! =null

                &&amContainerAllocation.getContainers()! =null){

              assert (amContainerAllocation.getContainers().size()==0);

            }

            return RMAppAttemptState.SCHEDULED; //RMAppAttemptImpl 状态机的新状态

          }else {

            //save state and then go to LAUNCHED state

            appAttempt.storeAttempt();

            return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;

          }

        }

可见,ScheduleTransition.transition()的返回值因subCtx.getUnmanagedAM()的结果而异。这个subCtx就是用户提交的ApplicationSubmissionContext。如果subCtx.getUnmanagedAM()的结果为false,表示这不是一个UnmanagedAM,最后就会返回SCHEDULED;否则就会返回LAUNCHED_UNMANAGED_SAVING。总之,ScheduleTransition.transition()返回什么,状态机的下一个状态就是什么。

所谓“不受管理的AM(UnmanagedAM)”是指不由RM发起和管理,而直接由使用者在RM节点上通过命令行发起的AM。Hadoop的代码中有个类,叫UnmanagedAMLauncher,这个类是有main()函数的,所以使用者可以直接通过命令行发起一个JVM来执行这个类,这样就绕过了常规的作业提交和调度,直接就进入(在某个NodeManager节点上)发起AppMaster的阶段。不过这样的AppMaster一经发起后仍得向RM登记,因为所需的资源总归还得由RM分配。我们在这里只关心常规的情景和流程,有兴趣的读者可以自己阅读和分析UnmanagedAMLauncher的代码。

但是,为什么在这个地方会涉及UnmanagedAM呢?这是因为,对于正常提交的App,更准确地说是AppAttempt,下一步要做的也正是要指派在某个NodeManager节点上发起一个起着“课题组长”作用的AppMaster,这一点上二者的走向是共同的。虽然AppMaster并不直接执行具体App的程序,它也是一个独立运行的任务,是要启动一个独立的JVM加以执行的,它也需要耗用一定的资源。所以,要指派发起AppMaster,就得为其分配一个“容器(Container)”。这里所做的,就是先填写好一个ResourceRequest,就是代码中的amReq,然后通过调度器的allocate()方法分配容器。注意,这里要求分配的容器数量是1,那就是AppMaster本身所需的容器。至于AppMaster发起执行具体AppAttempt的也许是大量的任务(例如Mapper和Reducer)时所需的(也许是大量的)容器,那是以后的事,得由AppMaster出面向调度器申请了。

这个transition()函数与我们在前面看到的那些有所不同。一方面,是它返回一个状态,不是SCHEDULED就是LAUNCHED_UNMANAGED_SAVING,因为这是“多弧跳变”,具体跳变到什么状态要视情况而定。另一方面,transition()函数通常会在末尾发出一个别的什么事件,这个事件会被用来触发另一个过程,往往回过头来又使这个状态机发生变化,这就构成驱动状态机运转的动力,但是这里却没有。这是为什么呢?其实原因也很简单,因为这个状态机之是否还可前行,要看通过调度器分配资源能否成功。这里的scheduler.allocate(),对于我们来说就是FifoScheduler.allocate()。下面我们就将看到,在那里,每拿到一个容器就会发出一个RMContainerEventType.ACQUIRED事件。这个事件最终会导致一个RMAppAttemptEventType.CONTAINER_ALLOCATED事件,使这状态机跳出SCHEDULED状态,经由ALLOCATED_SAVING而进入ALLOCATED状态。但是,如果一时分配不到容器,则这个RMAppAttemptImpl的容器就会暂时停滞在SCHEDULED状态。

前面看到,当一个App被提交到RMAppManager手里时,RMAppManager.submitApplication()通过createAndPopulateNewRMApp()创建了一个RMAppImpl对象,此后又创建一个RMAppAttemptImpl对象作为运行这个App的一次尝试,后者则在调度器中创建一个FiCaSchedulerApp对象,作为其在调度器中的代表。

现在,由于受ATTEMPT_ADDED事件的触发,这个具体的RMAppAttemptImpl对象appAttempt要求调度器分配资源,这样就进入了FifoScheduler.allocate()。


      [RMAppManager.submitApplication()=> RMAppNewlySavingTransition.transition()

      => AddApplicationToSchedulerTransition.transition()=>StartAppAttemptTransition.transition()


    => AttemptStartedTransition.transition()=> FifoScheduler.addApplicationAttempt()

    => RMAppAttemptImpl.ScheduleTransition.transition()> FifoScheduler.allocate()]




    FifoScheduler.allocate(ApplicationAttemptId applicationAttemptId,

                        List<ResourceRequest> ask, List<ContainerId> release,

                        List<String> blacklistAdditions, List<String> blacklistRemovals)

    > FiCaSchedulerApp application=getApplicationAttempt(applicationAttemptId)

                                                  //代表着要求分配资源的AppAttempt

    > SchedulerUtils.normalizeRequests(ask, resourceCalculator, clusterResource,

                  minimumAllocation, maximumAllocation)//资源要求的合理性检测和规格化releaseContainers(release, application) //释放该释放的容器

    > if (! ask.isEmpty()){ //要求分配的资源集合非空

    >+ application.updateResourceRequests(ask)//Update application requests

    > }

    > application.updateBlacklist(blacklistAdditions, blacklistRemovals) //修改黑名单

    > ContainersAndNMTokensAllocation allocation=

                                  application.pullNewlyAllocatedContainersAndNMTokens()

                                        //把新分配的容器都收揽过来并为之创建Token

    >> for(Iterator<RMContainer> i=newlyAllocatedContainers.iterator(); i.hasNext(); ){

                                  //对于newlyAllocatedContainers列表中的每一个容器

    >>+ RMContainer rmContainer=i.next()

    >>+ Container container=rmContainer.getContainer()

    >>+> return this.container

    >>+ container.setContainerToken(

            rmContext.getContainerTokenSecretManager().createContainerToken(

              container.getId(), container.getNodeId(), getUser(), container.getResource(),

              container.getPriority(), rmContainer.getCreationTime(), this.logAggregationContext))

    >>+ NMToken nmToken=rmContext.getNMTokenSecretManager().createAndGetNMToken(

                                      getUser(), getApplicationAttemptId(), container)

    >>+ if (nmToken! =null)nmTokens.add(nmToken)

    >>+ returnContainerList.add(container)      //揽到了一个容器

    >>+ i.remove()             //newlyAllocatedContainers列表中摘除

    >>+ rmContainer.handle(new RMContainerEvent(

                      rmContainer.getContainerId(), RMContainerEventType.ACQUIRED))

    >> } //end for

        //已从该AppnewlyAllocatedContainers中把容器都揽到了returnContainerList

    >> return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens)

                  //所返回的ContainersAndNMTokensAllocation对象成为前面的allocation

    > return new Allocation(allocation.getContainerList(), application.getHeadroom(),

                                        null, null, null, allocation.getNMTokenList())

这个函数有5个参数,第一个是applicationAttemptId,其意义自明。此外就是4个List,其中ask是资源请求列表;release是资源释放列表;blacklistAdditions和blacklistRemovals则分别是需要加入黑名单或从中移除的资源列表。如果一个节点或机架在一个App的黑名单中,调度器就不会在这个节点或机架上为这个App分配资源。这样,如果需要,就可以让一个App有意避开在某些节点或机架上运行。不过,我们在前面看到,从ScheduleTransition.transition()中调用allocate()函数时这两个参数都是null。另外,调用时具体的要求是只要一个容器。

所以,这个函数既可用于分配资源,也可用于释放资源。所谓资源的分配和释放,实际上就是容器的分配与释放,因为具体的资源,例如存储空间和VCore,是包裹(记载)在容器里面的。

我们只关心容器的分配,故而直接看对于pullNewlyAllocatedContainersAndNMTokens()的调用。这个函数定义于SchedulerApplicationAttempt类内部,而FiCaSchedulerApp是对于SchedulerApplicationAttempt的扩充,所以这也是由FiCaSchedulerApp提供的操作方法。另外,SchedulerApplicationAttempt即FiCaSchedulerApp内部还有个RMContainer的列表newlyAllocatedContainers,意为新分配的容器。在这个列表中,如果有的话,是根据FiCaSchedulerApp的要求为其分配的容器(可以有多个),这些容器可以来自不同的节点(更确切地说是可以由不同的节点提供)。我们暂且不问这些容器的来历,只要知道这个列表中可能有分配好的容器就行了。而所谓allocate(),其实只是从这个列表中收揽已经分配的容器(RMContainer对象),并为这些容器办理使用证件(NMToken对象),然后一并打包成一个Allocation对象。其中收揽容器和办理证件的操作就是由pullNewlyAllocatedContainersAndNMTokens()完成的。

特别地,每当从newlyAllocatedContainers列表中收揽到一个容器,就向这容器(更确切地说是该容器的状态机)发出一个RMContainerEventType.ACQUIRED事件。

既然这些容器已经存在于newlyAllocatedContainers这个List中,当然是已经被创建的了。每个RMContainer对象都有自己的状态机,因为容器有它的生存周期,有状态变迁。

但是,也有可能在newlyAllocatedContainers中还没有所需的容器,那样就不会有ACQUIRED事件发出,RMAppAttemptImpl对象的状态机就会停滞在SCHEDULED状态。可想而知,系统中一定有一种机制,使得当有容器可供分配时再走一遍上述的过程。后面我们将会看到这一点。现在得说一下这些容器的来源了。