大数据处理系统:Hadoop源代码情景分析
上QQ阅读APP看书,第一时间看更新

4.5 RPC客户端的创建

再看客户端这一边,我们从应用层的Client这个类开始。

我们知道,在Hadoop的IPC层上有个名为Client的类,那是在org.apache.hadoop.ipc这个package中。注意在应用层也有个叫Client的类,这个类起着相当于Shell的作用,不过这是在org.apache.hadoop.yarn.applications.distributedshell这个package中。两个Client虽然类名相同,但是它们在命名空间的路径不同,在两个不同的package中。我们只要注意一下源文件前面的package语句就可分辨。正如这个Client在命名空间的路径所示,这是一个distributedshell,即分布式的Shell。

应用层的Client是个独立的应用,这个类有main()函数,需要通过命令行启动,运行在一个独立的Java虚拟机上。在下面的叙述中,只要未作特别说明,凡是说到Client的都是指这个属于应用层的Client。

Client类的main()函数通过new创建Client对象,那就要执行其构造函数,这就开始了创建客户端RPC协议栈的历程。

      Client.Client()
      > YarnCl ient yarnCl ient=YarnClient.createYarnClient()
      >> new YarnCl ientImpl()
      >>> YarnClientImpl.serviceStart()
      >>>> rmClient=ClientRMProxy.createRMProxy(getConfig(), ApplicationClientProtocol.class)

作为分布式Shell的Client是整个系统的Client,但是同一个系统中可以同时有很多个Client。我们现在用作例子的是作业提交,即submitApplication(),那必须与YARN子系统,实际上是RM打交道。所以Hadoop代码中另有一个抽象类叫YarnClient,意为专门针对YARN的Client,这里通过其createYarnClient()加以创建。当然,真正创建的是扩充了这个抽象类的实体类,即YarnClientImpl。要创建YarnClient的不仅仅是作为分布式Shell的Client,其他相对独立的模块要跟RM打交道时也得创建自己的YarnClient。

RPC层Client的问题,核心在于Proxy,因为发送给服务端的RPC请求要靠Proxy传递。Proxy就像联络员,像服务端派驻在客户端的代理人,所以这里要创建Proxy。

注意,这里调用的ClientRMProxy.createRMProxy()只有两个参数。我们看一下这个函数,以及ClientRMProxy这个类:

      class ClientRMProxy<T> extends RMProxy<T> {}
      ]static ClientRMProxy INSTANCE=new ClientRMProxy()
      ]createRMProxy(Configuration configuration, Class<T> protocol)
        > return createRMProxy(configuration, protocol, INSTANCE)//添加了参数 INSTANCE
      ]…

ClientRMProxy这个类有个静态成分INSTANCE,这是个预先创建好的ClientRMProxy对象。而这个带两个参数的createRMProxy()则在后面添上INSTANCE为第三个参数,调用带三个参数的createRMProxy()。然而,我们在ClientRMProxy这个类的代码中找不到带这么三个参数的createRMProxy(),那就去它的父类RMProxy中寻找,里面确实有这么一个createRMProxy():

      [Client.Client()> YarnClient.createYarnClient()> YarnClientImpl.serviceStart()
      > Cl ientRMProxy.createRMProxy()> RMProxy.createRMProxy()]


      RMProxy.createRMProxy(Configuration configuration, Class<T> protocol, RMProxy instance)
                  //参数protocol 实际上是界面hadoop.yarn.api.ApplicationClientProtocol
      > RetryPol icy retryPol icy=createRetryPol icy(conf)
      > if(HAUtil.isHAEnabled(conf)){ //如果开通HA,就要考虑发送失败时的对策
      >+ provider=instance.createRMFailoverProxyProvider(conf, protocol)
          ==RMProxy.createRMFailoverProxyProvider(Configuration conf, Class<T> protocol)
      >+> defaultProviderClass=Class.forName(
                              YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER)
                                            //默认为ConfiguredRMFailoverProxyProvider
      >+> clazz=conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
                              defaultProviderClass, RMFailoverProxyProvider.class)
          //如果conf中没有关于“yarn.client.failover-proxy-provider”的设置,就采用默认
      >+> RMFailoverProxyProvider<T> provider=ReflectionUtils.newInstance(clazz, conf)
                                      //创建一个ConfiguredRMFailoverProxyProvider对象
      >+> provider.init(conf, (RMProxy<T>)this, protocol)
              //这个this就是前面的ClientRMProxy.INSTANCE,是个ClientRMProxy
      >+> return provider
      >+ return (T)RetryProxy.create(protocol, provider, retryPol icy)
                                //RMFailoverProxyProvider为基础创建高层的RetryProxy
          ==RetryProxy.create(Class<T> iface, FailoverProxyProvider<T> proxyProvider,
                                                            RetryPol icy retryPol icy)
      >+> loader=proxyProvider.getInterface().getClassLoader()
      >+> invoker=new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
      >+> return Proxy.newProxyInstance(loader, new Class<? >[]{iface }, invoker)
      > }else { //不开通HA,就相对简单一些
      >+ InetSocketAddress rmAddress=instance.getRMAddress(conf, protocol)
      >+ LOG.info("Connecting to ResourceManager at"+rmAddress)
      >+ T proxy=RMProxy.<T>getProxy(conf, protocol, rmAddress)
                                                    //先创建一个普通的proxy,详见后
      >+ return(T)RetryProxy.create(protocol, proxy, retryPolicy)//将此proxy包装成RetryProxy
          ==RetryProxy.create(Class<T> iface, T implementation, RetryPolicy retryPolicy)
      >+> failover=new DefaultFailoverProxyProvider<T>(iface, implementation)
                      //implementation==proxy,为其创建一个DefaultFailoverProxyProvider
      >+> RetryProxy.create(iface, failover, retryPol icy) //跟上面有HA时的调用方法相同
                          //DefaultFailoverProxyProvider为基础创建高层的RetryProxy
        ==RetryProxy.create(Class<T> iface, FailoverProxyProvider<T> proxyProvider,
                                                            RetryPol icy retryPol icy)
      >+>> loader=proxyProvider.getInterface().getClassLoader()
      >+>> invoker=new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
      >+>> return Proxy.newProxyInstance(loader, new Class<? >[]{iface }, invoker)
                  //创建一个面向ApplicationClientProtocol 界面的 java.lang.reflect.Proxy
      > }

这里分两种情况:一种是开通了HA,即高可用性(High Availability)的容错RPC机制;另一种是常规的RPC机制。不管开不开通HA机制,由RMProxy.createRMProxy()创建的都是一个RetryProxy对象,这个RetryProxy实质上就是一个Proxy,只不过是以一个FailoverProxyProvider(更确切地说是实现了此种界面的对象)为基础的Proxy,所以通信失败后可以“Retry”,即重试。

在Java语言的Reflection机制中,Proxy是可以嵌套的。比方说,我们可以为界面Interface创建一个proxy1,其InvocationHandler是handler1;然后以proxy1为基础,再为同一个界面Interface创建一个proxy2,其InvocationHandler是handler2。这相当于在proxy1的外面包上了一层proxy2。这样,运行时首先调用proxy2所提供的定义于界面Interface的某个函数,这就到了handler2.invoke();然后在handler2.invoke()中进行了某些处理之后就调用proxy1中的同名函数,于是就又到了handler1.invoke()。这样,依此类推,就可以为这个界面Interface建立起一个“协议栈”,其中处于外层的Proxy就相当于协议栈中的高层。

而RetryProxy,就相当于一个两层协议栈中的高层,其底层Proxy就是与前面Server端底层相对应的客户端底层。所以,在创建高层的RetryProxy之前,先要创建底层的常规Proxy。之所以需要有这么一个两层的协议栈,是为了在底层Proxy通信失败时可以由高层的RetryProxy处理重试。不过,在创建RetryProxy时不是直接以底层Proxy为基础,而是以一个FailoverProxyProvider(更确切地说是一个实现了FailoverProxyProvider界面的某类对象,例如DefaultFailoverProxyProvider)为基础,这个对象中包含了底层Proxy,或者换言之也可以说这是把底层的Proxy包装成了RetryProxy。

在开通或不开通HA机制的协议栈中,底层Proxy基本上是一样的,但是处理失败重发的机制却不一样。开通HA机制时的FailoverProxyProvider可以通过配置文件设定,如果未作设定就默认为ConfiguredRMFailoverProxyProvider,而在不开通HA时,则采用DefaultFailoverProxyProvider,这个FailoverProxyProvider处理出错重发但不支持Proxy倒换。

在不开通HA的情况下,用作底层Proxy的并不是前面的ClientRMProxy.INSTANCE,即ClientRMProxy,而是要通过RMProxy.getProxy()另外创建一个Proxy,那就是一个ApplicationClientProtocolPBClientImpl类对象:

      [Client.Client()> YarnClient.createYarnClient()> YarnClientImpl.serviceStart()
      > Cl ientRMProxy.createRMProxy()> RMProxy.createRMProxy()> RMProxy.getProxy()]


      RMProxy.getProxy(Configuration conf, Class<T> protocol, InetSocketAddress rmAddress)
      > return UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction<T>())
          ]run()
          > yarnRPC=YarnRPC.create(conf)
          >> clazzName=conf.get(YarnConfiguration.IPC_RPC_IMPL)//“yarn.ipc.rpc.class”
          >> if (clazzName==null)clazzName=YarnConfiguration.DEFAULT_IPC_RPC_IMPL
                                //默认为“org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC”
          >> return (YarnRPC)Class.forName(clazzName).newInstance()
          > return (T)yarnRPC.getProxy(protocol, rmAddress, conf)
            ==HadoopYarnProtoRPC.getProxy(protocol, rmAddress, conf)

创建Proxy,以及创建了Proxy以后通过其进行的RPC操作,涉及用户的身份和权限,所以创建Proxy的操作是放在doAs()方法中的一种授权行为,需要为此另建一个线程,在这个线程的run()函数中进行。本书后面将专门详述包括doAs()在内的安全机制。

与创建服务端的协议栈一样,这里也要先创建一个YarnRPC对象。不过YarnRPC是个抽象类,所以实际创建的对象是对YarnRPC的某种扩充。至于具体属于什么类则可以在配置块conf中加以设置,所以这里首先寻找对于“yarn.ipc.rpc.class”的设置,如果没有设置就采用默认的HadoopYarnProtoRPC。事实上,Hadoop的代码中扩充了YarnRPC的类也只有HadoopYarnProtoRPC这么一种。创建了HadoopYarnProtoRPC对象之后,就调用其getProxy()方法获取或创建具体的Proxy。

      [Client.Client()> YarnClient.createYarnClient()> YarnClientImpl.serviceStart()
      > Cl ientRMProxy.createRMProxy()> RMProxy.createRMProxy()> RMProxy.getProxy()
      > HadoopYarnProtoRPC.getProxy()]
      HadoopYarnProtoRPC.getProxy(protocol, rmAddress, conf)


      > LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol"+protocol)
      > factory=RpcFactoryProvider.getClientFactory(conf)
      >> clientFactoryClassName=conf.get(YarnConfiguration.IPC_CLIENT_FACTORY_CLASS,
                                    YarnConfiguration.DEFAULT_IPC_CLIENT_FACTORY_CLASS)
                                                      //默认为RpcCl ientFactoryPBImpl
      > return factory.getCl ient(protocol,1, addr, conf)
        ==RpcClientFactoryPBImpl.getClient(protocol,1, addr, conf)
      >> constructor=cache.get(protocol) //看看cache中是否已经有相应的构造方法
      >> if (constructor==null){ //如果还没有,就创建
      >>+ clazzName=getPBImplClassName(protocol) //生成针对该protocol Client类名
                            //在我们这个情景中是“ApplicationClientProtocolPBClientImpl”
      >>+ pbClazz=localConf.getClassByName(clazzName)
                                            //通过Reflection机制获取该类的Class对象
      >>+ constructor=pbClazz.getConstructor(Long.TYPE,
                            InetSocketAddress.class, Configuration.class)//获取该类的构造方法
      >>+ cache.putIfAbsent(protocol, constructor)         //将此构造方法加入对照表
      >> }
      >> retObject=constructor.newInstance(clientVersion, addr, conf)
                //构造目标对象,在我们这个情景中是ApplicationClientProtocolPBClientImpl
      >> return retObj ect  //返回该对象

粗略看一下这段摘要,就会产生一个疑问:说是getProxy(),可是实际上要获取或创建的好像是Client啊?其实Proxy和Client都是相对的,这里要创建的是RPC客户端协议栈。在客户端的应用层即真正的Client面前,这个协议栈的顶层代表着远地的服务端,是服务端的代理Proxy,这一层以下是看不见的,而在这个协议栈的底层看来,其顶层却代表着应用层,是Client,至于真正应用层的Client是根本看不见的。

跟创建服务端的RPC层Server一样,这里也要先通过RpcFactoryProvider创建一个RpcClientFactory,具体是什么RpcClientFactory也是在运行时根据配置动态确定的,如果没有加以配置则默认为RpcClientFactoryPBImpl,这显然是专为采用ProtoBuf而设计的。创建了RpcClientFactoryPBImpl之后就调用其getClient()以获取或创建RPC层的Client。

RpcClientFactoryPBImpl对象内部有个作为缓存cache的对照表,如果已经为某个Protocol即Interface创建过Proxy,在对照表中就可以查到其相应的构造方法,那就省事了。如果是第一次创建,则要为具体的Protocol,在我们这个情景中是ApplicationClientProtocol,生成相应的Client类名,在我们这个情景中就是ApplicationClientProtocolPBClientImpl。

生成了目标类名之后,运用Java的Reflection机制就可以获取这个类的构造函数,这里一方面把它放进作为cache的便查表,一方面就用这个构造函数来创建和构造ApplicationClientProtocolPBClientImpl对象。下面是ApplicationClientProtocolPBClientImpl这个类的摘要,我们主要关心两件事。

首先,这个类的对象内部有个成分Proxy,在ApplicationClientProtocolPBClientImpl这里代表着RPC的Server一方,这就是客户端RPC协议栈中的下一层次。这个Proxy应该是个实现了ApplicationClientProtocolPB界面的某类对象,而ApplicationClientProtocolPB实际上就是我们在前面看到过的ApplicationClientProtocolService.BlockingInterface,这是在Hadoop的源码文件ApplicationClientProtocolPB.java中定义的:

      interface ApplicationClientProtocolPB
                          extends ApplicationClientProtocolService.BlockingInterface{//空白}

这里说ApplicationClientProtocolPB是个扩充,可是实际扩充的内容却是空白,所以二者其实是等价的。至于ApplicationClientProtocolService.BlockingInterface的定义,则是protoc根据.proto文件编译生成的。

其次,这个Proxy是在ApplicationClientProtocolPBClientImpl的构造函数中创建的,我们必须顺着这个创建的过程才能知道这究竟是个什么对象。

      [Client.Client()> YarnClient.createYarnClient()> YarnClientImpl.serviceStart()> Cl ientRMProxy.createRMProxy()> RMProxy.createRMProxy()> RMProxy.getProxy()> HadoopYarnProtoRPC.getProxy()> RpcCl ientFactoryPBImpl.getCl ient()

> ApplicationClientProtocolPBClientImpl()]

class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol{}]ApplicationClientProtocolPB proxy

]ApplicationClientProtocolPBClientImpl(long clientVersion,

InetSocketAddress addr, Configuration conf) //构造函数

> RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class, ProtobufRpcEngine.class)

//ApplicationClientProtocolPBProtobufRpcEngine绑定

> proxy=RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf)

由此可见,ApplicationClientProtocolPBClientImpl内部的Proxy是通过RPC.getProxy()创建的,所创建的是一个支持ApplicationClientProtocolPB界面的Proxy。注意这里通过RPC.setProtocolEngine把这个界面和ProtobufRpcEngine绑定在一起,这一点下面马上就要用到:

      [Client.Client()> YarnClient.createYarnClient()> YarnClientImpl.serviceStart()
      > Cl ientRMProxy.createRMProxy()> RMProxy.createRMProxy()> RMProxy.getProxy()
      > HadoopYarnProtoRPC.getProxy()> RpcCl ientFactoryPBImpl.getCl ient()
      > ApplicationClientProtocolPBClientImpl()> RPC.getProxy()]


      RPC.getProxy(Class<T> protocol, long clientVersion,
                                        InetSocketAddress addr, Configuration conf)
      > protoProxy=RPC.getProtocolProxy(protocol, clientVersion, addr, conf)
                              //三个参数,这里的protocol ApplicationClientProtocolPB
      >> sockFactory=NetUtils.getDefaultSocketFactory(conf)
                    //默认hadoop.net.StandardSocketFactory,用来创建使用 ip地址的 Socket
      >> getProtocolProxy(protocol, clientVersion, addr, conf, sockFactory) //增加了一个参数
      >>> protoEngine=getProtocolEngine(protocol, conf)
                      //已与ApplicationClientProtocolPB绑定的是ProtobufRpcEngine,见前
      >>> protoEngine.getProxy(protocol, clientVersion, …)
            ==ProtobufRpcEngine.getProxy(protocol, clientVersion, …)
      >>>> invoker=new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout,
                                            connectionRetryPolicy, fallbackToSimpleAuth)
                                                  //创建一个ProtobufRpcEngine.Invoker
      >>>> proxy=Proxy.newProxyInstance(protocol.getClassLoader(),
                                                      new Class[]{protocol}, invoker)
                          //这个Proxy java.lang.reflect.Proxy
      >>>> return new ProtocolProxy<T>(protocol, proxy, false)//proxy为参数之一创建
      >>>>> this.protocol=protocol
      >>>>> this.proxy=proxy
      > return protoProxy.getProxy() //所返回的就是刚创建的Proxy
      >> return ProtocolProxy.proxy

上面这段摘要让人感觉太复杂、太弯弯绕绕了,但这都是为了保持灵活性和通用性。

由于前面绑定了ProtobufRpcEngine,这里的Proxy是通过ProtobufRpcEngine.getProxy()创建的。所创建的是一个以ProtobufRpcEngine.Invoker为InvocationHandler、支持ApplicationClientProtocolPB界面即ApplicationClientProtocolService.BlockingInterface界面的ProtocolProxy对象,其核心是一个java.lang.reflect.Proxy对象。这样,凡是对Proxy所支持界面的函数调用都会被引导到ProtobufRpcEngine的Invoker.invoke()。

可是,在我们至此所见到的代码中,这个RPC协议栈仍未“落地”,还没有与IPC层挂上。这个问题是在ProtobufRpcEngine.Invoker的构造函数中解决的:

      class ProtobufRpcEngine implements RpcEngine {}
      ]Cl ientCache CLIENTS=new Cl ientCache()
      ]class Invoker implements RpcInvocationHandler {}
      ]]Cl ient cl ient   //注意,这是 IPC层的Cl ient
      ]]Invoker(Class<? > protocol, InetSocketAddress addr,
                                  UserGroupInformation ticket, Configuration conf, …)
                                        //构造方法,8个参数
          > connId=Client.ConnectionId.getConnectionId(addr, protocol, ticket,
                                        rpcTimeout, connectionRetryPolicy, conf)
          > this(protocol, connId, conf, factory)//转而调用下面这4个参数的构造方法
      ]]Invoker(Class<? > protocol, Client.ConnectionId connId,
                                        Configuration conf, SocketFactory factory)
          > this.remoteId=connId
          > this.client=CLIENTS.getClient(conf, factory, RpcResponseWrapper.class)
              ==Cl ientCache.getCl ient()
          >> Cl ient cl ient=cl ients.get(factory)
          >> if (client==null){
          >>+ client=new Client(valueClass, conf, factory) //创建 IPC层的Client对象
          >>+ cl ients.put(factory, cl ient)
          >> }
          >> return cl ient
          > this.protocolName=RPC.getProtocolName(protocol)
          > this.clientProtocolVersion=RPC.getProtocolVersion(protocol)

可见,上面在创建Invoker对象的时候在其构造函数中创建了Client对象,这就是IPC层的Client。我们可以看一下ProtobufRpcEngine.java这个文件前面的package语句,就可知道这是在org.apache.hadoop.ipc这个package中。这样,这个客户端的RPC协议栈就“落地”了,与IPC层挂上了钩。

回到前面RMProxy.createRMProxy()中的else部分,即未开通HA的那个分支,我们已经创建了那里的Proxy,下面就是在所创建的Proxy外面包上一层对于失败重发的处理,使这二者合在一起成为一个RetryProxy,而这个RetryProxy就成为RPC协议栈的上层ApplicationClientProtocolPBClientImpl所看到的Proxy。我们在前面看到,RetryProxy的InvocationHandler是RetryInvocationHandler,所支持的界面则是ApplicationClientProtocol,所以凡是对此界面上的函数调用,例如submitApplication()等等,都会被引导到这个RetryInvocationHandler的invoke():

      class RetryInvocationHandler<T> implements RpcInvocationHandler {}
      ]RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy)
                                                                  //构造方法
        > this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap())
        >> this.proxyProvider=proxyProvider
        >> this.defaultPolicy=defaultPolicy
        >> this.methodNameToPolicyMap=methodNameToPolicyMap
        >> this.currentProxy=proxyProvider.getProxy()
      ]invoke(Obj ect proxy, Method method, Obj ect[]args) //发起RPC调用
        > RetryPol icy pol icy=methodNameToPol icyMap.get(method.getName())
        > if (policy==null)policy=defaultPolicy
        > boolean isRpc=isRpcInvocation(currentProxy.proxy)
        > while (true){
        >+ if(isRpc)Client.setCallIdAndRetryCount(callId, retries)//设置是否应该重试的判定标准
        >+ try{
        >++ Obj ect ret=invokeMethod(method, args)
        >++ return ret //如果成功就返回,发生异常才有循环
        >+ }catch(Exception e){
        >++ RetryAction action=policy.shouldRetry(e, retries++,
                    invocationFailoverCount, isIdempotentOrAtMostOnce)//判断是否应该重试
        >++ if (action.action==RetryAction.RetryDecision.FAIL){
        >+++ throw e     //已经无可挽救,放弃
        >++ }else { //retry or failover
        >+++ if (action.delayMillis > 0)Thread.sleep(action.delayMillis) //等待一会儿
        >+++ if (action.action==RetryAction.RetryDecision.FAILOVER_AND_RETRY){
        >++++ if (invocationAttemptFailoverCount==proxyProviderFailoverCount){
        >+++++ proxyProvider.performFailover(currentProxy.proxy) //需要倒换就倒换
            //HA未开通时这是DefaultFailoverProxyProvider.performFailover(),是个空函数
        >+++++ proxyProviderFailoverCount++
        >++++ }else {
        >+++++ LOG.warn("A failover has occurred since the start of this method invocation attempt."
        >++++ }
        >++++ currentProxy=proxyProvider.getProxy() //换一个Proxy试试
        >++++ invocationFailoverCount++
        >+++ } //if (action.action==RetryAction.RetryDecision.FAILOVER_AND_RETRY)
        >++ } //end if-else中的重试
        >+ } //end catch
        > } //end while

在invoke()函数中,真正把RPC请求发送出去要靠invokeMethod(),这里把它放在一个while循环中。如果发送成功就返回;但是倘若在发送过程中因失败而发生异常,则因为这个while循环而可以反复进行努力。是否继续努力的决定则取决于RetryPolicies。如果决定继续努力,则有两条路可走:一条是重试,即retry;另一条是Failover,即倒换,详见本书后面对于容错的介绍。

未开通HA时的proxyProvider是DefaultFailoverProxyProvider,它的performFailover()是个空函数,所以只有在开通了HA的情况下这个函数才有意义。

至于RMProxy.createRMProxy()中开通了HA的那个分支,则限于篇幅不再深入下去,留给读者自己进一步阅读分析了。

至此,服务和客户两边的RPC协议栈都搭建好了,下面我们就以作业提交操作为例看一下具体RPC的流程。当用户在Hadoop上启动执行一个App作业的时候,需要把作业提交给RM节点。用户的App可以在集群中的任何一个节点上,是另起一个Java虚拟机、作为一个独立的JVM进程运行的。另外,和我们一般所理解的“运行”不同,这个进程所做的其实只是把作业提交给RM,然后过一会儿就询问一下作业的进展,有消息就在屏幕上显示,直到作业完成。真正的计算并非在本地进行,而是分布在集群中的许多节点上。读者在下一章中将看到,App通过一个YARNRunner对象的submitJob()方法提交作业:

      [JobSubmitter.submitJobInternal()> YARNRunner.submitJob()]


      class YARNRunner implements ClientProtocol {}
      ]ResourceMgrDelegate resMgrDelegate
      ]submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
        > appContext=createApplicationSubmissionContext(conf, jobSubmitDir, ts)
        > applicationId=resMgrDelegate.submitApplication(appContext)
          ==ResourceMgrDelegate.submitApplication(appContext)
        >> client.submitApplication(appContext)//clientYarnClient.createYarnClient()创建
          ==YarnClientImpl.submitApplication(appContext)

读者在下一章中将看到,YARNRunner.submitJob()可以说是流程中的一个关键节点,实际上可以看作是作业提交流程的起点。从这个节点开始,流程的第一站是对于ResourceMgrDelegate.submitApplication()的调用,YARNRunner中的resMgrDelegate是个ResourceMgrDelegate对象,这是在YARNRunner的构造方法中创建的。然后, ResourceMgrDelegate中client的类型则是YarnClient,是通过YarnClient.createYarnClient()创建的,实际的类型是YarnClientImpl。YarnClientImpl是对YarnClient的扩充,对于客户端对服务端的通信起着桥头堡似的作用,YARN子系统中所有NM对RM的请求都是经过这个类型的对象出去的。而YarnClientImpl.submitApplication()则是作业提交流程中的又一个关键节点:

      [YARNRunner.submitJob()> YarnClientImpl.submitApplication()]


      YarnClientImpl.submitApplication(ApplicationSubmissionContext appContext)
      >> applicationId=appContext.getApplicationId()
      >> request=Records.newRecord(SubmitApplicationRequest.class)
      >> request.setApplicationSubmissionContext(appContext)
      >> rmClient.submitApplication(request)  //交给rmCl ient完成
      >> while (true){
      >>+ try{
      >>++ appReport=getApplicationReport(applicationId)
      >>++> request=Records.newRecord(GetApplicationReportRequest.class)
      >>++> request.setApplicationId(appId)
      >>++> response=rmClient.getApplicationReport(request)  //交给rmClient完成
              ==ApplicationClientProtocolPBClientImpl.submitApplication(request)
      >>++ state=appReport.getYarnApplicationState()
      >>++ if (! state.equals(YarnApplicationState.NEW)
                              &&! state.equals(YarnApplicationState.NEW_SAVING)){
      >>+++ LOG.info("Submitted application"+applicationId)
      >>+++ break
      >>++ }
      >>++ Thread.sleep(submitPollIntervalMillis)
      >>+ }catch(ApplicationNotFoundException ex){
      >>++//FailOver or RMrestart happens before RMStateStore saves ApplicationState
      >>++ LOG.info("Re-submit application"+applicationId+"with the"+
                                  "same ApplicationSubmissionContext")
      >>++ rmClient.submitApplication(request)
      >>+ }
      >> }

这里的rmClient实际上是个ApplicationClientProtocolPBClientImpl,这个类实现了ApplicationClientProtocol界面,所以在YarnClientImpl的代码中对rmClient的类型说明是ApplicationClientProtocol。这个类是由protobuf提供的,其代码来自工具protoc对proto文件的编译:

      [YARNRunner.submitJob()> YarnClientImpl.submitApplication()
      > ApplicationClientProtocolPBClientImpl.submitApplication()]


      ApplicationClientProtocolPBClientImpl.submitApplication(SubmitApplicationRequest request)
      > requestProto=((SubmitApplicationRequestPBImpl)request).getProto()
      > proto=proxy.submitApplication(null, requestProto)
      > return new SubmitApplicationResponsePBImpl(proto)

这里的proxy,我们回顾一下,是前面在创建ApplicationClientProtocolPBClientImpl对象时在其构造函数中调用RPC.getProxy()创建的一个ProtocolProxy类对象。调用时的第一个参数是ApplicationClientProtocolPB.class,这是一个界面,说是扩充,实际上就等价于ApplicationClientProtocolService.BlockingInterface。而ProtocolProxy类的核心则是由Java语言提供的java.lang.reflect.Proxy。

所以,简而言之,这个proxy是一个以ProtobufRpcEngine.Invoker为InvocationHandler,实现了ApplicationClientProtocolService.BlockingInterface界面的java.lang.reflect.Proxy对象。这样,凡是对定义于这个界面上的函数调用都被引导到ProtobufRpcEngine.Invoker.invoke(),而submitApplication()正是这个界面上定义的函数之一。

      [YARNRunner.submitJob()> YarnClientImpl.submitApplication()
      > ApplicationClientProtocolPBClientImpl.submitApplication()
      => ProtobufRpcEngine.Invoker.invoke()]


      ProtobufRpcEngine.Invoker.invoke(Obj ect proxy, Method method, Obj ect[]args)
      > RequestHeaderProto rpcRequestHeader=constructRpcRequestHeader(method)
      > theRequest=(Message)args[1]
      > wrapper=new RpcRequestWrapper(rpcRequestHeader, theRequest)
      > val=(RpcResponseWrapper)client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
                                              wrapper, remoteId, fallbackToSimpleAuth)
            ==Client.call(RPC.RpcKind rpcKind, Writable rpcRequest,
                              ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
      >> call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
                                                                fallbackToSimpleAuth)
      >>> Call call=createCall(rpcKind, rpcRequest)
      >>> Connection connection=getConnection(remoteId, call, serviceClass,
                                                                fallbackToSimpleAuth)
      >>> connection.sendRpcRequest(call)
      >>> while (! call.done)call.wait()
      >>> return call.getRpcResponse()
      >>>> return rpcResponse   //see setRpcResponse(Writable rpcResponse)
      > prototype=getReturnProtoType(method)
      > returnMessage=prototype.newBuilderForType().mergeFrom(val.theResponseRead).build()
      > return returnMessage

到了IPC这一层,就无须多说了。

这个RPC请求转化为一个ApplicationClientProtocol协议的报文,被发送到RM节点。

客户端的RPC报文到达服务端后,由Server.Handler线程加以处理。从系统结构的角度看,Handler线程处于接收端“协议栈(protocol stack)”的底部,是最靠近硬件设备的;但是从服务端的函数调用路径看,它却又是处于顶部,服务端因此而引起的一系列操作都是在Handler线程的上下文中由其直接或间接的调用而发生的。所以,我们就从Handler的run()函数看起。

      Server.Handler.run()
      > while (running){
      >+ call=callQueue.take(); //pop the queue; maybe blocked here
      >+ value=call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp)
          ==ProtobufRpcEngine.call(call.rpcKind,
                          call.connection.protocolName, call.rpcRequest, call.timestamp)
      >+> RpcRequestWrapper request=(RpcRequestWrapper)writableRequest
      >+> RequestHeaderProto rpcRequest=request.requestHeader
      >+> String methodName=rpcRequest.getMethodName() //从报文中获取方法名称
      >+> String protoName=rpcRequest.getDeclaringClassProtocolName() //获取协议名称
      >+> ProtoClassProtoImpl protocolImpl=getProtocolImpl(server, protoName, clientVersion)
                                            //根据协议名称找到实现该协议的Proto对象
      >+> BlockingService service=(BlockingService)protocolImpl.protocolImpl
                      //这就是当初在newReflectiveBlockingService()里创建的那个对象
                      //在那里的代码中动态定义了一个实现BlockingService界面的类
      >+>MethodDescriptor methodDescriptor=
                            service.getDescriptorForType().findMethodByName(methodName)
                                        //根据方法名称找到这个方法的MethodDescriptor
      >+> prototype=service.getRequestPrototype(methodDescriptor)
      >+> param=prototype.newBuilderForType().mergeFrom(request.theRequestRead).build()
      >+> result=service.callBlockingMethod(methodDescriptor, null, param)//调用这个方法
            ==ApplicationClientProtocol.BlockingService.callBlockingMethod(
                                                        methodDescriptor, null, param)
      >+>> switch(method.getIndex()){
      >+>> case 2:
      >+>>+ return impl.submitApplication(controller, (SubmitApplicationRequestProto)request)
                              //impl 是调用newReflectiveBlockingService()时的参数
                              //就是RM节点上提供对Client服务的ClientRMService对象
      >+>>+> ApplicationClientProtocolPBServiceImpl.submitApplication(RpcController arg0,
                                SubmitApplicationRequestProto proto)
      >+>>+>> request=new SubmitApplicationRequestPBImpl(proto)
      >+>>+>> response=real.submitApplication(request)//这就是此次RPC真正的目标函数
                    ==ClientRMService.submitApplication(SubmitApplicationRequest request)
      >+>> }
      >+ setupResponse(buf, call, returnStatus, detailedErr, value, errorClass, error)
      >+ responder.doRespond(call)
      > }

结合注释和前文所述,这里已经很清楚,就不用多说了。

上面讲的是YARN子系统中提供对App即Client服务的RPC机制,其protocol是ApplicationClientProtocol。但那只是众多服务协议中的一种,其他的还有MRClientProtocol、ContainerManagementProtocol、DatanodeProtocol、InterDatanodeProtocol、NamenodeProtocol、ClientDatanodeProtocol、ClientNamenodeProtocol等。但是,不管在应用层上是什么protocol,它们的底层都是一样的,都是建立在Protobuf和RPC的基础上,并且都是采用Java的reflection机制实现的。

HDFS子系统也是一样,也要在服务端和客户端建立起RPC通信的“协议栈”,也是那样的RPC.Server和Client,只不过所用的protocol当然不是ApplicationClientProtocol,而是用于HDFS的protocol了,但是底层的机制还是一样。

以NameNode上的NameNodeRpcServer为例,那是NameNode在初始化的过程中创建的,这个类的数据结构部分的摘要如下:

      class NameNodeRpcServer implements NamenodeProtocols {}
      ]FSNamesystem namesystem
      ]NameNode nn
      ]RPC.Server serviceRpcServer
      ]InetSocketAddress serviceRPCAddress
      ]RPC.Server cl ientRpcServer
      ]InetSocketAddress clientRpcAddress

NameNode在其初始化阶段创建了NameNodeRpcServer对象,其构造函数的摘要为:

      [NameNode.initialize()> createRpcServer()> NameNodeRpcServer.NameNodeRpcServer()]


      NameNodeRpcServer.NameNodeRpcServer(Configuration conf, NameNode nn)
      > int handlerCount=conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, …)
      > RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class)
      > clientProtocolServerTranslator=
                                    new ClientNamenodeProtocolServerSideTranslatorPB(this)
      > clientNNPbService=ClientNamenodeProtocol.newReflectiveBlockingService(
                                                          clientProtocolServerTranslator)
      > dnProtoPbTranslator=new DatanodeProtocolServerSideTranslatorPB(this)
      > dnProtoPbService=DatanodeProtocolService.newReflectiveBlockingService(
                                                                dnProtoPbTranslator)
      > namenodeProtocolXlator=new NamenodeProtocolServerSideTranslatorPB(this)
      > NNPbService=NamenodeProtocolService.newReflectiveBlockingService(
                                                              namenodeProtocolXlator)
      > refreshAuthPol icyXlator=
                      new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this)
      > refreshAuthService=RefreshAuthorizationPolicyProtocolService
                                  .newReflectiveBlockingService(refreshAuthPolicyXlator)
      > refreshUserMappingXlator=
                            new RefreshUserMappingsProtocolServerSideTranslatorPB(this)
      > refreshUserMappingService=RefreshUserMappingsProtocolService.
                                  newReflectiveBlockingService(refreshUserMappingXlator)
      > refreshCallQueueXlator=new RefreshCallQueueProtocolServerSideTranslatorPB(this)
      > refreshCallQueueService=RefreshCallQueueProtocolService.
                                  newReflectiveBlockingService(refreshCallQueueXlator)
      > genericRefreshXlator=new GenericRefreshProtocolServerSideTranslatorPB(this)
      > genericRefreshService=GenericRefreshProtocolService.newReflectiveBlockingService(
                                                                genericRefreshXlator)
      > getUserMappingXlator=new GetUserMappingsProtocolServerSideTranslatorPB(this)
      > getUserMappingService=GetUserMappingsProtocolService.newReflectiveBlockingService(
                                                                getUserMappingXlator)
      > haServiceProtocolXlator=new HAServiceProtocolServerSideTranslatorPB(this)
      > haPbService=HAServiceProtocolService.newReflectiveBlockingService(
                                                            haServiceProtocolXlator)
      > traceAdminXlator=new TraceAdminProtocolServerSideTranslatorPB(this)
      > traceAdminService=TraceAdminService.newReflectiveBlockingService(
                                                                  traceAdminXlator)
      >WritableRpcEngine.ensureInitialized()
      > InetSocketAddress serviceRpcAddr=nn.getServiceRpcServerAddress(conf)
      > if (serviceRpcAddr! =null){
      >+ String bindHost=nn.getServiceRpcServerBindHost(conf)
      >+ if (bindHost==null)bindHost=serviceRpcAddr.getHostName()
      >+ serviceHandlerCount=conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, …)
      >+ this.serviceRpcServer=new RPC.Builder(conf)
                                .setProtocol(…ClientNamenodeProtocolPB.class)….build()
      >+//Add all the RPC protocols that the namenode implements
      >+ DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class,
                                                        haPbService, serviceRpcServer)
      >+> RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class)
      >+> server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service)
            ==RPC.Server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service)
      >+>> registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl)
      >+>>> String protocolName=RPC.getProtocolName(protocolClass)
      >+>>> long version=RPC.getProtocolVersion(protocolClass)
      >+>>> pn=new ProtoNameVer(protocolName, version)
      >+>>> pi=new ProtoClassProtoImpl(protocolClass, protocolImpl))
      >+>>> pmap=getProtocolImplMap(rpcKind)
      >+>>> pmap.put(pn, pi)
      >+ DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class,
                                                        NNPbService, serviceRpcServer)
      >+ DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class,
                                                    dnProtoPbService, serviceRpcServer)
      >+ …
      >+//We support Refreshing call queue here in case the client RPC queue is full
      >+ DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
                                            refreshCallQueueService, serviceRpcServer)
      >+ DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
                                              genericRefreshService, serviceRpcServer)
      >+ …
      >+//Update the address with the correct port
      >+ InetSocketAddress listenAddr=serviceRpcServer.getListenerAddress()
      >+ serviceRPCAddress=new InetSocketAddress(serviceRpcAddr.getHostName(),
                                                              listenAddr.getPort())
      >+ nn.setRpcServiceServerAddress(conf, serviceRPCAddress)
      > }else {
      >+ serviceRpcServer=null
      >+ serviceRPCAddress=null
      > }
      >
      > InetSocketAddress rpcAddr=nn.getRpcServerAddress(conf)
      > String bindHost=nn.getRpcServerBindHost(conf)
      > if (bindHost==null)bindHost=rpcAddr.getHostName()
      > LOG.info("RPC server is binding to"+bindHost+":"+rpcAddr.getPort())
      > this.clientRpcServer=new RPC.Builder(conf)
                                .setProtocol(…ClientNamenodeProtocolPB.class)…build()
      >//Add all the RPC protocols that the namenode implements
      > DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class,
                                          haPbService, clientRpcServer)
      > DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class,
                                          NNPbService, clientRpcServer)
      > DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class,
                                          dnProtoPbService, clientRpcServer)
      > …
      > if (serviceAuthEnabled=conf.getBoolean(…HADOOP_SECURITY_AUTHORIZATION, false)){
      >+ clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider())
      >+ if (serviceRpcServer! =null){
      >++ serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider())
      >+ }
      > }
      >//The rpc-server port can be ephemeral… ensure we have the correct info
      > InetSocketAddress listenAddr=clientRpcServer.getListenerAddress()
      > clientRpcAddress=new InetSocketAddress(rpcAddr.getHostName(), listenAddr.getPort())
      > nn.setRpcServerAddress(conf, clientRpcAddress)
      > minimumDataNodeVersion=conf.get(
                                  …DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY, …)
      >//Set terse exception whose stack trace won't be logged
      > this.clientRpcServer.addTerseExceptions(SafeModeException.class,
                                                      FileNotFoundException.class, …)

看似很大一块,那只是NameNodeRpcServer需要支持的协议比较多而已。这就留给读者自己慢慢看了。