4.5 RPC客户端的创建
再看客户端这一边,我们从应用层的Client这个类开始。
我们知道,在Hadoop的IPC层上有个名为Client的类,那是在org.apache.hadoop.ipc这个package中。注意在应用层也有个叫Client的类,这个类起着相当于Shell的作用,不过这是在org.apache.hadoop.yarn.applications.distributedshell这个package中。两个Client虽然类名相同,但是它们在命名空间的路径不同,在两个不同的package中。我们只要注意一下源文件前面的package语句就可分辨。正如这个Client在命名空间的路径所示,这是一个distributedshell,即分布式的Shell。
应用层的Client是个独立的应用,这个类有main()函数,需要通过命令行启动,运行在一个独立的Java虚拟机上。在下面的叙述中,只要未作特别说明,凡是说到Client的都是指这个属于应用层的Client。
Client类的main()函数通过new创建Client对象,那就要执行其构造函数,这就开始了创建客户端RPC协议栈的历程。
Client.Client() > YarnCl ient yarnCl ient=YarnClient.createYarnClient() >> new YarnCl ientImpl() >>> YarnClientImpl.serviceStart() >>>> rmClient=ClientRMProxy.createRMProxy(getConfig(), ApplicationClientProtocol.class)
作为分布式Shell的Client是整个系统的Client,但是同一个系统中可以同时有很多个Client。我们现在用作例子的是作业提交,即submitApplication(),那必须与YARN子系统,实际上是RM打交道。所以Hadoop代码中另有一个抽象类叫YarnClient,意为专门针对YARN的Client,这里通过其createYarnClient()加以创建。当然,真正创建的是扩充了这个抽象类的实体类,即YarnClientImpl。要创建YarnClient的不仅仅是作为分布式Shell的Client,其他相对独立的模块要跟RM打交道时也得创建自己的YarnClient。
RPC层Client的问题,核心在于Proxy,因为发送给服务端的RPC请求要靠Proxy传递。Proxy就像联络员,像服务端派驻在客户端的代理人,所以这里要创建Proxy。
注意,这里调用的ClientRMProxy.createRMProxy()只有两个参数。我们看一下这个函数,以及ClientRMProxy这个类:
class ClientRMProxy<T> extends RMProxy<T> {} ]static ClientRMProxy INSTANCE=new ClientRMProxy() ]createRMProxy(Configuration configuration, Class<T> protocol) > return createRMProxy(configuration, protocol, INSTANCE)//添加了参数 INSTANCE
]…
ClientRMProxy这个类有个静态成分INSTANCE,这是个预先创建好的ClientRMProxy对象。而这个带两个参数的createRMProxy()则在后面添上INSTANCE为第三个参数,调用带三个参数的createRMProxy()。然而,我们在ClientRMProxy这个类的代码中找不到带这么三个参数的createRMProxy(),那就去它的父类RMProxy中寻找,里面确实有这么一个createRMProxy():
[Client.Client()> YarnClient.createYarnClient()> YarnClientImpl.serviceStart() > Cl ientRMProxy.createRMProxy()> RMProxy.createRMProxy()] RMProxy.createRMProxy(Configuration configuration, Class<T> protocol, RMProxy instance) //参数protocol 实际上是界面hadoop.yarn.api.ApplicationClientProtocol > RetryPol icy retryPol icy=createRetryPol icy(conf) > if(HAUtil.isHAEnabled(conf)){ //如果开通HA,就要考虑发送失败时的对策 >+ provider=instance.createRMFailoverProxyProvider(conf, protocol) ==RMProxy.createRMFailoverProxyProvider(Configuration conf, Class<T> protocol) >+> defaultProviderClass=Class.forName( YarnConfiguration.DEFAULT_CLIENT_FAILOVER_PROXY_PROVIDER) //默认为ConfiguredRMFailoverProxyProvider >+> clazz=conf.getClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER, defaultProviderClass, RMFailoverProxyProvider.class) //如果conf中没有关于“yarn.client.failover-proxy-provider”的设置,就采用默认 >+> RMFailoverProxyProvider<T> provider=ReflectionUtils.newInstance(clazz, conf) //创建一个ConfiguredRMFailoverProxyProvider对象 >+> provider.init(conf, (RMProxy<T>)this, protocol) //这个this就是前面的ClientRMProxy.INSTANCE,是个ClientRMProxy >+> return provider >+ return (T)RetryProxy.create(protocol, provider, retryPol icy) //以RMFailoverProxyProvider为基础创建高层的RetryProxy ==RetryProxy.create(Class<T> iface, FailoverProxyProvider<T> proxyProvider, RetryPol icy retryPol icy) >+> loader=proxyProvider.getInterface().getClassLoader() >+> invoker=new RetryInvocationHandler<T>(proxyProvider, retryPolicy) >+> return Proxy.newProxyInstance(loader, new Class<? >[]{iface }, invoker) > }else { //不开通HA,就相对简单一些 >+ InetSocketAddress rmAddress=instance.getRMAddress(conf, protocol) >+ LOG.info("Connecting to ResourceManager at"+rmAddress) >+ T proxy=RMProxy.<T>getProxy(conf, protocol, rmAddress) //先创建一个普通的proxy,详见后 >+ return(T)RetryProxy.create(protocol, proxy, retryPolicy)//将此proxy包装成RetryProxy
==RetryProxy.create(Class<T> iface, T implementation, RetryPolicy retryPolicy) >+> failover=new DefaultFailoverProxyProvider<T>(iface, implementation) //implementation==proxy,为其创建一个DefaultFailoverProxyProvider >+> RetryProxy.create(iface, failover, retryPol icy) //跟上面有HA时的调用方法相同 //以DefaultFailoverProxyProvider为基础创建高层的RetryProxy ==RetryProxy.create(Class<T> iface, FailoverProxyProvider<T> proxyProvider, RetryPol icy retryPol icy) >+>> loader=proxyProvider.getInterface().getClassLoader() >+>> invoker=new RetryInvocationHandler<T>(proxyProvider, retryPolicy) >+>> return Proxy.newProxyInstance(loader, new Class<? >[]{iface }, invoker) //创建一个面向ApplicationClientProtocol 界面的 java.lang.reflect.Proxy > }
这里分两种情况:一种是开通了HA,即高可用性(High Availability)的容错RPC机制;另一种是常规的RPC机制。不管开不开通HA机制,由RMProxy.createRMProxy()创建的都是一个RetryProxy对象,这个RetryProxy实质上就是一个Proxy,只不过是以一个FailoverProxyProvider(更确切地说是实现了此种界面的对象)为基础的Proxy,所以通信失败后可以“Retry”,即重试。
在Java语言的Reflection机制中,Proxy是可以嵌套的。比方说,我们可以为界面Interface创建一个proxy1,其InvocationHandler是handler1;然后以proxy1为基础,再为同一个界面Interface创建一个proxy2,其InvocationHandler是handler2。这相当于在proxy1的外面包上了一层proxy2。这样,运行时首先调用proxy2所提供的定义于界面Interface的某个函数,这就到了handler2.invoke();然后在handler2.invoke()中进行了某些处理之后就调用proxy1中的同名函数,于是就又到了handler1.invoke()。这样,依此类推,就可以为这个界面Interface建立起一个“协议栈”,其中处于外层的Proxy就相当于协议栈中的高层。
而RetryProxy,就相当于一个两层协议栈中的高层,其底层Proxy就是与前面Server端底层相对应的客户端底层。所以,在创建高层的RetryProxy之前,先要创建底层的常规Proxy。之所以需要有这么一个两层的协议栈,是为了在底层Proxy通信失败时可以由高层的RetryProxy处理重试。不过,在创建RetryProxy时不是直接以底层Proxy为基础,而是以一个FailoverProxyProvider(更确切地说是一个实现了FailoverProxyProvider界面的某类对象,例如DefaultFailoverProxyProvider)为基础,这个对象中包含了底层Proxy,或者换言之也可以说这是把底层的Proxy包装成了RetryProxy。
在开通或不开通HA机制的协议栈中,底层Proxy基本上是一样的,但是处理失败重发的机制却不一样。开通HA机制时的FailoverProxyProvider可以通过配置文件设定,如果未作设定就默认为ConfiguredRMFailoverProxyProvider,而在不开通HA时,则采用DefaultFailoverProxyProvider,这个FailoverProxyProvider处理出错重发但不支持Proxy倒换。
在不开通HA的情况下,用作底层Proxy的并不是前面的ClientRMProxy.INSTANCE,即ClientRMProxy,而是要通过RMProxy.getProxy()另外创建一个Proxy,那就是一个ApplicationClientProtocolPBClientImpl类对象:
[Client.Client()> YarnClient.createYarnClient()> YarnClientImpl.serviceStart() > Cl ientRMProxy.createRMProxy()> RMProxy.createRMProxy()> RMProxy.getProxy()] RMProxy.getProxy(Configuration conf, Class<T> protocol, InetSocketAddress rmAddress) > return UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction<T>()) ]run() > yarnRPC=YarnRPC.create(conf) >> clazzName=conf.get(YarnConfiguration.IPC_RPC_IMPL)//“yarn.ipc.rpc.class” >> if (clazzName==null)clazzName=YarnConfiguration.DEFAULT_IPC_RPC_IMPL //默认为“org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC” >> return (YarnRPC)Class.forName(clazzName).newInstance() > return (T)yarnRPC.getProxy(protocol, rmAddress, conf) ==HadoopYarnProtoRPC.getProxy(protocol, rmAddress, conf)
创建Proxy,以及创建了Proxy以后通过其进行的RPC操作,涉及用户的身份和权限,所以创建Proxy的操作是放在doAs()方法中的一种授权行为,需要为此另建一个线程,在这个线程的run()函数中进行。本书后面将专门详述包括doAs()在内的安全机制。
与创建服务端的协议栈一样,这里也要先创建一个YarnRPC对象。不过YarnRPC是个抽象类,所以实际创建的对象是对YarnRPC的某种扩充。至于具体属于什么类则可以在配置块conf中加以设置,所以这里首先寻找对于“yarn.ipc.rpc.class”的设置,如果没有设置就采用默认的HadoopYarnProtoRPC。事实上,Hadoop的代码中扩充了YarnRPC的类也只有HadoopYarnProtoRPC这么一种。创建了HadoopYarnProtoRPC对象之后,就调用其getProxy()方法获取或创建具体的Proxy。
[Client.Client()> YarnClient.createYarnClient()> YarnClientImpl.serviceStart() > Cl ientRMProxy.createRMProxy()> RMProxy.createRMProxy()> RMProxy.getProxy() > HadoopYarnProtoRPC.getProxy()] HadoopYarnProtoRPC.getProxy(protocol, rmAddress, conf) > LOG.debug("Creating a HadoopYarnProtoRpc proxy for protocol"+protocol) > factory=RpcFactoryProvider.getClientFactory(conf) >> clientFactoryClassName=conf.get(YarnConfiguration.IPC_CLIENT_FACTORY_CLASS, YarnConfiguration.DEFAULT_IPC_CLIENT_FACTORY_CLASS) //默认为RpcCl ientFactoryPBImpl > return factory.getCl ient(protocol,1, addr, conf) ==RpcClientFactoryPBImpl.getClient(protocol,1, addr, conf) >> constructor=cache.get(protocol) //看看cache中是否已经有相应的构造方法 >> if (constructor==null){ //如果还没有,就创建 >>+ clazzName=getPBImplClassName(protocol) //生成针对该protocol 的Client类名 //在我们这个情景中是“ApplicationClientProtocolPBClientImpl” >>+ pbClazz=localConf.getClassByName(clazzName)
//通过Reflection机制获取该类的Class对象 >>+ constructor=pbClazz.getConstructor(Long.TYPE, InetSocketAddress.class, Configuration.class)//获取该类的构造方法 >>+ cache.putIfAbsent(protocol, constructor) //将此构造方法加入对照表 >> } >> retObject=constructor.newInstance(clientVersion, addr, conf) //构造目标对象,在我们这个情景中是ApplicationClientProtocolPBClientImpl >> return retObj ect //返回该对象
粗略看一下这段摘要,就会产生一个疑问:说是getProxy(),可是实际上要获取或创建的好像是Client啊?其实Proxy和Client都是相对的,这里要创建的是RPC客户端协议栈。在客户端的应用层即真正的Client面前,这个协议栈的顶层代表着远地的服务端,是服务端的代理Proxy,这一层以下是看不见的,而在这个协议栈的底层看来,其顶层却代表着应用层,是Client,至于真正应用层的Client是根本看不见的。
跟创建服务端的RPC层Server一样,这里也要先通过RpcFactoryProvider创建一个RpcClientFactory,具体是什么RpcClientFactory也是在运行时根据配置动态确定的,如果没有加以配置则默认为RpcClientFactoryPBImpl,这显然是专为采用ProtoBuf而设计的。创建了RpcClientFactoryPBImpl之后就调用其getClient()以获取或创建RPC层的Client。
RpcClientFactoryPBImpl对象内部有个作为缓存cache的对照表,如果已经为某个Protocol即Interface创建过Proxy,在对照表中就可以查到其相应的构造方法,那就省事了。如果是第一次创建,则要为具体的Protocol,在我们这个情景中是ApplicationClientProtocol,生成相应的Client类名,在我们这个情景中就是ApplicationClientProtocolPBClientImpl。
生成了目标类名之后,运用Java的Reflection机制就可以获取这个类的构造函数,这里一方面把它放进作为cache的便查表,一方面就用这个构造函数来创建和构造ApplicationClientProtocolPBClientImpl对象。下面是ApplicationClientProtocolPBClientImpl这个类的摘要,我们主要关心两件事。
首先,这个类的对象内部有个成分Proxy,在ApplicationClientProtocolPBClientImpl这里代表着RPC的Server一方,这就是客户端RPC协议栈中的下一层次。这个Proxy应该是个实现了ApplicationClientProtocolPB界面的某类对象,而ApplicationClientProtocolPB实际上就是我们在前面看到过的ApplicationClientProtocolService.BlockingInterface,这是在Hadoop的源码文件ApplicationClientProtocolPB.java中定义的:
interface ApplicationClientProtocolPB
extends ApplicationClientProtocolService.BlockingInterface{//空白}
这里说ApplicationClientProtocolPB是个扩充,可是实际扩充的内容却是空白,所以二者其实是等价的。至于ApplicationClientProtocolService.BlockingInterface的定义,则是protoc根据.proto文件编译生成的。
其次,这个Proxy是在ApplicationClientProtocolPBClientImpl的构造函数中创建的,我们必须顺着这个创建的过程才能知道这究竟是个什么对象。
[Client.Client()> YarnClient.createYarnClient()> YarnClientImpl.serviceStart()> Cl ientRMProxy.createRMProxy()> RMProxy.createRMProxy()> RMProxy.getProxy()> HadoopYarnProtoRPC.getProxy()> RpcCl ientFactoryPBImpl.getCl ient()
> ApplicationClientProtocolPBClientImpl()]
class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol{}]ApplicationClientProtocolPB proxy
]ApplicationClientProtocolPBClientImpl(long clientVersion,
InetSocketAddress addr, Configuration conf) //构造函数
> RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class, ProtobufRpcEngine.class)
//将ApplicationClientProtocolPB与ProtobufRpcEngine绑定
> proxy=RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf)
由此可见,ApplicationClientProtocolPBClientImpl内部的Proxy是通过RPC.getProxy()创建的,所创建的是一个支持ApplicationClientProtocolPB界面的Proxy。注意这里通过RPC.setProtocolEngine把这个界面和ProtobufRpcEngine绑定在一起,这一点下面马上就要用到:
[Client.Client()> YarnClient.createYarnClient()> YarnClientImpl.serviceStart() > Cl ientRMProxy.createRMProxy()> RMProxy.createRMProxy()> RMProxy.getProxy() > HadoopYarnProtoRPC.getProxy()> RpcCl ientFactoryPBImpl.getCl ient() > ApplicationClientProtocolPBClientImpl()> RPC.getProxy()] RPC.getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, Configuration conf) > protoProxy=RPC.getProtocolProxy(protocol, clientVersion, addr, conf) //三个参数,这里的protocol 是ApplicationClientProtocolPB >> sockFactory=NetUtils.getDefaultSocketFactory(conf) //默认hadoop.net.StandardSocketFactory,用来创建使用 ip地址的 Socket >> getProtocolProxy(protocol, clientVersion, addr, conf, sockFactory) //增加了一个参数 >>> protoEngine=getProtocolEngine(protocol, conf) //已与ApplicationClientProtocolPB绑定的是ProtobufRpcEngine,见前 >>> protoEngine.getProxy(protocol, clientVersion, …) ==ProtobufRpcEngine.getProxy(protocol, clientVersion, …) >>>> invoker=new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth) //创建一个ProtobufRpcEngine.Invoker >>>> proxy=Proxy.newProxyInstance(protocol.getClassLoader(), new Class[]{protocol}, invoker) //这个Proxy是 java.lang.reflect.Proxy >>>> return new ProtocolProxy<T>(protocol, proxy, false)//以proxy为参数之一创建 >>>>> this.protocol=protocol
>>>>> this.proxy=proxy
> return protoProxy.getProxy() //所返回的就是刚创建的Proxy
>> return ProtocolProxy.proxy
上面这段摘要让人感觉太复杂、太弯弯绕绕了,但这都是为了保持灵活性和通用性。
由于前面绑定了ProtobufRpcEngine,这里的Proxy是通过ProtobufRpcEngine.getProxy()创建的。所创建的是一个以ProtobufRpcEngine.Invoker为InvocationHandler、支持ApplicationClientProtocolPB界面即ApplicationClientProtocolService.BlockingInterface界面的ProtocolProxy对象,其核心是一个java.lang.reflect.Proxy对象。这样,凡是对Proxy所支持界面的函数调用都会被引导到ProtobufRpcEngine的Invoker.invoke()。
可是,在我们至此所见到的代码中,这个RPC协议栈仍未“落地”,还没有与IPC层挂上。这个问题是在ProtobufRpcEngine.Invoker的构造函数中解决的:
class ProtobufRpcEngine implements RpcEngine {} ]Cl ientCache CLIENTS=new Cl ientCache() ]class Invoker implements RpcInvocationHandler {} ]]Cl ient cl ient //注意,这是 IPC层的Cl ient类 ]]Invoker(Class<? > protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, …) //构造方法,8个参数 > connId=Client.ConnectionId.getConnectionId(addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf) > this(protocol, connId, conf, factory)//转而调用下面这4个参数的构造方法 ]]Invoker(Class<? > protocol, Client.ConnectionId connId, Configuration conf, SocketFactory factory) > this.remoteId=connId > this.client=CLIENTS.getClient(conf, factory, RpcResponseWrapper.class) ==Cl ientCache.getCl ient() >> Cl ient cl ient=cl ients.get(factory) >> if (client==null){ >>+ client=new Client(valueClass, conf, factory) //创建 IPC层的Client对象 >>+ cl ients.put(factory, cl ient) >> } >> return cl ient > this.protocolName=RPC.getProtocolName(protocol) > this.clientProtocolVersion=RPC.getProtocolVersion(protocol)
可见,上面在创建Invoker对象的时候在其构造函数中创建了Client对象,这就是IPC层的Client。我们可以看一下ProtobufRpcEngine.java这个文件前面的package语句,就可知道这是在org.apache.hadoop.ipc这个package中。这样,这个客户端的RPC协议栈就“落地”了,与IPC层挂上了钩。
回到前面RMProxy.createRMProxy()中的else部分,即未开通HA的那个分支,我们已经创建了那里的Proxy,下面就是在所创建的Proxy外面包上一层对于失败重发的处理,使这二者合在一起成为一个RetryProxy,而这个RetryProxy就成为RPC协议栈的上层ApplicationClientProtocolPBClientImpl所看到的Proxy。我们在前面看到,RetryProxy的InvocationHandler是RetryInvocationHandler,所支持的界面则是ApplicationClientProtocol,所以凡是对此界面上的函数调用,例如submitApplication()等等,都会被引导到这个RetryInvocationHandler的invoke():
class RetryInvocationHandler<T> implements RpcInvocationHandler {} ]RetryInvocationHandler(FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) //构造方法 > this(proxyProvider, retryPolicy, Collections.<String, RetryPolicy>emptyMap()) >> this.proxyProvider=proxyProvider >> this.defaultPolicy=defaultPolicy >> this.methodNameToPolicyMap=methodNameToPolicyMap >> this.currentProxy=proxyProvider.getProxy() ]invoke(Obj ect proxy, Method method, Obj ect[]args) //发起RPC调用 > RetryPol icy pol icy=methodNameToPol icyMap.get(method.getName()) > if (policy==null)policy=defaultPolicy > boolean isRpc=isRpcInvocation(currentProxy.proxy) > while (true){ >+ if(isRpc)Client.setCallIdAndRetryCount(callId, retries)//设置是否应该重试的判定标准 >+ try{ >++ Obj ect ret=invokeMethod(method, args) >++ return ret //如果成功就返回,发生异常才有循环 >+ }catch(Exception e){ >++ RetryAction action=policy.shouldRetry(e, retries++, invocationFailoverCount, isIdempotentOrAtMostOnce)//判断是否应该重试 >++ if (action.action==RetryAction.RetryDecision.FAIL){ >+++ throw e //已经无可挽救,放弃 >++ }else { //retry or failover >+++ if (action.delayMillis > 0)Thread.sleep(action.delayMillis) //等待一会儿 >+++ if (action.action==RetryAction.RetryDecision.FAILOVER_AND_RETRY){ >++++ if (invocationAttemptFailoverCount==proxyProviderFailoverCount){ >+++++ proxyProvider.performFailover(currentProxy.proxy) //需要倒换就倒换 //在HA未开通时这是DefaultFailoverProxyProvider.performFailover(),是个空函数 >+++++ proxyProviderFailoverCount++ >++++ }else { >+++++ LOG.warn("A failover has occurred since the start of this method invocation attempt." >++++ } >++++ currentProxy=proxyProvider.getProxy() //换一个Proxy试试 >++++ invocationFailoverCount++
>+++ } //if (action.action==RetryAction.RetryDecision.FAILOVER_AND_RETRY)
>++ } //end if-else中的重试
>+ } //end catch
> } //end while
在invoke()函数中,真正把RPC请求发送出去要靠invokeMethod(),这里把它放在一个while循环中。如果发送成功就返回;但是倘若在发送过程中因失败而发生异常,则因为这个while循环而可以反复进行努力。是否继续努力的决定则取决于RetryPolicies。如果决定继续努力,则有两条路可走:一条是重试,即retry;另一条是Failover,即倒换,详见本书后面对于容错的介绍。
未开通HA时的proxyProvider是DefaultFailoverProxyProvider,它的performFailover()是个空函数,所以只有在开通了HA的情况下这个函数才有意义。
至于RMProxy.createRMProxy()中开通了HA的那个分支,则限于篇幅不再深入下去,留给读者自己进一步阅读分析了。
至此,服务和客户两边的RPC协议栈都搭建好了,下面我们就以作业提交操作为例看一下具体RPC的流程。当用户在Hadoop上启动执行一个App作业的时候,需要把作业提交给RM节点。用户的App可以在集群中的任何一个节点上,是另起一个Java虚拟机、作为一个独立的JVM进程运行的。另外,和我们一般所理解的“运行”不同,这个进程所做的其实只是把作业提交给RM,然后过一会儿就询问一下作业的进展,有消息就在屏幕上显示,直到作业完成。真正的计算并非在本地进行,而是分布在集群中的许多节点上。读者在下一章中将看到,App通过一个YARNRunner对象的submitJob()方法提交作业:
[JobSubmitter.submitJobInternal()> YARNRunner.submitJob()] class YARNRunner implements ClientProtocol {} ]ResourceMgrDelegate resMgrDelegate ]submitJob(JobID jobId, String jobSubmitDir, Credentials ts) > appContext=createApplicationSubmissionContext(conf, jobSubmitDir, ts) > applicationId=resMgrDelegate.submitApplication(appContext) ==ResourceMgrDelegate.submitApplication(appContext) >> client.submitApplication(appContext)//client由YarnClient.createYarnClient()创建 ==YarnClientImpl.submitApplication(appContext)
读者在下一章中将看到,YARNRunner.submitJob()可以说是流程中的一个关键节点,实际上可以看作是作业提交流程的起点。从这个节点开始,流程的第一站是对于ResourceMgrDelegate.submitApplication()的调用,YARNRunner中的resMgrDelegate是个ResourceMgrDelegate对象,这是在YARNRunner的构造方法中创建的。然后, ResourceMgrDelegate中client的类型则是YarnClient,是通过YarnClient.createYarnClient()创建的,实际的类型是YarnClientImpl。YarnClientImpl是对YarnClient的扩充,对于客户端对服务端的通信起着桥头堡似的作用,YARN子系统中所有NM对RM的请求都是经过这个类型的对象出去的。而YarnClientImpl.submitApplication()则是作业提交流程中的又一个关键节点:
[YARNRunner.submitJob()> YarnClientImpl.submitApplication()] YarnClientImpl.submitApplication(ApplicationSubmissionContext appContext) >> applicationId=appContext.getApplicationId() >> request=Records.newRecord(SubmitApplicationRequest.class) >> request.setApplicationSubmissionContext(appContext) >> rmClient.submitApplication(request) //交给rmCl ient完成 >> while (true){ >>+ try{ >>++ appReport=getApplicationReport(applicationId) >>++> request=Records.newRecord(GetApplicationReportRequest.class) >>++> request.setApplicationId(appId) >>++> response=rmClient.getApplicationReport(request) //交给rmClient完成 ==ApplicationClientProtocolPBClientImpl.submitApplication(request) >>++ state=appReport.getYarnApplicationState() >>++ if (! state.equals(YarnApplicationState.NEW) &&! state.equals(YarnApplicationState.NEW_SAVING)){ >>+++ LOG.info("Submitted application"+applicationId) >>+++ break >>++ } >>++ Thread.sleep(submitPollIntervalMillis) >>+ }catch(ApplicationNotFoundException ex){ >>++//FailOver or RMrestart happens before RMStateStore saves ApplicationState >>++ LOG.info("Re-submit application"+applicationId+"with the"+ "same ApplicationSubmissionContext") >>++ rmClient.submitApplication(request) >>+ } >> }
这里的rmClient实际上是个ApplicationClientProtocolPBClientImpl,这个类实现了ApplicationClientProtocol界面,所以在YarnClientImpl的代码中对rmClient的类型说明是ApplicationClientProtocol。这个类是由protobuf提供的,其代码来自工具protoc对proto文件的编译:
[YARNRunner.submitJob()> YarnClientImpl.submitApplication()
> ApplicationClientProtocolPBClientImpl.submitApplication()]
ApplicationClientProtocolPBClientImpl.submitApplication(SubmitApplicationRequest request)
> requestProto=((SubmitApplicationRequestPBImpl)request).getProto()
> proto=proxy.submitApplication(null, requestProto)
> return new SubmitApplicationResponsePBImpl(proto)
这里的proxy,我们回顾一下,是前面在创建ApplicationClientProtocolPBClientImpl对象时在其构造函数中调用RPC.getProxy()创建的一个ProtocolProxy类对象。调用时的第一个参数是ApplicationClientProtocolPB.class,这是一个界面,说是扩充,实际上就等价于ApplicationClientProtocolService.BlockingInterface。而ProtocolProxy类的核心则是由Java语言提供的java.lang.reflect.Proxy。
所以,简而言之,这个proxy是一个以ProtobufRpcEngine.Invoker为InvocationHandler,实现了ApplicationClientProtocolService.BlockingInterface界面的java.lang.reflect.Proxy对象。这样,凡是对定义于这个界面上的函数调用都被引导到ProtobufRpcEngine.Invoker.invoke(),而submitApplication()正是这个界面上定义的函数之一。
[YARNRunner.submitJob()> YarnClientImpl.submitApplication() > ApplicationClientProtocolPBClientImpl.submitApplication() => ProtobufRpcEngine.Invoker.invoke()] ProtobufRpcEngine.Invoker.invoke(Obj ect proxy, Method method, Obj ect[]args) > RequestHeaderProto rpcRequestHeader=constructRpcRequestHeader(method) > theRequest=(Message)args[1] > wrapper=new RpcRequestWrapper(rpcRequestHeader, theRequest) > val=(RpcResponseWrapper)client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, wrapper, remoteId, fallbackToSimpleAuth) ==Client.call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth) >> call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT, fallbackToSimpleAuth) >>> Call call=createCall(rpcKind, rpcRequest) >>> Connection connection=getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth) >>> connection.sendRpcRequest(call) >>> while (! call.done)call.wait() >>> return call.getRpcResponse() >>>> return rpcResponse //see setRpcResponse(Writable rpcResponse) > prototype=getReturnProtoType(method) > returnMessage=prototype.newBuilderForType().mergeFrom(val.theResponseRead).build() > return returnMessage
到了IPC这一层,就无须多说了。
这个RPC请求转化为一个ApplicationClientProtocol协议的报文,被发送到RM节点。
客户端的RPC报文到达服务端后,由Server.Handler线程加以处理。从系统结构的角度看,Handler线程处于接收端“协议栈(protocol stack)”的底部,是最靠近硬件设备的;但是从服务端的函数调用路径看,它却又是处于顶部,服务端因此而引起的一系列操作都是在Handler线程的上下文中由其直接或间接的调用而发生的。所以,我们就从Handler的run()函数看起。
Server.Handler.run() > while (running){ >+ call=callQueue.take(); //pop the queue; maybe blocked here >+ value=call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp) ==ProtobufRpcEngine.call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp) >+> RpcRequestWrapper request=(RpcRequestWrapper)writableRequest >+> RequestHeaderProto rpcRequest=request.requestHeader >+> String methodName=rpcRequest.getMethodName() //从报文中获取方法名称 >+> String protoName=rpcRequest.getDeclaringClassProtocolName() //获取协议名称 >+> ProtoClassProtoImpl protocolImpl=getProtocolImpl(server, protoName, clientVersion) //根据协议名称找到实现该协议的Proto对象 >+> BlockingService service=(BlockingService)protocolImpl.protocolImpl //这就是当初在newReflectiveBlockingService()里创建的那个对象 //在那里的代码中动态定义了一个实现BlockingService界面的类 >+>MethodDescriptor methodDescriptor= service.getDescriptorForType().findMethodByName(methodName) //根据方法名称找到这个方法的MethodDescriptor >+> prototype=service.getRequestPrototype(methodDescriptor) >+> param=prototype.newBuilderForType().mergeFrom(request.theRequestRead).build() >+> result=service.callBlockingMethod(methodDescriptor, null, param)//调用这个方法 ==ApplicationClientProtocol.BlockingService.callBlockingMethod( methodDescriptor, null, param) >+>> switch(method.getIndex()){ >+>> case 2: >+>>+ return impl.submitApplication(controller, (SubmitApplicationRequestProto)request) //impl 是调用newReflectiveBlockingService()时的参数 //就是RM节点上提供对Client服务的ClientRMService对象 >+>>+> ApplicationClientProtocolPBServiceImpl.submitApplication(RpcController arg0, SubmitApplicationRequestProto proto) >+>>+>> request=new SubmitApplicationRequestPBImpl(proto) >+>>+>> response=real.submitApplication(request)//这就是此次RPC真正的目标函数 ==ClientRMService.submitApplication(SubmitApplicationRequest request) >+>> } >+ setupResponse(buf, call, returnStatus, detailedErr, value, errorClass, error) >+ responder.doRespond(call) > }
结合注释和前文所述,这里已经很清楚,就不用多说了。
上面讲的是YARN子系统中提供对App即Client服务的RPC机制,其protocol是ApplicationClientProtocol。但那只是众多服务协议中的一种,其他的还有MRClientProtocol、ContainerManagementProtocol、DatanodeProtocol、InterDatanodeProtocol、NamenodeProtocol、ClientDatanodeProtocol、ClientNamenodeProtocol等。但是,不管在应用层上是什么protocol,它们的底层都是一样的,都是建立在Protobuf和RPC的基础上,并且都是采用Java的reflection机制实现的。
HDFS子系统也是一样,也要在服务端和客户端建立起RPC通信的“协议栈”,也是那样的RPC.Server和Client,只不过所用的protocol当然不是ApplicationClientProtocol,而是用于HDFS的protocol了,但是底层的机制还是一样。
以NameNode上的NameNodeRpcServer为例,那是NameNode在初始化的过程中创建的,这个类的数据结构部分的摘要如下:
class NameNodeRpcServer implements NamenodeProtocols {} ]FSNamesystem namesystem ]NameNode nn ]RPC.Server serviceRpcServer ]InetSocketAddress serviceRPCAddress ]RPC.Server cl ientRpcServer ]InetSocketAddress clientRpcAddress
NameNode在其初始化阶段创建了NameNodeRpcServer对象,其构造函数的摘要为:
[NameNode.initialize()> createRpcServer()> NameNodeRpcServer.NameNodeRpcServer()] NameNodeRpcServer.NameNodeRpcServer(Configuration conf, NameNode nn) > int handlerCount=conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, …) > RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class) > clientProtocolServerTranslator= new ClientNamenodeProtocolServerSideTranslatorPB(this) > clientNNPbService=ClientNamenodeProtocol.newReflectiveBlockingService( clientProtocolServerTranslator) > dnProtoPbTranslator=new DatanodeProtocolServerSideTranslatorPB(this) > dnProtoPbService=DatanodeProtocolService.newReflectiveBlockingService( dnProtoPbTranslator) > namenodeProtocolXlator=new NamenodeProtocolServerSideTranslatorPB(this) > NNPbService=NamenodeProtocolService.newReflectiveBlockingService( namenodeProtocolXlator) > refreshAuthPol icyXlator= new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this) > refreshAuthService=RefreshAuthorizationPolicyProtocolService .newReflectiveBlockingService(refreshAuthPolicyXlator)
> refreshUserMappingXlator= new RefreshUserMappingsProtocolServerSideTranslatorPB(this) > refreshUserMappingService=RefreshUserMappingsProtocolService. newReflectiveBlockingService(refreshUserMappingXlator) > refreshCallQueueXlator=new RefreshCallQueueProtocolServerSideTranslatorPB(this) > refreshCallQueueService=RefreshCallQueueProtocolService. newReflectiveBlockingService(refreshCallQueueXlator) > genericRefreshXlator=new GenericRefreshProtocolServerSideTranslatorPB(this) > genericRefreshService=GenericRefreshProtocolService.newReflectiveBlockingService( genericRefreshXlator) > getUserMappingXlator=new GetUserMappingsProtocolServerSideTranslatorPB(this) > getUserMappingService=GetUserMappingsProtocolService.newReflectiveBlockingService( getUserMappingXlator) > haServiceProtocolXlator=new HAServiceProtocolServerSideTranslatorPB(this) > haPbService=HAServiceProtocolService.newReflectiveBlockingService( haServiceProtocolXlator) > traceAdminXlator=new TraceAdminProtocolServerSideTranslatorPB(this) > traceAdminService=TraceAdminService.newReflectiveBlockingService( traceAdminXlator) >WritableRpcEngine.ensureInitialized() > InetSocketAddress serviceRpcAddr=nn.getServiceRpcServerAddress(conf) > if (serviceRpcAddr! =null){ >+ String bindHost=nn.getServiceRpcServerBindHost(conf) >+ if (bindHost==null)bindHost=serviceRpcAddr.getHostName() >+ serviceHandlerCount=conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, …) >+ this.serviceRpcServer=new RPC.Builder(conf) .setProtocol(…ClientNamenodeProtocolPB.class)….build() >+//Add all the RPC protocols that the namenode implements >+ DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, serviceRpcServer) >+> RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine.class) >+> server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service) ==RPC.Server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service) >+>> registerProtocolAndImpl(rpcKind, protocolClass, protocolImpl) >+>>> String protocolName=RPC.getProtocolName(protocolClass) >+>>> long version=RPC.getProtocolVersion(protocolClass) >+>>> pn=new ProtoNameVer(protocolName, version) >+>>> pi=new ProtoClassProtoImpl(protocolClass, protocolImpl)) >+>>> pmap=getProtocolImplMap(rpcKind) >+>>> pmap.put(pn, pi)
>+ DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, serviceRpcServer) >+ DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, serviceRpcServer) >+ … >+//We support Refreshing call queue here in case the client RPC queue is full >+ DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, refreshCallQueueService, serviceRpcServer) >+ DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, genericRefreshService, serviceRpcServer) >+ … >+//Update the address with the correct port >+ InetSocketAddress listenAddr=serviceRpcServer.getListenerAddress() >+ serviceRPCAddress=new InetSocketAddress(serviceRpcAddr.getHostName(), listenAddr.getPort()) >+ nn.setRpcServiceServerAddress(conf, serviceRPCAddress) > }else { >+ serviceRpcServer=null >+ serviceRPCAddress=null > } > > InetSocketAddress rpcAddr=nn.getRpcServerAddress(conf) > String bindHost=nn.getRpcServerBindHost(conf) > if (bindHost==null)bindHost=rpcAddr.getHostName() > LOG.info("RPC server is binding to"+bindHost+":"+rpcAddr.getPort()) > this.clientRpcServer=new RPC.Builder(conf) .setProtocol(…ClientNamenodeProtocolPB.class)…build() >//Add all the RPC protocols that the namenode implements > DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, clientRpcServer) > DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, clientRpcServer) > DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, clientRpcServer) > … > if (serviceAuthEnabled=conf.getBoolean(…HADOOP_SECURITY_AUTHORIZATION, false)){ >+ clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()) >+ if (serviceRpcServer! =null){ >++ serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()) >+ }
> }
>//The rpc-server port can be ephemeral… ensure we have the correct info
> InetSocketAddress listenAddr=clientRpcServer.getListenerAddress()
> clientRpcAddress=new InetSocketAddress(rpcAddr.getHostName(), listenAddr.getPort())
> nn.setRpcServerAddress(conf, clientRpcAddress)
> minimumDataNodeVersion=conf.get(
…DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY, …)
>//Set terse exception whose stack trace won't be logged
> this.clientRpcServer.addTerseExceptions(SafeModeException.class,
FileNotFoundException.class, …)
看似很大一块,那只是NameNodeRpcServer需要支持的协议比较多而已。这就留给读者自己慢慢看了。