4.4 SchedulerBackend解析
本节讲解SchedulerBackend原理剖析、SchedulerBackend源码解析、Spark程序的注册机制、Spark程序对计算资源Executor的管理等内容。
4.4.1 SchedulerBackend原理剖析
以Spark Standalone部署方式为例,StandaloneSchedulerBackend在启动的时候构造了StandaloneAppClient实例,并在该实例start的时候启动了ClientEndpoint消息循环体,ClientEndpoint在启动的时候会向Master注册当前程序。而StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start的时候会实例化类型为DriverEndPoint(这就是程序运行时的经典的对象Driver)的消息循环体,StandaloneSchedulerBackend专门负责收集Worker上的资源信息,当ExecutorBackend启动的时候,会发送RegisteredExecutor信息向DriverEndpoint注册,此时StandaloneSchedulerBackend就掌握了当前应用程序拥有的计算资源,TaskScheduler就是通过StandaloneSchedulerBackend拥有的计算资源来具体运行Task的。
4.4.2 SchedulerBackend源码解析
StandaloneSchedulerBackend收集和分配资源给调度的Task使用。
StandaloneSchedulerBackend.scala的源码如下。
1. private[spark] class StandaloneSchedulerBackend( 2. ...... 3. override def start() { 4. ...... 5. val command = Command("org.apache.spark.executor. CoarseGrainedExecutorBackend", 6. args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) 7. ...... 8. client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf) 9. client.start() 10. ......
在StandaloneAppClient的start方法中调用new()函数创建一个ClientEndpoint,将在ClientEndpoint中向Master注册。
StandaloneAppClient.scala的源码如下。
1. ...... 2. def start() { 3. //启动 rpcEndpoint; 将直接回调到listener 4. endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint (rpcEnv))) 5. }
4.4.3 Spark程序的注册机制
在上面的源码分析中,StandaloneAppClient在启动的时候创建了StandaloneAppClient内部类ClientEndpoint的实例对象作为消息循环体,以便向Master注册当前的Application。既然ClientEndpoint是RpcEndpoint的子类,那么就会有这样的生命周期:constructor -> onStart -> receive ->onStop。根据这个原理,我们来看ClientEndpoint的onStart方法代码。
StandaloneAppClient.scala的源码如下。
1. override def onStart(): Unit = { 2. try { 3. registerWithMaster(1) 4. } catch { 5. .......
ClientEndpoint在启动时就立即调用registerWithMaster来注册Application,继续查看registerWithMaster方法代码。
StandaloneAppClient.scala的源码如下。
1. private def registerWithMaster(nthRetry: Int) { 2. //向所有Master异步地尝试注册Application 3. registerMasterFutures.set(tryRegisterAllMasters()) 4. ...... 5. }
ClientEndpoint在tryRegiesterAllMasters方法中会向所有的Master尝试注册Application。向Master发送RegisterApplication消息。
StandaloneAppClient.scala的源码如下。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. ...... 3. val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master. ENDPOINT_NAME) 4. masterRef.send(RegisterApplication(appDescription, self)) 5. ......
Master也是RpcEndpoint的子类,所以可以通过receive方法接收DeployMessage类型的消息RegisterApplication。
Master.scala的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. ...... 3. case RegisterApplication(description, driver) => 4. ....... 5. registerApplication(app) 6. ....... 7. driver.send(RegisteredApplication(app.id, self)) 8. .......
ClientEndpoint最后在receive方法中得到来自Master注册好Application的确认消息RegisteredApplication。
StandaloneAppClient.scala的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case RegisteredApplication(appId_, masterRef) => 3. ...... 4. appId.set(appId_) 5. registered.set(true) 6. ......
至此,Application向Master注册完毕。在上面的RegisterApplication中,调用了schedule方法,这个方法将完成Application的调度,并在Worker节点上启动分配好的Executor给Application使用。
4.4.4 Spark程序对计算资源Executor的管理
从TaskSchedulerImpl的submitTasks的方法中我们知道,Spark Standalone部署模式调用StandaloneSchedulerBackend的reviveOffers方法进行TaskSet所需要资源的分配,得到足够的资源后,将TaskSet中的Task逐个发送到Executor去执行。下面来看这里的资源,即Executor是如何得到和分配的。
StandaloneSchedulerBackend的reviveOffers方法很简单,就是发送一个ReviveOffers消息给内部类DriverEndpoint,代码如下所示。
CoarseGrainedSchedulerBackend.scala的源码如下。
1. override def reviveOffers() { 2. driverEndpoint.send(ReviveOffers) 3. }
DriverEndpoint的receive方法处理ReviveOffers消息也很简单,就是调用makeOffers方法。receive方法部分关键代码如下所示。
CoarseGrainedSchedulerBackend.scala的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. ...... 3. case ReviveOffers => 4. makeOffers() 5. ......
DriverEndpoint的makeOffers方法首先过滤出Alive状态的Executor放到activeExecutorsHahMap变量中,然后使用id、ExecutorData.ExecutorHost、ExecutorData.freeCores构建代表Executor可用资源的WorkerOffer。然后是最重要的两个方法调用。先是调用TaskSchedulerImpl的resourceOffers得到TaskDescription的二维数组,包含Task ID、Executor ID、Task Index等Task执行需要的信息。然后回调DriverEndpoint的launchTask给每个Task对应的Executor发执行Task的LaunchTask消息(其实是由CourseGrainedExecutorBackend转发LauchTask消息)。
TaskSchedulerImpl的resourceOffers方法返回二维数组TaskDescription后作为DriverEndpoint的launchTasks方法的参数,DriverEndpoint的launchTasks方法中首先对传入的tasks进行扁平化操作(例如,将多维数组降维成一维数组),得到所有的Task,然后遍历所有的Task。在遍历过程中,调用serialize()方法对task进行序列化,得到serializedTask。判断如果serializedTask大于等于Akka帧减去Akka预留空间大小,则调用TaskSetManager的abort方法终止该任务的执行,否则将LaunchTask(new SerializableBuffer(serializedTask))消息发送到CoarseGrainedExecutorBackend。
CoarseGrainedExecutorBackend匹配到LaunchTask(data)消息后,首先调用deserialized方法,反序列化出task,然后调用Executor的lauchTask方法执行Task的处理。