大数据处理系统:Hadoop源代码情景分析
上QQ阅读APP看书,第一时间看更新

第4章 Hadoop的RPC机制

4.1 RPC与RMI

RPC是“Remote Procedure Call”即“远地过程调用”的缩写。这个机制的目的,是让一台机器上的程序能像调用本地的“过程”那样来调用别的机器上的某些过程。这里所谓“过程”,在传统的C程序设计中统称为“函数”,在Pascal程序设计中既可以是PROCEDURE也可以是FUNCTION,在Java等OO程序设计语言中就是“方法(method)”。所以,Java传统的RPC机制称为RMI,即“远地方法启用(Remote Method Invocation)”。对RPC机制的要求是:从程序代码上看,过程的调用者就好像在调用本地函数一样,但是被调用过程的代码实际上在别的机器上,被调用的过程是在别的机器上执行,然后返回执行的结果,对调用者而言就像从本地的函数调用返回一样。在这个过程中,调用者(线程)发动调用之后就进入睡眠,直至调用返回时才被唤醒。显然,这种机制的实现,是以跨机器节点的进程间通信即IPC为基础的。所以,就调用者而言,虽然形式上就像本地的调用一样,但是花费的时间显然是不同的,而且调用失败的概率也会增加。因此,即使是调用最简单的、在本地的条件下绝不会失败的函数也得做好失败(Exception)的准备,把调用放在try程序段中,并安排好异常处理和对策。

需要特别说明的是,RPC并非针对远地的所有过程,并不是对远地所有的过程都可以随心所欲地通过RPC加以调用,而只能针对预先确定的某些过程,并且在程序上得有些准备和安排。

Hadoop本可以直接借用Java的RMI机制,但是出于种种考虑决定自搞一套RPC机制,这在很大程度上是因为有Google的开源软件ProtocolBuffer可资利用。而Google之所以要自己开发一套ProtocolBuffer,应该是要以此为核心实现自己的RPC机制,主要应是出于软件开发效率、灵活性和多语种的考虑。

考虑到RPC往往涉及对象即数据结构的跨节点传输,这里面又有串行化(serialization)和去串行化的问题,所以,RPC和ProtocolBuffer也并不简单。不过对于Hadoop而言这毕竟是很底层的东西,读者如果不想一开始就在这里陷得太深,也可以先跳过本章,到后面再回头来看RPC和ProtocolBuffer的细节。

如上所述,RPC是建立在IPC的基础之上的,而现在最灵活、最通用的通信手段就是Socket。

RPC交互的两方,总有一方是通信的主动发起方,也是某种服务的需求方;另一方则是被动的响应方,也是服务的提供方。所以,通信中至少有一方扮演着“服务者”即Server的角色。如果是双方对等的通信,则各自都有其作为Server的一面。在Hadoop的系统结构中,节点有主从(Master/Slave)之分,通常主节点扮演着Server的角色,主从节点间的通信都是由从节点发起的,主节点则像是“公仆”。而不同的节点之间的通信则是对等的,谁都可以发起,所以每个从节点也都有作为Server的一面。

为此,Hadoop中定义了一个抽象类Server,下面是它的摘要:


      abstract class Server {} //org.apache.hadoop.ipc.Server

      ]Map<RPC.RpcKind, RpcKindMapValue> rpcKindMap

      ]Map<String, Class<? >> PROTOCOL_CACHE

      ]ThreadLocal<Server> SERVER=new ThreadLocal<Server>()

      ]ThreadLocal<Call> CurCall=new ThreadLocal<Call>()

      ]int handlerCount;              //number of handler threads

      ]int readThreads; //number of read threads

      ]int readerPendingConnectionQueue; //number of connections to queue per read thread

      ]Class<? extends Writable> rpcRequestClass; //class used for deserializing the rpc request

      ]CallQueueManager<Call> callQueue

      ]ConnectionManager connectionManager

      ]Listener listener               //listener是个线程,其类型定义见下

      ]Responder responder

      ]Handler[]handlers

      ----- 以上是结构成分与数据部分,以下是对方法(函数)及内嵌类的定义部分----

      ]registerProtocolEngine(RPC.RpcKind rpcKind, …)//登记一种RpcKind及其ProtocolEngine

      ]getRpcInvoker(RPC.RpcKind rpcKind)

      ]class Call implements Schedulable {}

      ]class Listener extends Thread {}

      ]]Reader[]readers   //Listener对象内部的Reader线程数组

      ]]Listener()       //Listener的构造函数

          > address=new InetSocketAddress(bindAddress, port) //Socket地址和端口号

          >//Create a new server socket and set to non blocking mode

          > acceptChannel=ServerSocketChannel.open()       //创建 Socket通道

          > acceptChannel.configureBlocking(false)

          > bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig)//绑定地址

          > port=acceptChannel.socket().getLocalPort(); //Could be an ephemeral port

          > selector=Selector.open()//create a selector,类似于select()系统调用中的fd_set

          > readers=new Reader[readThreads]    //创建一个线程数组

          > for (int i=0; i < readThreads; i++){//预先创建一组Reader线程并加以启动

          >+ Reader reader=new Reader("Socket Reader #"+(i+1)+"for port"+port)

          >+ readers[i]=reader

          >+ reader.start() //start所创建的Reader线程

          > }


          >//Register accepts on the server socket with the selector.

          > acceptChannel.register(selector, SelectionKey.OP_ACCEPT)

          > this.setName("IPC Server listener on"+port)

          > this.setDaemon(true)   //断开该线程与标准 I/O通道的连接,使其变成后台线程

      ]]run() //这是Listener线程的run()函数

      ]]class Reader extends Thread {} //接收线程Reader,接受和处理服务请求

      ]]]Reader(String name)

          > super(name)

          > this.pendingConnections=

                      new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue)

          > this.readSelector=Selector.open()

      ]]]addConnection(Connection conn)

          > pendingConnections.put(conn)

          > readSelector.wakeup()

      ]]doRead(SelectionKey key)

          > Connection c=(Connection)key.attachment()

          > count=c.readAndProcess()

          > if (count < 0)closeConnection(c)

      ]class Responder extends Thread {} //回应线程Responder

      ]class Connection {} //代表着一个具体的连接

      ]]getAuthorizedUgi(String authorizedId)

      ]]saslReadAndProcess(DataInputStream dis)    //SASL加密报文的读入和处理

      ]]saslProcess(RpcSaslProto saslMessage)

      ]]processSaslMessage(RpcSaslProto saslMessage)//处理加密的报文

      ]]…

      ]class Handler extends Thread {} //Call 处理线程

      ]Server(String bindAddress, int port, Class<? extends Writable> rpcRequestClass,

              int handlerCount, int numReaders, int queueSizePerHandler, Configuration conf,

              String serverName, SecretManager<? extends TokenIdentifier> secretManager,

              String portRangeConfig)    //Server类对象的构造函数

          > this.bindAddress=bindAddress

          > this.conf=conf

          > this.portRangeConfig=portRangeConfig

          > this.port=port

          > …

          > String prefix=getQueueClassPrefix()

          > this.callQueue=new CallQueueManager<Call>(getQueueClass(prefix, conf),

                                                      maxQueueSize, prefix, conf)

          > this.secretManager=(SecretManager<TokenIdentifier>)secretManager

          > this.authorize=conf.getBoolean(


                          CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)

        > listener=new Listener()                //创建一个Listener对象

        > connectionManager=new ConnectionManager()

        > responder=new Responder()

        > if(secretManager! =null||UserGroupInformation.isSecurityEnabled()){//如果使用加密

        >+ SaslRpcServer.init(conf)

        >+ saslPropsResolver=SaslPropertiesResolver.getInstance(conf)

        > }

        > this.exceptionsHandler.addTerseExceptions(StandbyException.class)

      ]abstract Writable call(RPC.RpcKind rpcKind, String protocol,

                                                    Writable param, long receiveTime)

Server是个抽象类,未经扩充落实是不能为其创建具体对象的。为什么这是个抽象类,是它的什么成分还一时无法落实呢?其实只有一样,那就是它的(带4个参数的)方法函数call(),这是个抽象方法,因为在这里还不能落实,无法为之提供具体的代码。然而这个call()是个十分重要的函数,因为这就是RPC三个字母中的那个C,即Call。RPC的关键就是在远地调用某个函数,但是具体怎么调用却与所用的“协议”即protocol有关。协议不同,下面的代码就不同,所以无法脱离具体的协议来提供实现这个方法的代码。显然,能实际创建的Server至少得要补上这个call()方法的具体实现才行。在Java语言中有两种手段可以做到这个:一种是静态定义一个扩充落实这个抽象类的实体类,在里面提供call()方法的代码,然后加以创建;另一种是动态定义,即在通过new语句创建Server对象时临时补上一个call()方法。

Hadoop的代码中采用的办法是:先采用静态定义的办法对Server加以扩充,但暂不落实call()。我们一般所说的子类扩充(extends)父类,其实不仅仅是扩充,而有着三方面的意思:一是扩充,就是增加数据成分和操作方法;二是落实,就是补上父类声称提供但实际并未实现的操作方法;三是修改,就是用同名同参数的操作方法覆盖替代父类所提供的操作方法。Hadoop的代码中就是先搁下对call()的落实,而先对抽象类Server进行扩充,具体就是在另一个类RPC中定义了一个同名的内嵌抽象类Server,即RPC.Server,用来扩充落实抽象类Server。但是这个RPC.Server仍是抽象类,还需要进一步的扩充落实:


      abstract static class Server extends org.apache.hadoop.ipc.Server {}

这是在RPC类内部,所以实际定义的是RPC.Server,这是对org.apache.hadoop.ipc.Server,就是上面那个抽象类的扩充。但是,这里声明了是abstract,所以仍需进一步对RPC.Server加以扩充落实,才能为其创建具体对象。有意思的是,RPC.Server其实已经补上了call()这个函数,下面是它的代码摘要,并已展开:


      RPC.Server.call(RPC.RpcKind rpcKind, String protocol, Writable rpcRequest, long receiveTime)

      > invoker=getRpcInvoker(rpcKind)

      >> RpcKindMapValue val=rpcKindMap.get(rpcKind)

      >> return (val==null)? null :val.rpcInvoker   //这是从getRpcInvoker()返回

      > return invoker.call(this, protocol, rpcRequest, receiveTime)

可是在RPC.Server的定义前面还是加上了abstract的标签。这样,Java编译器就会把住程序编译这一关,不让在程序中直接创建RPC.Server的对象,这当然是个故意的安排。这里的getRpcInvoker()是由org.apache.hadoop.ipc.Server这个类提供的,它根据给定的RPC类型rpcKind从一个rpcKindMap中获取其RpcInvoker,然后调用这个RpcInvoker所提供的call()函数。问题是,如果getRpcInvoker()返回null呢?如果没有措施保证rpcKindMap中确实有给定rpcKind的RpcInvoker,那么返回null几乎是必然的。代码的设计者之所以要在RPC.Server的定义前面加上标签abstract,就是为了防止这样的情况发生。至此为止我们还不能认为call()这个函数已经得到落实和解决。设计者的意图,是要另外用一个类来扩充RPC.Server,在那里保证给定的rpcKind的RpcInvoker一定会在rpcKindMap中,而且那个RpcInvoker的call()一定是符合设计要求的call(),然后为这个经过扩充的类创建对象。至于为什么不是一步到位,而要有这样一个中间步骤,应该是因为不同的rpcKind导致具体实现有较大的不同,所以本着尽量提取公因子的原则才会有这么中间的一步。

不过光是call()这个方法尚未落实解决并不妨碍我们对Server类其他方面工作机理的理解,再说我们现在暂时也并不需要知道对具体函数的调用。

简而言之,当创建一个Server对象时,如果不考虑加密等附加的细节,那么首先要在这个Server的构造函数中创建一个Listener对象,这是用来“倾听”连接请求的;同时还要创建一个Responder对象,这是用来向对方做出回应的;还有一个CallQueueManager,用来管理Listenner内部的请求队列。

注意,既然有Listenner,就说明底层的网络通信是有连接的,所以采用的是TCP而不是UDP协议。

当然,创建Listener和Responder这两个对象时要执行它们的构造函数。从摘要中可见,在Listener的构造函数中会创建Socket,然后创建一个线程数组readers,这个数组的大小为readThreads,来自创建Server时的参数numReaders。这表示Listener中需要有多少个用来响应连接请求的Reader线程,一个Server通常都需要维持多个并发连接,所以就需要有多个Reader线程。为提高响应速度,这里预先创建了一组Reader线程并加以启动,而并非有连接建立时才临时创建线程。

创建并启动了这些线程之后,它们就各自进入了自己的run()函数。我们先看Listener的run()函数:


      Server.Listener.run()

      > SERVER.set(Server.this)

      > connectionManager.startIdleScan()

      > while (running){            //Listener线程的主循环

      >+ getSelector().select() //睡眠等待连接请求到来

      >+ Iterator<SelectionKey> iter=getSelector().selectedKeys().iterator()//请求可能不止一个

      >+ while (iter.hasNext()){ //逐个扫描同时到来的连接请求

      >++ key=iter.next()

      >++ iter.remove()

      >++ if (key.isValid())if (key.isAcceptable())

      >+++ doAccept(key)  //接受连接请求

      >+++> ServerSocketChannel server=(ServerSocketChannel)key.channel()


      >+++> while((channel=server.accept())! =null){ //接受连接请求后成为一个通道

      >+++>+ Reader reader=getReader() //从数组readers中获取一个空闲的Reader线程

      >+++>+ Connection c=connectionManager.register(channel)//在此通道上建立一个连接

      >+++>+ key.attach(c); //so closeCurrentConnection can get the obj ect

      >+++>+ reader.addConnection(c) //将此连接指派给这个Reader线程

      >+++>++ pendingConnections.put(conn) //挂入这个Reader线程的待处理队列

      >+++>++ readSelector.wakeup()      //唤醒这个线程,下面就是这个线程的事了

      >+++> }

      >+ } //while (iter.hasNext())

      > } //end while(running)

Listener线程监听着它的Socket通道,当有连接请求到来并认为有效和可接受时,就为其创建一个Connection对象,把它交给一个Reader处理。同时到来的请求可能不止一个,所以这里需要有个内嵌的while循环。

所谓交给一个Reader线程处理,就是从线程数组readers中找一个空闲的Reader线程,把所创建的Connection对象挂入其pendingConnections队列。下面就是Reader线程的事了:


      Server.Listener.Reader.run()

      > doRunLoop()

      >> while (running){    //Reader线程的主循环

      >>+ size=pendingConnections.size()              //检查队列中是否有连接

      >>+ for (int i=size; i>0; i--){               //若有,就逐个处理

      >>++ Connection conn=pendingConnections.take()    //从队列中获取一个连接

      >>++ conn.channel.register(readSelector, SelectionKey.OP_READ, conn)

                                                        //登记此连接为有效

      >>+ }

      >>+ readSelector.select()                      //接收属于有效连接的报文

      >>+ Iterator<SelectionKey> iter=readSelector.selectedKeys().iterator()

      >>+ while (iter.hasNext()){     //对属于有效连接的每个报文,即每个RPC请求

      >>++ key=iter.next()

      >>++ iter.remove()

      >>++ if (key.isValid()&&key.isReadable())doRead(key)

      >>++> Connection c=(Connection)key.attachment()

      >>++> count=c.readAndProcess()==Connection.readAndProcess()

      >>++>> while (true){

      >>++>>+ …                          //读入和处理报头

      >>++>>+ count=channelRead(channel, data) //然后读入本次RPC请求的内容

      >>++>>+ processOneRpc (data.array())     //处理一次RPC

      >>++>> }

      >>++> if (count < 0)closeConnection(c)


      >>+ } //end while (iter.hasNext())

      >> } //end while(running)

      > readSelector.close()

Reader线程本来就在其主循环中睡眠等待,一旦被唤醒就扫描其pendingConnections,就其中的每个连接向所属通道登记。然后,一旦有属于这些连接的RPC请求到来,就通过doRead()逐一加以处理,具体就是通过readAndProcess()提供服务,那就是processOneRpc():


      [Server.Listener.Reader.run()> doRunLoop()> doRead()> Connection.readAndProcess()

      > processOneRpc()]




      Connection.processOneRpc(byte[]buf)

      > DataInputStream dis=new DataInputStream(new ByteArrayInputStream(buf))

      > RpcRequestHeaderProto header=

                        decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis)

      > callId=header.getCallId()

      > checkRpcHeaders(header)

      > processRpcRequest(header, dis)

      >> Class<? extends Writable> rpcRequestClass=getRpcRequestWrapper(header.getRpcKind())

      >> rpcRequest=ReflectionUtils.newInstance(rpcRequestClass, conf)

      >> rpcRequest.readFields(dis)

      >> Call call=new Call(header.getCallId(), header.getRetryCount(), rpcRequest, this,

          ProtoUtil.convert(header.getRpcKind()), header.getClientId().toByteArray(), traceSpan)

      >> callQueue.put(call); //queue the call; maybe blocked here,队列已满就睡眠等待

                    //将此call 挂入 ServercallQueue队列,交由Handler处理

注意这里的call是个Call类对象,代表着一个RPC调用请求。不要与前述的call()函数相混淆。

像Listener有个线程数组readers一样,Server也有个线程数组handlers,里面有一组Handler线程,这些线程都是在Server的初始化阶段在其start()函数中创建的。每个Hanler线程都盯着Server的callQueue队列,一被唤醒就试图从这个队列里获取一个call加以处理。


      Handler.run()

      > SERVER.set(Server.this)

      > ByteArrayOutputStream buf=new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE)

      > while (running){ //Hanler线程的主循环

      >+ Call call=callQueue.take(); //pop the queue; maybe blocked here

                                    //试图从队列中取一个call 对象,没有就睡眠等待

      >+ if (! call.connection.channel.isOpen())continue

      >+ CurCall.set(call)        //以此call 为当前Call

      >+ if (call.connection.user==null){ //如果所属的连接是全方位的:

      >++ value=call(call.rpcKind, call.connection.protocolName,

                                                      call.rpcRequest, call.timestamp)


                          //调用RPC.Server.call(),替对方在本地实施这个过程调用,即RPC

      >+ }else { //如果所属的连接有访问权限的管理,就以对方这用户的名义和身份

      >++ value=call.connection.user.doAs(new PrivilegedExceptionAction<Writable>())

                    ]run()

                    > return call(call.rpcKind, call.connection.protocolName,

                                                    call.rpcRequest, call.timestamp)

                          //调用RPC.Server.call(),替对方在本地实施这个过程调用,即RPC

      >+ }

      >+ CurCall.set(null)

      >+setupResponse(buf, call, returnStatus, detailedErr, value, errorClass, error)//设置响应报文

      >+ responder.doRespond(call)                  //对请求方做出回应:

      >+> call.connection.responseQueue.addLast(call) //挂入所属连接的responseQueue队列

      >+> if (call.connection.responseQueue.size()==1){ //如果队列中只有这么一个回应

      >+>+ processResponse(call.connection.responseQueue, true)    //那就越俎代庖了

      >+> }

      > }//end while

Handler线程专门替对方在本地实施远程过程调用,那就是这里对call()的调用要达到的目标。至于call()具体是怎么实施,如前所述,Server是个抽象类,实际上这要看具体call所遵循的规程即Protocol,那要由扩充落实Server的实体类提供,后面我们会看到这方面的实例。

实施了call之后,要对请求方作出回应,包括返回结果(函数的返回值有可能是个需要加以串行化的对象),所以这里要通过setupResponse()准备好一个响应,Call类的对象内部有个缓冲区rpcResponse,就是用来盛放回应信息的。Handler线程并不直接发送回应,另外有Responder线程专管这个事,只要把call挂入其所属连接的responseQueue队列就行。

每个连接,即每个Connection类对象,都有个responseQueue队列。而Server则有个Responder线程,这是在Server的构造函数中创建的。每当创建一个连接时都要向这个Responder线程登记,让其照看这个连接的responseQueue队列。

所以,Responder线程在其主循环中监视着所有连接的responseQueue队列:


      Responder.run()

      > SERVER.set(Server.this)

      > doRunLoop()

      >> while (running){  //Responder线程的主循环

      >>+ waitPending(); //If a channel is being registered, wait

      >>+ writeSelector.select(PURGE_INTERVAL)

                      //等待所有已登记连接的responseQueue队列中有需要回应的call 到来

      >>+ Iterator<SelectionKey> iter=writeSelector.selectedKeys().iterator()

      >>+ while (iter.hasNext()){   //对于其中的每一个call

      >>++ SelectionKey key=iter.next()

      >>++ iter.remove()


      >>++ if (key.isValid()&&key.isWritable())doAsyncWrite(key)

      >>++> Call call=(Call)key.attachment()

      >>++> if (call==null)return

      >>++> processResponse(call.connection.responseQueue, false) //发送响应报文

      >>+ }

      >>+ …

      >> }

有关响应报文发送即processResponse()的具体细节,这里就不深入下去了。实际上这也可以很复杂,因为网络上发送的包是有长度限制的,一个响应报文可能要好多次才能发完。但是不了解这些底层的细节不至于严重影响我们对RPC机制的理解。

另外,注意我们这是摘要,实际上有许多(我认为)对当前这个主题不那么重要的代码就被省略了,想要彻底搞个明白的读者还是应该回到源代码中。

对提供服务的Server一方有所了解以后,我们把目光转向提出服务请求的一方。请求服务的一方称为客户方,或用户方,这一边定义了一个与Server相对应的Client类,凡是对外提出服务请求都需要由Client对象经手。当然,Client对象只是在Server面前代表客户方,而绝非客户方的全部。


      class Client {}  //org.apache.hadoop.ipc.Client, a client for an IPC service.

      ]Hashtable<ConnectionId, Connection> connections //Connection线程的集合

                    //一个Client可以有多个对外连接,每个连接都有个Connection线程照看

      ]class ClientExecutorServiceFactory{}

      ]setPingInterval()

      ]checkResponse(RpcResponseHeaderProto header) //Check the rpc response header

      ]createCall(RPC.RpcKind rpcKind, Writable rpcRequest)

      > return new Call(rpcKind, rpcRequest)

      ]class Call {} //Class that represents an RPC call,每个Call 对象代表着一次RPC调用

      ]getConnection(ConnectionId remoteId, Call call,

                        int serviceClass, AtomicBoolean fallbackToSimpleAuth)//建立连接

      ]class Connection extends Thread{} //Client这一边,每个连接都有个Connection线程

      ]]Socket socket

      ]]DataInputStream in

      ]]DataOutputStream out

      ]]setupConnection()

          > while (true){

          >+ try{

          >++ this.socket=socketFactory.createSocket()

          >++ this.socket.setTcpNoDelay(tcpNoDelay)

          >++ this.socket.setKeepAlive(true)

          >++ NetUtils.connect(this.socket, server, connectionTimeout)

          >++ this.socket.setSoTimeout(pingInterval)


          >++ return

          >+ }

          > }

      ]]sendPing()

          > if(curTime-lastActivity.get()>=pingInterval){

          >+ out.writeInt(pingRequest.size())

          >+ pingRequest.writeTo(out)

          >+ out.flush()

          > }

      ]]run()

          > while (waitForWork()){ //wait here for work-read or close connection

          >+ receiveRpcResponse()

          > }

      ]]sendRpcRequest(Call call)

      ]]receiveRpcResponse()

      ]call(Writable param, InetSocketAddress address)

                                          //注意Client.call() Server.call()是两码事

          > call(RPC.RpcKind.RPC_BUILTIN, param, address)

          >> call(rpcKind, param, address, null)

          >>> ConnectionId remoteId=ConnectionId.getConnectionId(addr, null, ticket,0, conf)

          >>> call(rpcKind, param, remoteId)

      ]call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass)

          > call=createCall(rpcKind, rpcRequest)

          > connection=getConnection(remoteId, call, serviceClass)

          > connection.sendRpcRequest(call)

          > while (! call.done)call.wait()

Client管理着对外(服务方)的连接。凡是已经建立并且还维持着的连接,都有一个Connection线程,Client类对象内部有个集合connections,根据ConnectionId可以从中找到该连接的Connection线程。

当客户方需要进行RPC调用时,就通过Client.call()发出调用请求。注意,这个call()与前面Server中的那个call()是两码事。Client有好几个采用不同参数序列的call()函数,其中最简单的是只有两个参数的call():


      call(Writable param, InetSocketAddress address)

      > call(RPC.RpcKind.RPC_BUILTIN, param, address)

这里的param就是要求进行RPC的报文,address当然是对方的地址。这个call()函数添上了另一个实参RPC.RpcKind.RPC_BUILTIN,调用另一个call()函数。RPC.RpcKind意为“RPC的方式”,是个枚举类型,其取值有三种:RPC_BUILTIN、RPC_WRITABLE和RPC_PROTOCOL_BUFFER,这里我们暂且不必关心。

最后解决问题、真正“干实事”的是下面这个call():


      Client.call(RPC.RpcKind rpcKind, Writable rpcRequest,

      ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth)

      > Call call=createCall(rpcKind, rpcRequest)  //创建一个Call 对象

      >> return new Call(rpcKind, rpcRequest)

      > Connection connection=getConnection(remoteId, call, serviceClass, fallbackToSimpleAuth)

                                                //如果尚未建立连接就与对方建立连接:

      >> do {

      >>+ Connection connection=connections.get(remoteId)

      >>+ if (connection==null){

      >>++ connection=new Connection(remoteId, serviceClass)

      >>++ connections.put(remoteId, connection)

      >>+ }

      >> }while (! connection.addCall(call))

      >> connection.setupIOstreams(fallbackToSimpleAuth)

      >> return connection

      > connection.sendRpcRequest(call)==Connection.sendRpcRequest(call) //发出RPC请求

      > while (! call.done)call.wait() //等待对方的回应

      > return call.getRpcResponse() //返回对方的回应

进行RPC调用,首先要创建一个代表着这次RPC的Call对象;然后从connections中找到通往同一个Server、同一类服务的Connection线程,如果还没有建立就加以创建;再通过该连接的sendRpcRequest()发送RPC请求。

我们不妨先看一下Connection类的构造函数:


      Connection.Connection(ConnectionId remoteId, int serviceClass)

      > this.remoteId=remoteId

      > this.server=remoteId.getAddress()

      > this.rpcTimeout=remoteId.getRpcTimeout()

      > this.maxIdleTime=remoteId.getMaxIdleTime()

      > this.connectionRetryPolicy=remoteId.connectionRetryPolicy

      > this.maxRetriesOnSasl=remoteId.getMaxRetriesOnSasl()

                              //采用 Sasl 加密时允许失败的次数

      > this.maxRetriesOnSocketTimeouts=remoteId.getMaxRetriesOnSocketTimeouts()

      > this.tcpNoDelay=remoteId.getTcpNoDelay()

      > this.doPing=remoteId.getDoPing() //需不需要定时发送Ping报文

      > if (doPing){ //construct a RPC header with the callId as the ping called

                      //如果需要就预先准备好一个用于Ping的报头备用

      >+ pingRequest=new ByteArrayOutputStream() //分配缓冲区,以供构筑Ping报头

      >+ RpcRequestHeaderProto pingHeader=

              ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,

                            OperationProto.RPC_FINAL_PACKET, PING_CALL_ID,


                            RpcConstants.INVALID_RETRY_COUNT, cl ientId)

      >+ pingHeader.writeDelimitedTo(pingRequest)

      > }

      > this.pingInterval=remoteId.getPingInterval() //Ping的时间间隔

      > this.serviceClass=serviceClass

      > UserGroupInformation ticket=remoteId.getTicket() //对方所要求的门票”

            //try SASL if security is enabled or if the ugi contains tokens.

            //this causes a SIMPLE client with tokens to attempt SASL

      > boolean trySasl=UserGroupInformation.isSecurityEnabled()||

                                (ticket! =null&&! ticket.getTokens().isEmpty())

                                  //表示需不需要试试 Sasl 加密

      > this.authProtocol=trySasl ? AuthProtocol.SASL:AuthProtocol.NONE;

                                  //如果需要就是AuthProtocol.SASL

      > this.setDaemon(true) //将线程设置成Daemon,关闭标准输入输出通道

注意,这只是创建Connection对象(虽然是个线程)时的构造函数,基本上都是类似于初始化的操作,尚未真正建立与Server方的连接。这里所设置的那些变量的名称都比较能说明问题,就不需要过多解释了。将来,建立起与Server方的连接之后,可能是需要定时发送Ping报文的,如果需要就预先准备好一个用于Ping的报头备用。

所得到的Connection,不管是connections中原已存在的,还是新创建的,总之都要经过setupIOstreams()这一步。不过,若是原来已经建立的连接就马上可以返回了;若是新创建的Connection就得建立起与Server方的连接。注意,这里Connection指一个对象,这是一个线程,而“连接”则指与Server方建立的网络连接。


      [Client.call()> Connection.getConnection()> setupIOstreams()]




      Connection.setupIOstreams(AtomicBoolean fallbackToSimpleAuth)

      > if (socket! =null||shouldCloseConnection.get())return //Server的连接业已建立

      > while (true){ //新创的Connection需要建立与 Server方的连接

      >+ setupConnection()

      >+> while (true){

      >+>+ this.socket=socketFactory.createSocket()

      >+>+ UserGroupInformation ticket=remoteId.getTicket()

      >+>+ if (ticket! =null&&ticket.hasKerberosCredentials()){

      >+>++ … //

      >+>+ }

      >+>+ NetUtils.connect(this.socket, server, connectionTimeout)//Server建立网络连接

      >+>+ return

      >+> } //end while (true) //如果在此过程中发生异常而失败,就循环再试

      >+ InputStream inStream=NetUtils.getInputStream(socket) //建立基于 socket的输入流

      >+ OutputStream outStream=NetUtils.getOutputStream(socket)//建立基于 socket的输出流


      >+ writeConnectionHeader(outStream) //写这个输出流,就是通过 socket发送报头

      >+> DataOutputStream out=new DataOutputStream(new BufferedOutputStream(outStream))

      >+> out.write(RpcConstants.HEADER.array())       //4个字节:“hrpc”

      >+> out.write(RpcConstants.CURRENT_VERSION)      //1 个字节的版本号

      >+> out.write(serviceClass)                   //1 个字节的服务类别

      >+> out.write(authProtocol.callId) //1个字节的身份认证规程,NONE(0)SASL(-33)

      >+ if (authProtocol==AuthProtocol.SASL){

      >++ … //主旋律无关,有兴趣或需要的读者请自行阅读

      >+ }

      >+ if (doPing)inStream=new PingInputStream(inStream) //在输入流上增加对Ping的处理

      >+ this.in=new DataInputStream(new BufferedInputStream(inStream))

          //把基于 socket的原始输入流进一步改造成带缓冲的数据输入流,隐藏 socket的本性

      >+ if (! (outStream instanceof BufferedOutputStream)){

      >++ outStream=new BufferedOutputStream(outStream) //让输出流也带上缓冲

      >+ }

      >+ this.out=new DataOutputStream(outStream) //使输出流也变成数据输出流

      >+ writeConnectionContext(remoteId, authMethod) //

      >+> IpcConnectionContextProto message=ProtoUtil.makeIpcConnectionContext(

            RPC.getProtocolName(remoteId.getProtocol()), remoteId.getTicket(), authMethod)

      >+> RpcRequestHeaderProto connectionContextHeader=

                ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,

                                OperationProto.RPC_FINAL_PACKET,

                                CONNECTION_CONTEXT_CALL_ID,

                                RpcConstants.INVALID_RETRY_COUNT, cl ientId)

      >+> RpcRequestMessageWrapper request=

                          new RpcRequestMessageWrapper(connectionContextHeader, message)

                                                              //创建请求报文request

      >+> out.writeInt(request.getLength()) //Write out the packet length

      >+> request.write(out)

      >+ touch()

      >+ start()  //启动这个Connection线程

      >+ return

      > }

这个setupIOstreams(),其实就是专为新创Connection线程而设的,因为凡是原已存在的Connection都已经有了socket,所以一进来就返回了。而新创建的Connection线程,则通过setupConnection()创建socket并与Server方建立网络连接。建立起网络连接之后,还要以socket为基础建立起抽象程度更高的、带缓冲的数据输入流和输出流。比方说,有了DataOutputStream就可以直接往里面写整数,而无须关心字节的次序怎么排、怎么打成IP包之类的这些问题。然后,就通过这个输出流向Server方发送连接请求报文。注意,前面通过connect()建立的只是传输层的连接,现在要建立的则是RPC层即应用层的连接,是Client对象与Server对象的连接,这是不同层次上的连接。

所以,凡是新创建的Connection线程,都要先与Server建立RPC连接,具体就是向其发送连接请求。回头看看Server一方的代码摘要可以知道,得要对方接受了连接请求之后,双方才可以进行RPC请求的交互。发送了RPC连接请求,即ConnectionHeader加上ConnectionContext以后,程序就启动Connection线程的运行,使其进入run()函数:


      Connection.run()

      > while (waitForWork()){//wait here for work-read or close connection

                            //Connection线程的主循环

      >+ receiveRpcResponse()//接收来自对方的响应

      >+> int totalLen=in.readInt()

      >+> RpcResponseHeaderProto header=RpcResponseHeaderProto.parseDelimitedFrom(in)

      >+> checkResponse(header)

      >+> int headerLen=header.getSerializedSize()

      >+> headerLen+=CodedOutputStream.computeRawVarint32Size(headerLen)

      >+> int callId=header.getCallId()

      >+> Call call=calls.get(callId)

      >+> RpcStatusProto status=header.getStatus()

      >+> if (status==RpcStatusProto.SUCCESS){

      >+>+Writable value=ReflectionUtils.newInstance(valueClass, conf)

      >+>+ value.readFields(in);  //read value

      >+>+ calls.remove(callId)

      >+>+ call.setRpcResponse(value)

      >+>+ if (call.getRpcResponse()instanceof ProtobufRpcEngine.RpcWrapper){

      >+>++ ProtobufRpcEngine.RpcWrapper resWrapper=

                                (ProtobufRpcEngine.RpcWrapper)call.getRpcResponse()

      >+>+ }

      >+> }else {//Rpc Request failed

      >+>+ … //

      >+> }

      > } //end while

      > close()

显然,在Hadoop的RPC机制中,Connection线程只管接收来看Server的响应,不管发送。发送是想要进行RPC调用的那个线程自己的事。作为请求RPC的一方,Connection接收的只能是Server方的回应。注意,在通过readFields()从响应报文中读出时会涉及对象的去串行化。

回到前面Client.call()的摘要中,创建了Connection对象(线程)并与Server方建立连接之后,就通过其sendRpcRequest()要求发送RPC请求:


      [Client.call()> Connection.sendRpcRequest()]


      Connection.sendRpcRequest(Call call)

      > DataOutputBuffer d=new DataOutputBuffer() //准备好一块缓冲区

      > RpcRequestHeaderProto header=ProtoUtil.makeRpcRequestHeader(

                call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,

                cl ientId)

      > header.writeDelimitedTo(d) //RPC请求报头写入缓冲区d

      > call.rpcRequest.write(d)  //RPC请求的内容写入缓冲区d

      > runnable=new Runnable()  //创建一个Runnable对象(作为线程运行)

              ]run() //这个Runnablerun()函数

                > byte[]data=d.getData() //d就是上面准备好的缓冲区

                > int totalLength=d.getLength() //这是已经写入缓冲区的长度

                > out.writeInt(totalLength); //Total Length,把这个长度从输出通道out写出去

                > out.write(data,0, totalLength); //RpcRequestHeader+RpcRequest

                > out.flush()   //发送

                > senderFuture.get()

      > Future<? > senderFuture=sendParamsExecutor.submit(runnable)

              ==ExecutorService.submit(runnable)//提交给JVM,要求安排线程执行其run()函数

这里的out,是Connection线程在前面setupIOstreams()中创建的DataOutputStream,其基础是socket和网络连接,可以将其看成通向Server方的输出流,现在要通过它向对方发送RPC请求了。发送RPC请求比发送连接请求麻烦,因为这可能涉及一般对象的串行化。另一方面,发送的过程中也可能会因异常而要求重试,所以这里采用异步的方式,为通过输出流DataOutputStream发送的过程创建一个Runnable,并将其提交给Java虚拟机,让其安排线程加以执行。至于RPC的执行结果,则有Connection线程先加处理再予上交,请求RPC服务的线程可以在某个点上睡眠等待结果的到来。

严格说来,Server和Client二者还不能说是构成了完整的RPC层,实质上这只是一个IPC(进程间通信)层,或者说只是一种原始而粗糙的RPC底层,可以用来达到远程过程调用的目的,但是并不方便。比方说,我们确实可以通过Client.call()调用对方某类对象的某个方法,但是这在形式上跟在本地调用同类对象的同一方法有着很大的差异,这种差异要求我们必须了解许多底层的细节,甚至可能会影响我们的思维。另一方面,这样的程序几乎注定就是不易移植、不可重用(reuse)的。设想有一天把对方的这个某类对象移到了本地,那么程序中固定写死的那些对Client.call()的调用就都得要改成本地的调用了。所以,我们需要的是一种形式上与本地调用没有什么不同的RPC机制。如果我们需要调用某个远地的某类对象的某种方法,那么本地就应该有这对象的(当然是同类的)镜像,或者说“代理(proxy)”,只要调用本地的这个proxy的某个过程,就可以自动转化成对远地那个对象真身的过程调用。这样,一方面写程序的人再也不必关心许多细节(其实是把有关的工作集中到了少数几个人身上),思维上也可更自然通畅,而且即使有一天把这个某类对象移到了本地,绝大部分的程序也可不必更改,只要把proxy去掉就行了。这样的机制,才是我们想要的RPC机制。

还有个问题:需要被用作RPC目标的类及其方法可能是五花八门的,不同的类、不同的方法(函数),乃至不同的参数类型,最后实现具体调用时的程序代码也就有所不同(否则编译就通不过)。这也是为什么前面的Server只能是抽象类的原因,既然要在远地的服务端以不同的参数类型和序列对不同类的不同函数进行调用,它的call()函数怎么能固定得下来,怎么能有个统一的实现呢?这样看来,我们是否只好“一事一议”,针对需要进行远程调用的每一个函数都准备一个特别的call()。但是那样就增加了大量机械而枯燥无味又容易出错的工作,最好能有个软件工具来帮助生成这样的代码。还有个只适用于解释型语言的办法,就是让解释器在运行时根据每个函数的界面定义临时决定怎么调用,但是那样做一来太烦琐,容易出错,二来对运行效率也有影响。事实上,Java语言的Reflection机制,就是在一定程度上保留了解释型语言的特点,本质上是把一些编译时的决定和选择推迟到了运行时。Hadoop既然采用Java,这就也是个不坏的方案。当然Reflection机制不可滥用,否则就变成解释型语言了(Java语言从Java语句到虚拟机指令这一步是编译的,从虚拟机指令到物理CPU指令这一步则是解释的)。所以,这二者,即软件辅助生成代码和适度使用Reflection机制,可以互相补充,结合使用。

Hadoop在这方面的设计,应该就是出于这样的考虑,也是从2.0版开始,采用了ProtocolBuffer以及与之配套的方案。其实,上述Server和Client的设计就已经是与这个方案配套的了,其早期的RPC机制不是这样的。Hadoop的RPC机制在2.0版前后有很大的不同,下面所讲当然都是指2.0版及以后的Hadoop。

Hadoop的代码中定义了一个RPC类,用来帮助应用层构筑具体的RPC层设施,以达到上述这两方面的目标,即通过proxy进行RPC调用,以及软件辅助生成代码与Reflection机制的结合。我们先看RPC类的定义摘要:


      class RPC {}

      ]enum RpcKind {

          RPC_BUILTIN ((short)1),              //Used for built in calls by tests

          RPC_WRITABLE ((short)2), //Use WritableRpcEngine

          RPC_PROTOCOL_BUFFER ((short)3); //Use ProtobufRpcEngine

        }

      ]Map<Class<? >, RpcEngine> PROTOCOL_ENGINES

      ]interface RpcInvoker {}

      ]]call(Server server, String protocol, Writable rpcRequest, long receiveTime)

      ]setProtocolEngine(Configuration conf, Class<? > protocol, Class<? > engine)

        > conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class)

                      //ENGINE_PROP 就是“rpc.engine”

      ]getProtocolEngine(Class<? > protocol, Configuration conf)

        //获取或创建具体的ProtocolEngine,ProtobufRpcEngineWritableRpcEngine两种

      --------------以下用于客户端的创建---------------

      ]getProxy(Class<T> protocol, long clientVersion,

                          InetSocketAddress addr, Configuration conf, SocketFactory factory)

        > return getProtocolProxy(protocol, clientVersion, addr, conf, factory).getProxy()

        >> if (UserGroupInformation.isSecurityEnabled()){

              SaslRpcServer.init(conf);


        >> }

        >> pe=getProtocolEngine(protocol, conf)

        >> pe.getProxy(…)==ProtobufRpcEngine.getProxy(Class<T> protocol, long clientVersion,

                            InetSocketAddress addr, UserGroupInformation ticket,

                            Configuration conf, SocketFactory factory, int rpcTimeout,

                            RetryPolicy connectionRetryPolicy,

                            AtomicBoolean fallbackToSimpleAuth)

          >>> Invoker invoker=new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout,

                connectionRetryPolicy, fallbackToSimpleAuth) //ProtobufRpcEngine.invoker

          >>> p=Proxy.newProxyInstance(protocol.getClassLoader(), new Class[]{protocol}, invoker)

          >>> new ProtocolProxy<T>(protocol, p, false)  //创建ProtocolProxy对象

      --------------以下用于服务端的创建---------------

      ]class Builder {} //Class to construct instances of RPC server with specific options.

      ]]setProtocol(Class<? > protocol)

      ]]setInstance(Obj ect instance)

      ]]build()

        > return getProtocolEngine(this.protocol, this.conf).getServer(

                  this.protocol, this.instance, this.bindAddress, this.port,

                  this.numHandlers, this.numReaders, this.queueSizePerHandler,

                  this.verbose, this.conf, this.secretManager, this.portRangeConfig)

      ]abstract static class Server extends org.apache.hadoop.ipc.Server{}

先要对上述摘要做些解释和说明。

首先这里引入了protocol的概念,所谓protocol,当然是Client/Server之间的规程和协议。但请注意,这是RPC层的协议,不要与传输层的协议如UDP、TCP之类混淆。如前所述, RPC所针对的类和函数可谓五花八门,调用界面各不相同,可是大的分类总还是有的。例如与提交作业和分配资源有关的函数就各有不少,于是我们就可以说,提交作业有提交作业的protocol,分配资源有分配资源的protocol。类似地,NM节点与RM节点之间有protocol, NM节点互相之间又是另一种protocol。在实践中,采用不同protocol的软件一般是由不同的人员开发的。前面说过,最好是远地有个什么类的对象、本地就有这个什么类对象的镜像即proxy;这样当然是好,但是也失之繁杂,因为数量巨大。折中的办法是以protocol为单位,让每个protocol都有自己的Server和proxy。所以,当我们说要创建一个Server(或proxy)时,首先要明确这是要创建哪一种protocol的Server(或proxy),需要有对于此种protocol的定义和说明,这样才能生成对于其中各个函数的调用(代码),并且有什么样的Server就有什么样的proxy。所以,对于RPC机制的实现,protocol是要素。

然后是RpcEngine和ProtocolEngine的概念。什么是RpcEngine呢?RpcEngine是Hadoop代码中定义的一个界面:


      interface RpcEngine {}

      ]ProtocolProxy<T> getProxy(Class<T> protocol, …)  //返回一个proxy对象

      ]RPC.Server getServer(Class<? > protocol, …)        //返回一个RPC.Server对象


      ]getProtocolMetaInfoProxy(ConnectionId connId, Configuration conf, SocketFactory factory)

注意,getProxy()的返回类型是ProtocolProxy<T>,即某种类型的ProtocolProxy。而getServer()的返回类型则是RPC.Server,即对于RPC.Server的某种落实了它的call()函数的扩充(见前述),总之是某种类型的RPC.Server对象。

而ProtocolEngine,则是作为工作“引擎”具体实现某种RpcKind的RPC机制并实现了RpcEngine界面的类。在Hadoop中这样的引擎有两种:


      class ProtobufRpcEngine implements RpcEngine {}

      class WritableRpcEngine implements RpcEngine {}

就是说,这两种引擎都可以生成RPC.Server和Proxy。那区别何在呢?前者是基于Protobuf的,后者是基于Writable的。前者不用解释了,那当然就是Google的Protobuf。后者所谓的Writable,是Hadoop代码中定义的一个界面:


      interface Writable {}

      ]write(DataOutput out)

      ]readFields(DataInput in)

这个界面只提供write()和readFields()这两种操作,实际上就是任意格式的字节串。相较于Protobuf,这似乎是无格式的、原始形式的数据,但是这里面实际上是存在着某种格式的,所以就需要有运行时的解析(Parsing)。

其实还可以有第三种,那就是前面所述的底层Server和Client(当然要补上Server.call()的代码)。

为了区分具体引擎,从而分清具体RPC请求的类型(Server与Client即proxy必须配套), RPC类中定义了一个枚举类型RpcKind,取值范围为RPC_BUILTIN、RPC_WRITABLE和RPC_PROTOCOL_BUFFER。其中对RPC_BUILTIN的注释说是为测试用,但是实际上恐怕也不仅仅是测试,凡是底层的、不纳入具体protocol的RPC调用都得用RPC_BUILTIN。

现在,Hadoop代码中定义的这个RPC类,其设计意图应该比较容易理解了。其中setProtocolEngine()和getProtocolEngine()的意义自明,其余就是用来生成或获取(如果已经生成)针对具体protocol的Server和Proxy。

对于某个具体的protocol,要为其创建Server对象时,就直接或间接调用定义于RPC类内部的Builder.build();要为其创建Proxy对象时就调用RPC类的getProxy()。而Builder.build(),实际上就是getProtocolEngine().getServer()。

用RPC.getProxy()创建Proxy的实例在Hadoop的代码中比比皆是。随便抓一个,比方说LocalizationProtocol的Proxy:


      public LocalizationProtocolPBClientImpl(long clientVersion,

                        InetSocketAddress addr, Configuration conf)throws IOException {

        RPC.setProtocolEngine(conf, LocalizationProtocolPB.class, ProtobufRpcEngine.class);

        proxy=(LocalizationProtocolPB)RPC.getProxy(

                      LocalizationProtocolPB.class, clientVersion, addr, conf);


      }

创建Server的操作则要多样化一些,并且往往也不是那么直截了当的。我们且以ResouceManager节点上的ApplicationMasterService为例,在其初始化阶段的serviceStart()中是这样创建其Server的:


      ApplicationMasterService.serviceStart()

      > YarnRPC rpc=YarnRPC.create(conf) //YarnRPC是抽象类,落实为HadoopYarnProtoRPC

      > …

      > this.server=rpc.getServer(ApplicationMasterProtocol.class,

                              this, masterServiceAddress, serverConf, …)

                  //ApplicationMasterService.server的类型就是org.apache.hadoop.ipc.Server

                  //所以一定是直接或间接扩展了这个 Server的某类对象

          ==HadoopYarnProtoRPC.getServer(ApplicationMasterProtocol.class, this, …)

      >> …

      >> return RpcFactoryProvider.getServerFactory(conf).getServer(protocol,

                instance, addr, conf, secretManager, numHandlers, portRangeConfig)

                    //RpcFactoryProvider.getServerFactory(conf)返回具体的RpcServerFactory

      >>> RpcServerFactoryPBImpl.getServer()

      >>>> …

      >>>> RpcServerFactoryPBImpl.createServer(pbProtocol, addr, conf, secretManager,

                numHandlers, (BlockingService)method.invoke(null, service), portRangeConfig)

      >>>>> RPC.setProtocolEngine(conf, pbProtocol, ProtobufRpcEngine.class)

      >>>>> RPC.Server server=new RPC.Builder(conf).setProtocol(pbProtocol)

                        .setInstance(blockingService).setBindAddress(addr.getHostName())

                        .setPort(addr.getPort()).setNumHandlers(numHandlers).setVerbose(false)

                        .setSecretManager(secretManager).setPortRangeConfig(portRangeConfig)

                        .build()

              ==RPC.Builder.build()  /创建RPC.Server对象

顺着程序中的逐层调用,读者自不难明白,这ApplicationMasterService对象的Server,尽管多层辗转,最后还是靠RPC.Builder.build()创建的。这里setInstance()、setPort()等都是RPC.Builder提供的方法函数,只是上面的摘要中没有列出,读者可以直接查看源代码。

再如HDFS子系统中的NameNodeRpcServer,就是直接调用RPC.Builder.build()创建的:


      NameNode.createRpcServer(Configuration conf)

      > return new NameNodeRpcServer(conf, this)

      >> NameNodeRpcServer(conf, this)

      >>> …

      >>> this.serviceRpcServer=new RPC.Builder(conf).setProtocol(

                        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)

                        .setInstance(clientNNPbService)…build()

读者自然要问,这两个例子中创建的两个Server,它们的call()函数又是什么样的呢?前面的RPC.Server是抽象类,为什么这里就可以创建实体的对象呢?

我们先回到RPC的摘要,里面还定义了一个界面RpcInvoker。这个界面上只定义了一个函数,那就是call()。

对这个界面的实现是在ProtobufRpcEngine和WritableRpcEngine中。


      class WritableRpcInvoker implements RpcInvoker {}  //WritableRpcEngine

      class ProtoBufRpcInvoker implements RpcInvoker {}  //ProtobufRpcEngine

读者想必还记得前面讲述为什么要把RPC.Server定义成抽象类的原因,那里说的rpcKind就是一个RpcKind枚举值,RpcInvoker可以是这二者之一,而RPC.Server的call()函数会落实到WritableRpcInvoker.call()或ProtoBufRpcInvoker.call()。

先看WritableRpcInvoker的call()是怎么实现的:


      WritableRpcEngine.WritableRpcInvoker.call(org.apache.hadoop.ipc.RPC.Server server,

          String protocolName, Writable rpcRequest, long receivedTime)

      > Invocation call=(Invocation)rpcRequest

      > if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())){

      >+ VerProtocolImpl highest=server.getHighestSupportedProtocol(

                                        RPC.RpcKind.RPC_WRITABLE, protocolName)

      >+ protocolImpl=highest.protocolTarget

      > }else {

      >+ …

      > }

      >Method method=protocolImpl.protocolClass.getMethod(

                                  call.getMethodName(), call.getParameterClasses())

      > method.setAccessible(true)

      > Object value=method.invoke(protocolImpl.protocolImpl, call.getParameters())

      > return new Obj ectWritable(method.getReturnType(), value)

这里的关键是在作为目标的类protocolClass中,可以根据所调用方法的名称和参数的类型序列,通过getMethod()找到这个方法method,再通过method.invoke()加以调用。这就是Java语言的Reflection机制赋予程序员的手段。当然,程序员须为每个Protocol提供一个protocolImpl,实现该Protocol中定义的所有方法函数,RPC调用时以这些函数为跳板,先进入这些函数(之一),再设法在这些函数中转入真正的目标函数。如前所述,如果用人工编程的话,这将是一份机械而枯燥无味的工作还容易有错。

可见,这里所利用的就是Java语言的Reflection机制。一般的编译型语言,例如C,是没有这种机制的。给你一个对象,即数据结构,要在运行时(而不是编译时)按函数名从中找到某个函数指针加以调用,或在运行时按字段名从数据结构中找到一个字段,在C语言中是没有这种机制的。当然你自己也可以设法实现,但是那就相当于给C也添上了Reflection机制,这可不是易事(C后裔之一的Go语言就实现了这种机制)。事实上,如果用C语言实现类似功能的话,多半会在Server一方安排一个函数跳转表,即函数指针数组,Client一方则在编译时将函数名转换成数组下标,即目标函数的序号。这样效率当然很高,但是灵活性会差一些。

再看ProtoBufRpcInvoker的call()是怎么实现的:


      ProtobufRpcEngine.ProtoBufRpcInvoker.call(RPC.Server server, String protocol,

                Writable writableRequest, long receiveTime)

      > RpcRequestWrapper request=(RpcRequestWrapper)writableRequest

      > RequestHeaderProto rpcRequest=request.requestHeader

      > String methodName=rpcRequest.getMethodName()  //目标方法的名称

      > String protoName=rpcRequest.getDeclaringClassProtocolName() //Protocol 的名称

      > long clientVersion=rpcRequest.getClientProtocolVersion() //Protocol 的版本号

      > ProtoClassProtoImpl protocolImpl=getProtocolImpl(server, protoName, clientVersion)

                                    //包含着对具体 Protocol 的实现,由ProtocolBuf提供

      > BlockingService service=(BlockingService)protocolImpl.protocolImpl

            //ProtocolBuf为该Protocol 提供的一个实现了BlockingService界面的某类对象

      >MethodDescriptor methodDescriptor=

                            service.getDescriptorForType().findMethodByName(methodName)

                                      //根据方法名从该类对象内部找到对目标函数的描述

      >Message prototype=service.getRequestPrototype(methodDescriptor)

      >Message param=

                  prototype.newBuilderForType().mergeFrom(request.theRequestRead).build()

                                                  //从调用请求request中恢复参数序列

      > result=service.callBlockingMethod(methodDescriptor, null, param) //调用这个方法的中介

      > return new RpcResponseWrapper(result)

显然,这里也是依靠Reflection。不同的是,这里还涉及由ProtocolBuf生成提供的底层模块。后面我们将看到,对于用ProtoBuf语言编写的每个Protocol,编译工具protoc会自动生成相关代码,为每个Protocol提供一个实现了BlockingService界面的无名类,这个类中有Protocol中定义的所有方法函数。

现在我们可以回到前面的两个问题,即ApplicationMasterService.server和NameNode的NameNodeRpcServer提供了什么样的call()函数。

要回答这个问题,我们需要仔细看一下RPC.Builder.build()是怎样实现的:


      RPC.Builder.build()

      > RpcEngine engine=getProtocolEngine(this.protocol, this.conf)

      >> RpcEngine engine=PROTOCOL_ENGINES.get(protocol) //也许已经在这个集合中

      >> if (engine==null){ //若没有,需要创建

      >>+ Class<? > impl=conf.getClass(ENGINE_PROP+"."+protocol.getName(),

                                                            WritableRpcEngine.class)

                    //这就是通过 setProtocolEngine()设置的RpcEngine∶ProtobufRpcEngine

      >>+ engine=(RpcEngine)ReflectionUtils.newInstance(impl, conf)

      >>+ PROTOCOL_ENGINES.put(protocol, engine) //放入PROTOCOL_ENGINES


      >> }

      >> return engine

      > return engine.getServer(

                  this.protocol, this.instance, this.bindAddress, this.port,

                  this.numHandlers, this.numReaders, this.queueSizePerHandler,

                  this.verbose, this.conf, this.secretManager, this.portRangeConfig)

            ==ProtobufRpcEngine.getServer(this.protocol, this.instance, this.bindAddress, …)

      >> return new Server(protocol, protocolImpl, conf, bindAddress, port,

                numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,

                portRangeConfig)//这是ProtobufRpcEngine.Server,它扩充了RPC.Server

RPC对象内部维持着一个已知Protocol,例如ApplicationMasterProtocol那样的集合PROTOCOL_ENGINES,根据具体Protocol的名称就可以从中找到它用的RpcEngine。如果这个集合中还没有,那就到配置块conf中查找,setProtocolEngine()就是用来设置这个配置项的。我们在前面看到,对这几个Protocol都设置了ProtobufRpcEngine,所以这里所调用的就是ProtobufRpcEngine.getServer(),而ProtobufRpcEngine.getServer()所做的事就是创建一个Server对象。既然这是在ProtobufRpcEngine对象内部,这里所说的Server当然是ProtobufRpcEngine.Server,此类对象的构造函数是这样的:


      class ProtobufRpcEngine.Server extends RPC.Server {}

      ]Server(Class<? > protocolClass, Object protocolImpl,

              Configuration conf, String bindAddress, int port, int numHandlers,

              int numReaders, int queueSizePerHandler, boolean verbose,

              SecretManager<? extends TokenIdentifier> secretManager,

              String portRangeConfig)

      > super(bindAddress, port, null, numHandlers, numReaders, queueSizePerHandler, conf,

                  classNameBase(protocolImpl.getClass().getName()),

                  secretManager, portRangeConfig)

      > registerProtocolAndImpl(RPC.RpcKind.RPC_PROTOCOL_BUFFER,

                      protocolClass, protocolImpl) //RPC登记

这里要执行super(),就是其父类的构造函数。ProtobufRpcEngine.Server是对RPC.Server的扩充,所以这里执行的是RPC.Server.Server()。我们在前面看到,RPC.Server是对最原始的那个Server的扩充,它实际上已经补上了缺失的call()函数,而且整个类中再没有什么抽象成分,然而却定义为abstract。这就使编译器把住了关,不让你用new操作创建RPC.Server,但在此直接调用其构造函数却是可以的。而且,既然ProtobufRpcEngine.Server扩充了RPC.Server,又没有定义自己的call(),那自然就继承了RPC.Server.call()。所以ProtobufRpcEngine.Server.call()就是RPC.Server.call()。我们再回顾一下它的摘要:


      RPC.Server.call(RPC.RpcKind rpcKind, String protocol, Writable rpcRequest, long receiveTime)

      > invoker=getRpcInvoker(rpcKind) //对于ProtobufRpcEngine,这是ProtoBufRpcInvoker

      >> RpcKindMapValue val=rpcKindMap.get(rpcKind)


      > return invoker.call(this, protocol, rpcRequest, receiveTime)

          ==ProtobufRpcEngine.ProtoBufRpcInvoker.call(this, protocol, rpcRequest, receiveTime)

这里形参rpcKind所对应的实参是RPC_PROTOCOL_BUFFER,所以invoker就是ProtoBufRpcInvoker。而ProtoBufRpcInvoker.call(),我们在上面已经看到,它依赖于ProtoBuf模块为具体Protocol提供的一个实现了BlockingService界面的对象。所以,再往下就是ProtoBuf怎样提供这个对象的问题了。

再次强调,这里创建的是ProtobufRpcEngine.Server,而不是RPC.Server,前者是对后者的扩充。