5.3 ExecutorBackend启动原理和源码详解
ExecutorBackend是Executor向集群发送更新消息的一个可插拔的接口。ExecutorBackend拥有不同的实现。Standalone模式下ExecutorBackend的默认实现是CoarseGrainedExecutorBackend;在Local模式下,ExecutorBackend的默认实现是LocalBackend。在Mesos调度模式下,ExecutorBackend的默认实现是MesosExecutorBackend。本节主要探索Standalone模式下的ExecutorBackend,通过源码深入理解ExecutorBackend接口设计的精髓。
5.3.1 ExecutorBackend接口与Executor的关系
本节将详细分析Standalone模式下ExecutorBackend和Executor的关系。在StandaloneSchedulerBackend中会实例化一个StandaloneAppClient。StandaloneAppClient中携带了command信息,command信息中指定了要启动的ExecutorBackend的实现类,Standalone模式下,该ExecutorBackend的实现类是org.apache.spark.executor.CoarseGrainedExecutorBackend类。
StandaloneSchedulerBackend.scala的start方法中构建了一个Command对象,该对象的第一个参数是mainClass,即进程的主类。该类在Standalone模式下为org.apache.spark.executor. CoarseGrainedExecutorBackend。分别将sparkJavaopts、javaOpts、command、appUiAddress、coresPerExecutor、appDes传入StandaloneAppClient构造函数。StandaloneAppClient将会向Master发送RegisterApplication注册请求,Master受理后通过launchExecutor方法在Worker节点启动一个ExecutorRunner对象,该对象用于管理一个Executor进程。在ExecutorRunner中将通过CommandUtil构建一个ProcessBuilder,调用ProcessBuilder的start方法将会以进程的方式启动org.apache.spark.executor.CoarseGrainedExecutorBackend。在CoarseGrainedExecotorBackend的onStart方法中,将会向Driver端发送RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls)消息请求注册,完成注册后将立即返回一个RegisteredExecutor(executorAddress. host)消息,CoarseGraiendExecutorBackend收到该消息,马上实例化出一个Executor。源码如下所示。
CoarseGrainedExecutorBackend.scala的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case RegisteredExecutor => 3. logInfo("Successfully registered with driver") 4. try { 5. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) 6. } catch { 7. case NonFatal(e) => 8. exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) 9. }
从这里可以看出,CoarseGrainedExecutorBackend比Executor先实例化。CoarseGrained-ExecutorBackend负责与集群通信,而Executor则专注于任务的处理,它们是一对一的关系,在集群中各司其职。
每个Worker节点上可以启动多个CoarseGrainedExecutorBackend进程,每个进程对应一个Executor。
5.3.2 ExecutorBackend的不同实现
ExecutorBackend是与集群交互的接口,该接口在不同的调度模式下有不同的实现。图5-3是ExecutorBackend及其实现的关系类图。
图5-3 ExecutorBackend及其实现的关系类图
不同模式下,ExecutorRunner启动的进程不一样。在Standalone模式下启动的是org.apache.spark.executor.CoarseGrainedExecutorBackend进程;在Local模式下,启动的是org.apache.spark.executor.LocalExecutorBackend进程;在Mesos模式下,启动的是org.apache. spark.executor.MesosExecutorBackend进程。
下面来看Standalone模式下CoarseGrainedExecutorBackend的启动。在Standalone模式下,会启动org.apache.spark.deploy.Client类,该类将向Master发送RequestSubmitDriver (driverDescription)消息,Master中匹配到RequestSubmitDriver(driverDescription)后,将会调用schedule方法。该调用的源码如下所示。
Master.scala的receiveAndReply的源码如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction [Any, Unit] = { 2. ...... 3. case RequestSubmitDriver(description) => 4. //若state不为ALIVE,直接向Client返回SubmitDriverResponse(self,false, //None,msg)消息 5. if (state != RecoveryState.ALIVE) { 6. val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " + 7. "Can only accept driver submissions in ALIVE state." 8. context.reply(SubmitDriverResponse(self, false, None, msg)) 9. } else { 10. logInfo("Driver submitted " + description.command.mainClass) 11. //使用description创建driver,该方法返回DriverDescription 12. val driver = createDriver(description) 13. persistenceEngine.addDriver(driver) 14. waitingDrivers += driver 15. //waitingDrivers等待在调度数组中加入该driver 16. drivers.add(driver) 17. //用schedule方法调度资源 18. schedule() 19. //向ClientEndpoint回复SubmitDriverResponse消息 20. 21. context.reply(SubmitDriverResponse(self, true, Some(driver.id), 22. s"Driver successfully submitted as ${driver.id}")) 23. }
Master的receiveAndReply收到RequestSubmitDriver消息后,调用schedule方法。
Master的schedule的源码如下。
1. private def schedule(): Unit = { 2. if (state != RecoveryState.ALIVE) { 3. return 4. } 5. //Drivers 优先于executors 6. val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter (_.state == WorkerState.ALIVE)) 7. val numWorkersAlive = shuffledAliveWorkers.size 8. var curPos = 0 9. for (driver <- waitingDrivers.toList) { //遍历waitingDrivers 10. //以循环的方式给每个等候的driver分配Worker。对于每个driver,我们从分配 //给driver的最后一个Worker开始,继续前进,直到所有活跃的Worker节点 11. 12. var launched = false 13. var numWorkersVisited = 0 14. while (numWorkersVisited < numWorkersAlive && !launched) { 15. val worker = shuffledAliveWorkers(curPos) 16. numWorkersVisited += 1 17. if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { 18. launchDriver(worker, driver) 19. waitingDrivers -= driver 20. launched = true 21. } 22. curPos = (curPos + 1) % numWorkersAlive 23. } 24. } 25. startExecutorsOnWorkers() 26. }
上面代码中,RecoveryState若不为ALIVE,则直接返回,否则使用Random.shuffle将Workers集合打乱,过滤出ALIVE的Worker,生成新的集合shuffledAliveWorkers,尽量考虑到选择Driver的负载均衡。在for语句中遍历waitingDrivers队列,判断Worker剩余内存和剩余物理核是否满足Driver需求,如满足,则调用launchDriver(worker,driver)方法在选中的Worker上启动Driver进程。
实例化SparkContext时,在SparkContext中将实例化出DAGScheduler、StandaloneSchedulerBackend。Driver在Worker节点上启动之后,在StandaloneSchedulerBackend中将会调用new()函数创建一个StandaloneAppClient。StandaloneAppClient中有一个ClientEndpoint,在其onStart方法中将向Master发送RegisterApplication请求注册application,注册好application后,Master又会调用schedule方法,在满足条件的Worker上为application启动Executor,首先会启动ExecutorRunner,在ExecutorRunner中启动CoarseGrainedExecutor-Backend,启动后将会实例化出Executor。为什么在Standalone模式下会启动CoarseGrained-ExecutorBackend呢?在什么地方设置要启动的CoarseGrainedExecutorBackend进程呢?其实,在实例化StandaloneAppClient的时候就已经传入了。
StandaloneSchedulerBackend.scala的start方法代码中设置了Command对象。Command对象的第一个参数是启动进程的mainClass。因此,ExecutorRunner中启动进程时,启动的是org.apache.spark.executor.CoarseGrainedExecutorBackend。
5.3.3 ExecutorBackend中的通信
ExecutorBackend是一个被Executor使用的可插拔的与集群通信的接口。在ExecutorBackend中有statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)方法,通过这个方法向集群发送Task执行的各种信息,如果任务执行失败,则返回失败的信息;如果执行成功,则返回任务执行的结果。本节重点讲解在Standalone模式下CoarseGrainedExecutor-Backend中的通信。CoarseGrainedExecutorBackend在整个集群中的通信如图5-4所示。
在图5-4中,Executor与CoarseGrainedExecutorBackend协作,将任务计算的结果通过CoarseGrainedExecutorBackend的statusUpdate方法将taskId、TaskState以及结果数据发送给Driver。Driver收到StatusUpdate(executorId,tasked,state,data)消息,通过判断state的不同状态,进行不同的处理。例如,当state的状态为TaskState.LOST时,Driver端会移除Executor;当state的状态为TaskState.FINISHED时,Driver端会调用enqueueSuccessfulTask进行处理。
这里主要看CoarseGrainedExecutorBackend与Driver之间的通信。当Worker节点中启动ExecutorRunner时,ExecutorRunner中会启动CoarseGrainedExecutorBackend进程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver发出RegisterExecutor注册请求。源码如下所示。
图5-4 CoarseGrainedExecutorBackend在整个集群中的通信
CoarseGrainedExecutorBackend的onStart方法的源码如下。
1. override def onStart() { 2. logInfo("Connecting to driver: " + driverUrl) 3. rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => 4. //这是一个非常快的行动,所以我们可以用"ThreadUtils.sameThread" 5. driver = Some(ref) 6. //向Driver发送ask请求,等待Driver回应 7. ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) 8. }(ThreadUtils.sameThread).onComplete { 9. //这是一个非常快的行动,所以我们可以用"ThreadUtils.sameThread" 10. case Success(msg) => 11. //经常收到true,可以忽略 12. case Failure(e) => 13. exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false) 14. }(ThreadUtils.sameThread) 15. }
上面的代码中,Some(ref)得到Driver的引用,通过ask方法返回Future[Boolean],然后在Future对象上调用onComplete方法进行额外的处理。Driver端收到注册请求,将会注册Executor的请求,并向ListenerBus中发送SparkListenerExecutorAdded事件。
如果executorDataMap中已经存在该Executor的id,就返回RegisterExecutorFailed,如果不存在该Executor的id,则在executorDataMap中加入该Executor的id,并返回RegisteredExecutor消息且向listenerBus中添加SparkListenerExecutorAdded事件。CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,将会新建一个Executor执行器,并为此Executor充当信使,与Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息的源码如下所示。
CoarseGrainedExecutorBackend.scala的receive的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case RegisteredExecutor => 3. logInfo("Successfully registered with driver") 4. try { 5. //收到RegisteredExecutor消息,立即创建Executor 6. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) 7. } catch { 8. case NonFatal(e) => 9. exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) 10. }
从上面的代码中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,将会新建一个Executor。由此可见,Executor在CoarseGrainedExecutorBackend后实例化,这与Executor和CoarseGrainedExecutorBackend的不同职责有关,Executor主要负责计算,而CoarseGrainedExecutorBackend主要负责通信,通信环境准备好了,架起同CoarseGrainedSchedulerBackend通信的桥梁,就可以接收CoarseGrainedSchedulerBackend中调用launchTask方法发送的LaunchTask消息了,因此通信在前,计算在后。
Executor中的计算结果是通过CoarseGrainedExecutorBackend的statusUpdate方法返回给CoarseGrainedExecutorBackend的。statusUpdate方法的代码如下所示。
CoarseGrainedExecutorBackend.scala的源码如下。
1. override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { 2. val msg = StatusUpdate(executorId, taskId, state, data) 3. driver match { 4. //向Driver发送StatusUpdate消息 5. case Some(driverRef) => driverRef.send(msg) 6. case None => logWarning(s"Drop $msg because has not yet connected to driver") 7. } 8. }
上面源码中,通过参数taskId、state、data构建一个StatusUpdate对象,该对象将被当作消息发送到Driver端,Driver根据返回结果的需要,将会向CoarseGrainedExecutorBackend发送新的指令消息,如LaunchTask、KillTask、StopExecutors、Shutdown等。
5.3.4 ExecutorBackend的异常处理
若CoarseGrainedExecutorBackend在运行中出现异常,将调用exitExecutor方法进行处理,处理以后,系统退出。exitExecutor函数可以由其他子类重载来处理,Executor执行的退出方式不同。例如,当Executor挂掉了,后台程序可能不会让父进程也挂掉。如果须通知Driver,Driver将清理挂掉的Executor的数据。
CoarseGrainedExecutorBackend的exitExecutor方法的源码如下。
1. protected def exitExecutor(code: Int, 2. reason: String, 3. throwable: Throwable = null, 4. notifyDriver: Boolean = true) = { 5. val message = "Executor self-exiting due to : " + reason 6. if (throwable != null) { 7. logError(message, throwable) 8. } else { 9. logError(message) 10. } 11. 12. if (notifyDriver && driver.nonEmpty) { 13. driver.get.ask[Boolean]( 14. RemoveExecutor(executorId, new ExecutorLossReason(reason)) 15. ).onFailure { case e => 16. logWarning(s"Unable to notify the driver due to " + e.getMessage, e) 17. }(ThreadUtils.sameThread) 18. } 19. 20. System.exit(code) 21. } 22. }
CoarseGrainedExecutorBackend在运行中一旦出现异常情况,将调用exitExecutor方法处理。
Executor向Driver注册RegisterExecutor失败。
Executor收到Driver的RegisteredExecutor注册成功消息以后,创建Executor实例失败。
Driver返回Executor注册失败消息RegisterExecutorFailed。
Executor收到Driver的LaunchTask启动任务消息,但是Executor为null。
Executor收到Driver的KillTask消息,但是Executor为null。
Executor和Driver失去连接。