6.5 Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等
Spark 1.6推出了以RpcEnv、RPCEndpoint、RPCEndpointRef为核心的新型架构下的RPC通信方式,就目前的实现而言,其底层依旧是Akka。Akka是基于Actor的分布式消息通信系统,而在Spark 1.6中封装了Akka,提供更高层的Rpc实现,目的是移除对Akka的依赖,为扩展和自定义Rpc打下基础。
Spark 2.0版本中Rpc的变化情况如下。
SPARK-6280:从Spark中删除Akka systemName。
SPARK-7995:删除AkkaRpcEnv,并从Core的依赖中删除Akka。
SPARK-7997:删除开发人员api SparkEnv.actorSystem和AkkaUtils。
RpcEnv是一个抽象类abstract class,传入SparkConf。RPC环境中[RpcEndpoint]需要注册自己的名字[RpcEnv]来接收消息。[RpcEnv]将处理消息发送到[RpcEndpointRef]或远程节点,并提供给相应的[RpcEndpoint]。[RpcEnv]]未被捕获的异常,[RpcEnv]将使用[RpcCallContext.sendFailure]发送异常给发送者,如果没有这样的发送者,则记录日志NotSerializableException。
RpcEnv.scala的源码如下。
1. private[spark] abstract class RpcEnv(conf: SparkConf) { 2. 3. private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf) 4. ......
RpcCallContext.scala处理异常的方法包括reply、sendFailure、senderAddress,其中reply是给发送者发送一个信息。如果发送者是[RpcEndpoint],它的[RpcEndpoint.receive]将被调用。
其中,RpcCallContext的地址RpcAddress是一个case class,包括hostPort、toSparkURL等成员。
RpcAddress.scala的源码如下。
1. private[spark] case class RpcAddress(host: String, port: Int) { 2. def hostPort: String = host + ":" + port 3. /**返回一个字符串,该字符串的形式为:spark://host:port*/ 4. def toSparkURL: String = "spark://" + hostPort 5. override def toString: String = hostPort 6. }
RpcAddress伴生对象object RpcAddress属于包org.apache.spark.rpc,fromURIString方法从String中提取出RpcAddress;fromSparkURL方法也是从String中提取出RpcAddress。说明:case class RpcAddress通过伴生对象object RpcAddress的方法调用,case class RpcAddress也有自己的方法fromURIString、fromSparkURL,而且方法fromURIString、fromSparkURL的返回值也是RpcAddress。
伴生对象RpcAddress的源码如下。
1. private[spark] object RpcAddress { 2. /**返回[RpcAddress]为代表的uri */ 3. def fromURIString(uri: String): RpcAddress = { 4. val uriObj = new java.net.URI(uri) 5. RpcAddress(uriObj.getHost, uriObj.getPort) 6. } 7. /**返回[RpcAddress],编码的形式:spark://host:port */ 8. def fromSparkURL(sparkUrl: String): RpcAddress = { 9. val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl) 10. RpcAddress(host, port) 11. } 12. }
RpcEnv解析:
(1)RpcEnv是RPC的环境(相当于Akka中的ActorSystem),所有的RPCEndpoint都需要注册到RpcEnv实例对象中(注册的时候会指定注册的名称,这样客户端就可以通过名称查询到RpcEndpoint的RpcEndpointRef引用,从而进行通信),在RpcEndpoint接收到消息后会调用receive方法进行处理。
(2)RpcEndpoint如果接收到需要reply的消息,就会交给自己的receiveAndReply来处理(回复时是通过RpcCallContext中的relpy方法来回复发送者的),如果不需要reply,就交给receive方法来处理。
(3)RpcEnvFactory是负责创建RpcEnv的,通过create方法创建RpcEnv实例对象,默认用Netty。
RpcEnv示意图如图6-4所示。
图6-4 RPCEnv示意图
回到RpcEnv.scala的源码,首先调用RpcUtils.lookupRpcTimeout(conf),返回RPC远程端点查找时默认Spark的超时时间。方法lookupRpcTimeout中构建了一个RpcTimeout,定义spark.rpc.lookupTimeout。spark.network.timeout的超时时间是120s。
RpcUtils.scala的lookupRpcTimeout方法的源码如下。
1. def lookupRpcTimeout(conf: SparkConf): RpcTimeout = { 2. RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network .timeout"), "120s") 3. }
进入RpcTimeout,进行RpcTimeout关联超时的原因描述,当TimeoutException发生的时候,关于超时的额外的上下文将包含在异常消息中。
RpcTimeout.scala的源码如下。
1. private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String) 2. extends Serializable { 3. 4. /**修正TimeoutException标准的消息包括描述 */ 5. private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = { 6. new RpcTimeoutException(te.getMessage + ". This timeout is controlled by " + timeoutProp, te) 7. }
其中的RpcTimeoutException继承自TimeoutException。
1. private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException) 2. extends TimeoutException(message) { initCause(cause) }
其中的TimeoutException继承自Exception。
1. public class TimeoutException extends Exception { 2. ...... 3. public TimeoutException(String message) { 4. super(message); 5. } 6. }
回到RpcTimeout.scala,其中的addMessageIfTimeout方法,如果出现超时,将加入这些信息。
RpcTimeout.scala的addMessageIfTimeout的源码如下。
1. def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = { 2. //异常已被转换为一个RpcTimeoutException,就抛出它 3. case rte: RpcTimeoutException => throw rte 4. //其他TimeoutException异常转换为修改的消息RpcTimeoutException 5. case te: TimeoutException => throw createRpcTimeoutException(te) 6. }
RpcTimeout.scala中的awaitResult方法比较关键:awaitResult一直等结果完成并获得结果,如果在指定的时间没有返回结果,就抛出异常[RpcTimeoutException]。
Spark 2.1.1版本的RpcTimeout.scala的源码如下。
1. def awaitResult[T](future: Future[T]): T = { 2. val wrapAndRethrow: PartialFunction[Throwable, T] = { 3. case NonFatal(t) => 4. throw new SparkException("Exception thrown in awaitResult", t) 5. } 6. try { 7. //scalastyle:关闭awaitresult 8. Await.result(future, duration) 9. //scalastyle:打开awaitresult 10. } catch addMessageIfTimeout.orElse(wrapAndRethrow) 11. } 12. }
Spark 2.2.0版本的RpcTimeout.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第2~10行整体被替换,调整为调用ThreadUtils.awaitResult(future, duration)。
1. /** 2. * 等待完成的结果并返回结果。如果结果不在这个超时 timeout 范围内,就抛出一个异常 * [RpcTimeoutException]表示配置控制超时 3. * 4. * @param future `Future` 将被等待 5. * @throws RpcTimeoutException如果在等待指定的时间future还没准备好 6. */ 7. def awaitResult[T](future: Future[T]): T = { 8. try { 9. ThreadUtils.awaitResult(future, duration) 10. } catch addMessageIfTimeout 11. } 12. }
其中的future是Future[T]类型,继承自Awaitable。
1. trait Future[+T] extends Awaitable[T]
Awaitable是一个trait,其中的ready方法是指Duration时间片内,Awaitable的状态变成completed状态,就是ready。在Await.result中,result本身是阻塞的。
Awaitable.scala的源码如下。
1. trait Awaitable[+T] { 2. ...... 3. def ready(atMost: Duration)(implicit permit: CanAwait): this.type 4. ...... 5. @throws(classOf[Exception]) 6. def result(atMost: Duration)(implicit permit: CanAwait): T 7. } 8.
回到RpcEnv.scala中,其中endpointRef方法返回我们注册的RpcEndpoint的引用,是代理的模式。我们要使用RpcEndpoint,是通过RpcEndpointRef来使用的。Address方法是RpcEnv监听的地址;setupEndpoint方法注册时根据RpcEndpoint名称返回RpcEndpointRef。fileServer返回用于服务文件的文件服务器实例。如果RpcEnv不以服务器模式运行,可能是null值。
RpcEnv.scala的源码如下。
1. private[spark] abstract class RpcEnv(conf: SparkConf) { 2. 3. private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout (conf) 4. ...... 5. private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef 6. def address: RpcAddress 7. def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef 8. ....... 9. def fileServer: RpcEnvFileServer 10. ......
RpcEnv.scala中的RpcEnvFileServer方法中的RpcEnvConfig是一个case class。RpcEnvFileServer的源码如下。
1. private[spark] trait RpcEnvFileServer { 2. def addFile(file: File): String 3. ...... 4. private[spark] case class RpcEnvConfig( 5. conf: SparkConf, 6. name: String, 7. bindAddress: String, 8. advertiseAddress: String, 9. port: Int, 10. securityManager: SecurityManager, 11. clientMode: Boolean)
RpcEnv是一个抽象类,其具体的子类是NettyRpcEnv。Spark 1.6版本中包括AkkaRpcEnv和NettyRpcEnv两种方式。Spark 2.0版本中只有NettyRpcEnv。
下面看一下RpcEnvFactory。RpcEnvFactory是一个工厂类,创建[RpcEnv],必须有一个空构造函数,以便可以使用反射创建。create根据具体的配置,反射出具体的实例对象。RpcEndpoint方法中定义了receiveAndReply方法和receive方法。
RpcEndpoint.scala的源码如下。
1. private[spark] trait RpcEnvFactory { 2. 3. def create(config: RpcEnvConfig): RpcEnv 4. } 5. private[spark] trait RpcEndpoint { 6. ...... 7. val rpcEnv: RpcEnv 8. 9. ...... 10. final def self: RpcEndpointRef = { 11. require(rpcEnv != null, "rpcEnv has not been initialized") 12. rpcEnv.endpointRef(this) 13. } 14. ....... 15. 16. def receive: PartialFunction[Any, Unit] = { 17. case _ => throw new SparkException(self + " does not implement 'receive'") 18. } 19. ...... 20. def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 21. case _ => context.sendFailure(new SparkException(self + " won't reply anything")) 22. } 23. ......
Master继承自ThreadSafeRpcEndpoint,接收消息使用receive方法和receiveAndReply方法。
其中,ThreadSafeRpcEndpoint继承自RpcEndpoint:ThreadSafeRpcEndpoint是一个trait,需要RpcEnv线程安全地发送消息给它。线程安全是指在处理下一个消息之前通过同样的[ThreadSafeRpcEndpoint]处理一条消息。换句话说,改变[ThreadSafeRpcEndpoint]的内部字段在处理下一个消息是可见的,[ThreadSafeRpcEndpoint]的字段不需要volatile或equivalent,不能保证对于不同的消息在相同的[ThreadSafeRpcEndpoint]线程中来处理。
1. private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
回到RpcEndpoint.scala,重点看一下receiveAndReply方法和receive方法。receive方法处理从[RpcEndpointRef.send]或者[RpcCallContext.reply]发过来的消息,如果收到一个不匹配的消息,[SparkException]会抛出一个异常onError。receiveAndReply方法处理从[RpcEndpointRef.ask]发过来的消息,如果收到一个不匹配的消息,[SparkException]会抛出一个异常onError。receiveAndReply方法返回PartialFunction对象。
RpcEndpoint.scala的源码如下。
1. def receive: PartialFunction[Any, Unit] = { 2. case _ => throw new SparkException(self + " does not implement 'receive'") 3. } 4. 5. ...... 6. def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 7. case _ => context.sendFailure(new SparkException(self + " won't reply anything")) 8. }
在Master中,Receive方法中收到消息以后,不需要回复对方。
Master.scala的Receive方法的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case ElectedLeader => 3. ..... 4. recoveryCompletionTask = forwardMessageThread.schedule(new Runnable { 5. override def run(): Unit = Utils.tryLogNonFatalError { 6. self.send(CompleteRecovery) 7. } 8. }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS) 9. } 10. 11. case CompleteRecovery => completeRecovery() 12. 13. 14. case RevokedLeadership => 15. logError("Leadership has been revoked -- master shutting down.") 16. System.exit(0) 17. 18. case RegisterApplication(description, driver) => 19. ...... 20. schedule() 21.
在Master中,receiveAndReply方法中收到消息以后,都要通过context.reply回复对方。
在Master中,RpcEndpoint如果接收到需要reply的消息,就会交给自己的receiveAndReply来处理(回复时是通过RpcCallContext中的relpy方法来回复发送者的),如果不需要reply,就交给receive方法来处理。
RpcCallContext的源码如下。
1. private[spark] trait RpcCallContext { 2. 3. /** *回复消息的发送者。如果发送者是[RpcEndpoint],其[RpcEndpoint.receive] *将被调用 5. */ 6. def reply(response: Any): Unit 7. 8. /** *向发送方报告故障 9. */ 10. def sendFailure(e: Throwable): Unit 11. 12. /** *此消息的发送者 13. */ 14. def senderAddress: RpcAddress 15. }
回到RpcEndpoint.scala,RpcEnvFactory是一个trait,负责创建RpcEnv,通过create方法创建RpcEnv实例对象,默认用Netty。
RpcEndpoint.scala的源码如下。
1. private[spark] trait RpcEnvFactory { 2. 3. def create(config: RpcEnvConfig): RpcEnv 4. }
RpcEnvFactory的create方法没有具体的实现。下面看一下RpcEnvFactory子类NettyRpcEnvFactory中create的具体实现,使用的方式为nettyEnv。
NettyRpcEnv.scala的create方法的源码如下。
1. def create(config: RpcEnvConfig): RpcEnv = { 2. val sparkConf = config.conf 3. //在多个线程中使用JavaSerializerInstance 是安全的。然而,如果将来计划支持 //KryoSerializer,必须使用ThreadLocal来存储SerializerInstance 4. 5. val javaSerializerInstance = 6. new JavaSerializer(sparkConf).newInstance().asInstanceOf [JavaSerializerInstance] 7. val nettyEnv = 8. new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, 9. config.securityManager) 10. if (!config.clientMode) { 11. val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => 12. nettyEnv.startServer(config.bindAddress, actualPort) 13. (nettyEnv, nettyEnv.address.port) 14. } 15. try { 16. Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1 17. } catch { 18. case NonFatal(e) => 19. nettyEnv.shutdown() 20. throw e 21. } 22. } 23. nettyEnv 24. } 25. }
在Spark 2.0版本中回溯一下NettyRpcEnv的实例化过程。在SparkContext实例化时调用createSparkEnv方法。
SparkContext.scala的源码如下。
1. ...... 2. _env = createSparkEnv(_conf, isLocal, listenerBus) 3. SparkEnv.set(_env) 4. ...... 5. 6. private[spark] def createSparkEnv( 7. conf: SparkConf, 8. isLocal: Boolean, 9. listenerBus: LiveListenerBus): SparkEnv = { 10. SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext. numDriverCores(master)) 11. } 12. 13. .....
SparkContext的createSparkEnv方法中调用了SparkEnv.createDriverEnv方法。下面看一下createDriverEnv方法的实现,其调用了create方法。
SparkEnv.scala的createDriverEnv的源码如下。
1. private[spark] def createDriverEnv( 2. ....... 3. create( 4. conf, 5. SparkContext.DRIVER_IDENTIFIER, 6. bindAddress, 7. advertiseAddress, 8. port, 9. isLocal, 10. numCores, 11. ioEncryptionKey, 12. listenerBus = listenerBus, 13. mockOutputCommitCoordinator = mockOutputCommitCoordinator 14. ) 15. } 16. 17. private def create( 18. ........ 19. val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf, 20. securityManager, clientMode = !isDriver) 21. ......
在RpcEnv.scala中,creat方法直接调用new()函数创建一个NettyRpcEnvFactory,调用NettyRpcEnvFactory().create方法,NettyRpcEnvFactory继承自RpcEnvFactory。在Spark 2.0中,RpcEnvFactory直接使用NettyRpcEnvFactory的方式。
RpcEnv.scala的源码如下。
1. private[spark] object RpcEnv { 2. ....... 3. 4. def create( 5. name: String, 6. bindAddress: String, 7. advertiseAddress: String, 8. port: Int, 9. conf: SparkConf, 10. securityManager: SecurityManager, 11. clientMode: Boolean): RpcEnv = { 12. val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager, 13. clientMode) 14. new NettyRpcEnvFactory().create(config) 15. }
NettyRpcEnvFactory().create的方法如下。
NettyRpcEnv.scala的源码如下。
1. private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { 2. 3. def create(config: RpcEnvConfig): RpcEnv = { 4. ...... 5. val nettyEnv = 6. new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, 7. config.securityManager) 8. if (!config.clientMode) { 9. val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => 10. nettyEnv.startServer(config.bindAddress, actualPort) 11. (nettyEnv, nettyEnv.address.port) 12. } 13. try { 14. Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1 15. } catch { 16. case NonFatal(e) => 17. nettyEnv.shutdown() 18. throw e 19. } 20. } 21. nettyEnv 22. } 23. }
在NettyRpcEnvFactory().create中调用new()函数创建一个NettyRpcEnv。NettyRpcEnv传入SparkConf参数,包括fileServer、startServer等方法。
NettyRpcEnv的源码如下。
1. private[netty] class NettyRpcEnv( 2. val conf: SparkConf, 3. javaSerializerInstance: JavaSerializerInstance, 4. host: String, 5. securityManager: SecurityManager) extends RpcEnv(conf) with Logging { 6. 7. ...... 8. override def fileServer: RpcEnvFileServer = streamManager 9. ...... 10. def startServer(bindAddress: String, port: Int): Unit = { 11. val bootstraps: java.util.List[TransportServerBootstrap] = 12. if (securityManager.isAuthenticationEnabled()) { 13. java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager)) 14. } else { 15. java.util.Collections.emptyList() 16. } 17. server = transportContext.createServer(bindAddress, port, bootstraps) 18. dispatcher.registerRpcEndpoint( 19. RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher)) 20. }
NettyRpcEnv.scala的startServer中,通过transportContext.createServer创建Server,使用dispatcher.registerRpcEndpoint方法dispatcher注册RpcEndpoint。在createServer方法中调用new()函数创建一个TransportServer。
TransportContext的createServer方法的源码如下。
1. public TransportServer createServer( 2. String host, int port, List<TransportServerBootstrap> bootstraps) { 3. return new TransportServer(this, host, port, rpcHandler, bootstraps); 4. }
TransportServer.java的源码如下。
1. public TransportServer( 2. TransportContext context, 3. String hostToBind, 4. int portToBind, 5. RpcHandler appRpcHandler, 6. List<TransportServerBootstrap> bootstraps) { 7. this.context = context; 8. this.conf = context.getConf(); 9. this.appRpcHandler = appRpcHandler; 10. this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull (bootstraps)); 11. 12. try { 13. init(hostToBind, portToBind); 14. } catch (RuntimeException e) { 15. JavaUtils.closeQuietly(this); 16. throw e; 17. } 18. }
TransportServer.java中的关键方法是init,这是Netty本身的实现内容。
TransportServer.java中的init的源码如下。
1. private void init(String hostToBind, int portToBind) { 2. 3. IOMode ioMode = IOMode.valueOf(conf.ioMode()); 4. EventLoopGroup bossGroup = 5. NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); 6. EventLoopGroup workerGroup = bossGroup; 7. .......
接下来,我们看一下RpcEndpointRef。RpcEndpointRef是一个抽象类,是代理模式。
RpcEndpointRef.scala的源码如下。
1. private[spark] abstract class RpcEndpointRef(conf: SparkConf) 2. extends Serializable with Logging { 3. 4. private[this] val maxRetries = RpcUtils.numRetries(conf) 5. private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf) 6. private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf) 7. ...... 8. def send(message: Any): Unit 9. def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] 10. .....
NettyRpcEndpointRef是RpcEndpointRef的具体实现子类。ask方法通过调用nettyEnv.ask传递消息。RequestMessage是一个case class。
Spark 2.1.1版本的NettyRpcEnv.scala的NettyRpcEndpointRef的源码如下。
1. private[netty] class NettyRpcEndpointRef( 2. @transient private val conf: SparkConf, 3. endpointAddress: RpcEndpointAddress, 4. @transient @volatile private var nettyEnv: NettyRpcEnv) 5. extends RpcEndpointRef(conf) with Serializable with Logging { 6. ...... 7. override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = { 8. nettyEnv.ask(RequestMessage(nettyEnv.address, this, message), timeout) 9. } 10. ......
Spark 2.2.0版本的NettyRpcEnv.scala的NettyRpcEndpointRef的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第3行endpointAddress增加了private访问限制。
上段代码中第5行删掉了Serializable及Logging的继承。
1. private[netty] class NettyRpcEndpointRef( 2. @transient private val conf: SparkConf, 3. private val endpointAddress: RpcEndpointAddress, 4. @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf) {
下面从实例的角度来看RPC的应用:
RpcEndpoint的生命周期:构造(constructor)->启动(onStart)、消息接收(receive、receiveAndReply )、停止(onStop)。
Master中接收消息的方式有两种:①receive接收消息不回复;②receiveAndReply通过context.reply的方式回复消息。例如,Worker发送Master的RegisterWorker消息,当Master注册成功,Master就返回Worker RegisteredWorker消息。
Worker启动时,从生命周期的角度,Worker实例化的时候提交Master进行注册。
Worker的onStart的源码如下。
1. override def onStart() { 2. ....... 3. registerWithMaster() 4. 5. metricsSystem.registerSource(workerSource) 6. metricsSystem.start() 7. //Attach the worker metrics servlet handler to the web ui after the //metrics system is started. 8. metricsSystem.getServletHandlers.foreach(webUi.attachHandler) 9. }
进入registerWithMaster方法:
Worker的registerWithMaster的源码如下。
1. private def registerWithMaster() { 2. ...... 3. registerMasterFutures = tryRegisterAllMasters() 4. ....
进入tryRegisterAllMasters方法:在rpcEnv.setupEndpointRef中根据masterAddress、ENDPOINT_NAME名称获取RpcEndpointRef。
Spark 2.1.1版本的Worker的tryRegisterAllMasters的源码如下。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. ...... 3. val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) 4. registerWithMaster(masterEndpoint) 5. ......
Spark 2.2.0版本的Worker的tryRegisterAllMasters的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第4行registerWithMaster方法调整为sendRegisterMessageToMaster方法。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. ...... 3. val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) 4. sendRegisterMessageToMaster(masterEndpoint) 5. ......
基于masterEndpoint,使用registerWithMaster方法注册。registerWithMaster方法中通过ask方法发送RegisterWorker消息,并要求发送返回结果,返回的消息类型为RegisterWorkerResponse。然后进行模式匹配,如果成功,就handleRegisterResponse。如果失败,就退出。
Spark 2.1.1版本的Worker.scala的registerWithMaster的源码如下。
1. private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = { 2. masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker( 3. workerId, host, port, self, cores, memory, workerWebUiUrl)) 4. .onComplete { 5. //这是一个非常快的行动,所以可以用ThreadUtils.sameThread 6. case Success(msg) => 7. Utils.tryLogNonFatalError { 8. handleRegisterResponse(msg) 9. } 10. case Failure(e) => 11. logError(s"Cannot register with master: ${masterEndpoint .address}", e) 12. System.exit(1) 13. }(ThreadUtils.sameThread) 14. }
handleRegisterResponse方法中的模式匹配,收到RegisteredWorker消息进行相应的处理。
Worker.scala的handleRegisterResponse的源码如下。
1. private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized { 2. msg match { 3. case RegisteredWorker(masterRef, masterWebUiUrl) => 4. ....... 5.
Spark 2.1.1版本中,registerWithMaster方法中的Worker发送RegisterWorker消息给Master,此时,Worker同步收到Master回复的RegisterWorkerResponse消息以后还须根据成功或失败的情况,通过handleRegisterResponse进行后续的处理。
Spark 2.2.0版本将registerWithMaster方法调整为sendRegisterMessageToMaster方法。sendRegisterMessageToMaster方法中的Worker发送RegisterWorker消息给Master以后,就完成此次注册。Master节点收到RegisterWorker消息另行处理,如果注册成功,Master就发送Worker节点成功的RegisteredWorker消息;如果注册失败,Master就发送Worker节点失败的RegisterWorkerFailed消息。
1. private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = { 2. masterEndpoint.send(RegisterWorker( 3. workerId, 4. host, 5. port, 6. self, 7. cores, 8. memory, 9. workerWebUiUrl, 10. masterEndpoint.address)) 11. }