4.5 打通Spark系统运行内幕机制循环流程
Spark通过DAGScheduler面向整个Job划分出了不同的Stage,划分Stage之后,Stage从后往前划分,执行的时候从前往后执行,每个Stage内部有一系列的任务,Stage里面的任务是并行计算,并行任务的逻辑是完全相同的,但处理的数据不同。DAGScheduler以TaskSet的方式,把一个DAG构建的Stage中的所有任务提交给底层的调度器TaskScheduler。TaskScheduler是一个接口,与具体的任务解耦合,可以运行在不同的调度模式下,如可运行在Standalone模式,也可运行在Yarn上。
Spark基础调度(图4-6)包括RDD Objects、DAGScheduler、TaskScheduler、Worker等内容。
图4-6 Spark基础调度图
DAGScheduler在提交TaskSet给底层调度器的时候是面向接口TaskScheduler的,这符合面向对象中依赖抽象而不依赖具体的原则,带来底层资源调度器的可插拔性,导致Spark可以运行在众多的资源调度器模式上,如Standalone、Yarn、Mesos、Local、EC2、其他自定义的资源调度器;在Standalone的模式下我们聚焦于TaskSchedulerImpl。
TaskScheduler是一个接口Trait,底层任务调度接口,由[org.apache.spark.scheduler. TaskSchedulerImpl]实现。这个接口允许插入不同的任务调度程序。每个任务调度器在单独的SparkContext中调度任务。任务调度程序从每个Stage的DAGScheduler获得提交的任务集,负责发送任务到集群运行,如果任务运行失败,将重试,返回DAGScheduler事件。
Spark 2.1.1版本的TaskScheduler.scala的源码如下。
1. private[spark] trait TaskScheduler { 2. 3. private val appId = "spark-application-" + System.currentTimeMillis 4. 5. def rootPool: Pool 6. 7. def schedulingMode: SchedulingMode 8. 9. def start(): Unit 10. 11. //成功初始化后调用(通常在Spark上下文中)。Yarn使用这个来引导基于优先位置的资源 //分配,等待从节点登记等 12. def postStartHook() { } 13. 14. //从群集断开连接 15. def stop(): Unit 16. 17. //提交要运行的任务序列 18. def submitTasks(taskSet: TaskSet): Unit 19. 20. //取消Stage 21. def cancelTasks(stageId: Int, interruptThread: Boolean): Unit 22. 23. //系统为upcalls设置DAG调度,这是保证在submitTasks被调用前被设置 24. def setDAGScheduler(dagScheduler: DAGScheduler): Unit 25. 26. //获取集群中使用的默认并行级别,作为对作业的提示 27. def defaultParallelism(): Int 28. 29. /** * 更新正运行任务,让master 知道BlockManager 仍活着。如果driver 知道给定的 * 块管理器,则返回true;否则,返回false,指示块管理器应重新注册 30. */ 31. 32. def executorHeartbeatReceived( 33. execId: String, 34. accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], 35. blockManagerId: BlockManagerId): Boolean 36. 37. /** 38. *获取与作业相关联的应用程序ID 39. * @return An application ID 40. */ 41. def applicationId(): String = appId 42. 43. /** *处理丢失的 executor 44. */ 45. def executorLost(executorId: String, reason: ExecutorLossReason): Unit 46. 47. /** 48. *获取与作业相关联的应用程序的尝试ID 49. * 50. * @return应用程序的尝试ID 51. */ 52. def applicationAttemptId(): Option[String] 53. 54. }
Spark 2.2.0版本的TaskScheduler.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第21行之后新增加了killTaskAttempt方法。
1. ...... 2. /** 3. *杀死任务尝试 4. * 5. * @return任务是否成功被杀死 6. */ 7. def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean 8. .......
DAGScheduler把TaskSet交给底层的接口TaskScheduler,具体实现时有不同的方法。TaskScheduler主要由TaskSchedulerImpl实现。
TaskSchedulerImpl也有自己的子类YarnScheduler。
1. private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { 2. 3. //RackResolver 记录INFO日志信息时,解析rack的信息 4. if (Logger.getLogger(classOf[RackResolver]).getLevel == null) { 5. Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN) 6. } 7. 8. //默认情况下,rack是未知的 9. override def getRackForHost(hostPort: String): Option[String] = { 10. val host = Utils.parseHostPort(hostPort)._1 11. Option(RackResolver.resolve(sc.hadoopConfiguration, host). getNetworkLocation) 12. } 13. }
YarnScheduler的子类YarnClusterScheduler实现如下。
1. private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) { 2. logInfo("Created YarnClusterScheduler") 3. 4. override def postStartHook() { 5. ApplicationMaster.sparkContextInitialized(sc) 6. super.postStartHook() 7. logInfo("YarnClusterScheduler.postStartHook done") 8. } 9. 10. }
默认情况下,我们研究Standalone的模式,所以主要研究TaskSchedulerImpl。DAGScheduler把TaskSet交给TaskScheduler,TaskScheduler中通过TastSetManager管理具体的任务。TaskScheduler的核心任务是提交TaskSet到集群运算,并汇报结果。
为TaskSet创建和维护一个TaskSetManager,并追踪任务的本地性以及错误信息。
遇到延后的Straggle任务,会放到其他节点重试。
向DAGScheduler汇报执行情况,包括在Shuffle输出lost的时候报告fetch failed错误等信息。
TaskSet是一个普通的类,第一个成员是tasks,tasks是一个数组。TaskSet的源码如下。
1. private[spark] class TaskSet( 2. val tasks: Array[Task[_]], 3. val stageId: Int, 4. val stageAttemptId: Int, 5. val priority: Int, 6. val properties: Properties) { 7. val id: String = stageId + "." + stageAttemptId 8. 9. override def toString: String = "TaskSet " + id 10. }
TaskScheduler内部有SchedulerBackend,SchedulerBackend管理Executor资源。从Standalone的模式来讲,具体实现是StandaloneSchedulerBackend(Spark 2.0版本将之前的AppClient名字更新为StandaloneAppClient)。
SchedulerBackend本身是一个接口,是一个trait。SchedulerBackend的源码如下。
1. private[spark] trait SchedulerBackend { 2. private val appId = "spark-application-" + System.currentTimeMillis 3. 4. def start(): Unit 5. def stop(): Unit 6. def reviveOffers(): Unit 7. def defaultParallelism(): Int 8. 9. def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = 10. throw new UnsupportedOperationException 11. def isReady(): Boolean = true 12. 13. /** 14. *获取与作业关联的应用ID 15. * 16. * @return 应用程序 ID 17. */ 18. def applicationId(): String = appId 19. 20. /** 21. *如果集群管理器支持多个尝试,则获取此运行的尝试ID,应用程序运行在客户端模式将没 *有尝试ID 22. * 23. * @return如果可用,返回应用程序尝试ID 24. */ 25. def applicationAttemptId(): Option[String] = None 26. 27. /** 28. *得到driver 日志的URL。这些URL是用来在用户界面中显示链接driver的Executors *选项卡 29. * 30. * @return Map 包含日志名称和URLs 31. */ 32. def getDriverLogUrls: Option[Map[String, String]] = None 33. 34. }
StandaloneSchedulerBackend:专门负责收集Worker的资源信息。接收Worker向Driver注册的信息,ExecutorBackend启动的时候进行注册,为当前应用程序准备计算资源,以进程为单位。
StandaloneSchedulerBackend的源码如下。
1. private[spark] class StandaloneSchedulerBackend( 2. scheduler: TaskSchedulerImpl, 3. sc: SparkContext, 4. masters: Array[String]) 5. extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) 6. with StandaloneAppClientListener 7. with Logging { 8. private var client: StandaloneAppClient = null 9. ......
StandaloneSchedulerBackend里有一个Client: StandaloneAppClient。
1. private[spark] class StandaloneAppClient( 2. rpcEnv: RpcEnv, 3. masterUrls: Array[String], 4. appDescription: ApplicationDescription, 5. listener: StandaloneAppClientListener, 6. conf: SparkConf) 7. extends Logging {
StandaloneAppClient允许应用程序与Spark standalone集群管理器通信。获取Master的URL、应用程序描述和集群事件监听器,当各种事件发生时可以回调监听器。masterUrls的格式为spark://host:port,StandaloneAppClient需要向Master注册。
StandaloneAppClient在StandaloneSchedulerBackend.scala的start方法启动时进行赋值,用new()函数创建一个StandaloneAppClient。
Spark 2.1.1版本的StandaloneSchedulerBackend.scala的源码如下。
1. private[spark] class StandaloneSchedulerBackend( 2. ...... 3. 4. override def start() { 5. ...... 6. val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) 7. client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf) 8. client.start() 9. launcherBackend.setState(SparkAppHandle.State.SUBMITTED) 10. waitForRegistration() 11. launcherBackend.setState(SparkAppHandle.State.RUNNING) 12. }
Spark 2.2.0版本的StandaloneSchedulerBackend.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第6行ApplicationDescription传入的第5个参数appUIAddress更改为webUrl。
1. ...... 2. val appDesc = ApplicationDescription(sc.appName, maxCores, sc. executorMemory, command, webUrl, sc.eventLogDir,sc.eventLogCodec, coresPerExecutor, initialExecutorLimit) 3. .....
StandaloneAppClient.scala中,里面有一个类是ClientEndpoint,核心工作是在启动时向Master注册。StandaloneAppClient的start方法启动时,就调用new函数创建一个ClientEndpoint。
StandaloneAppClient的源码如下。
1. private[spark] class StandaloneAppClient( 2. ...... 3. private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint 4. with Logging { 5. ...... 6. def start() { 7. //启动 rpcEndpoint; 将回调listener 8. endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint (rpcEnv))) 9. }
StandaloneSchedulerBackend在启动时构建StandaloneAppClient实例,并在StandaloneAppClient实例start时启动了ClientEndpoint消息循环体。ClientEndpoint在启动时会向Master注册当前程序。
StandaloneAppClient中ClientEndpoint类的onStart()方法如下。
1. override def onStart(): Unit = { 2. try { 3. registerWithMaster(1) 4. } catch { 5. case e: Exception => 6. logWarning("Failed to connect to master", e) 7. markDisconnected() 8. stop() 9. } 10. }
这是StandaloneSchedulerBackend的第一个注册的核心功能。StandaloneSchedulerBackend继承自CoarseGrainedSchedulerBackend。而CoarseGrainedSchedulerBackend在启动时就创建DriverEndpoint,从实例的角度讲,DriverEndpoint也属于StandaloneSchedulerBackend实例。
1. private[spark] 2. class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv) 3. extends ExecutorAllocationClient with SchedulerBackend with Logging 4. { 5. ...... 6. class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) 7. extends ThreadSafeRpcEndpoint with Logging { 8. ......
StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start的时候会实例化类型为DriverEndpoint(这就是我们程序运行时的经典对象Driver)的消息循环体。StandaloneSchedulerBackend在运行时向Master注册申请资源,当Worker的ExecutorBackend启动时会发送RegisteredExecutor信息向DriverEndpoint注册,此时StandaloneSchedulerBackend就掌握了当前应用程序拥有的计算资源,TaskScheduler就是通过StandaloneSchedulerBackend拥有的计算资源来具体运行Task的;StandaloneSchedulerBackend不是应用程序的总管,应用程序的总管是DAGScheduler、TaskScheduler,StandaloneSchedulerBackend向应用程序的Task分配具体的计算资源,并把Task发送到集群中。
SparkContext、DAGScheduler、TaskSchedulerImpl、StandaloneSchedulerBackend在应用程序启动时只实例化一次,应用程序存在期间始终存在这些对象。
这里基于Spark 2.2版本讲解:
Spark调度器三大核心资源:SparkContext、DAGScheduler、TaskSchedulerImpl,TaskSchedulerImpl作为具体的底层调度器,运行时需要计算资源,因此需要StandaloneSchedulerBackend。StandaloneSchedulerBackend设计巧妙的地方是启动时启动StandaloneAppClient,而StandaloneAppClient在start时有一个ClientEndpoint的消息循环体,ClientEndpoint的消息循环体启动的时候向Master注册应用程序。
StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend在start启动的时候会实例化DriverEndpoint,所有的ExecutorBackend启动的时候都要向DriverEndpoint注册,注册最后落到了StandaloneSchedulerBackend的内存数据结构中,表面上看是在CoarseGrainedSchedulerBackend,但是实例化的时候是StandaloneSchedulerBackend,注册给父类的成员其实就是子类的成员。
作为前提问题:TaskScheduler、StandaloneSchedulerBackend是如何启动的?TaskSchedulerImpl是什么时候实例化的?
TaskSchedulerImpl是在SparkContext中实例化的。在SparkContext类实例化的时候,只要不是方法体里面的内容,都会被执行,(sched, ts)是SparkContext的成员,将调用createTaskScheduler方法。调用createTaskScheduler方法返回一个Tuple,包括两个元素:sched是我们的schedulerBackend;ts是taskScheduler。
1. class SparkContext(config: SparkConf) extends Logging { 2. ...... 3. //创建启动调度器 scheduler 4. val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) 5. _schedulerBackend = sched 6. _taskScheduler = ts 7. _dagScheduler = new DAGScheduler(this) 8. _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
createTaskScheduler里有很多运行模式,这里关注Standalone模式,首先调用new()函数创建一个TaskSchedulerImpl,TaskSchedulerImpl和SparkContext是一一对应的,整个程序运行的时候只有一个TaskSchedulerImpl,也只有一个SparkContext;接着实例化StandaloneSchedulerBackend,整个程序运行的时候只有一个StandaloneSchedulerBackend。createTaskScheduler方法如下。
1. private def createTaskScheduler( 2. sc: SparkContext, 3. master: String, 4. deployMode: String): (SchedulerBackend, TaskScheduler) = { 5. import SparkMasterRegex._ 6. ...... 7. master match { 8. ...... 9. case SPARK_REGEX(sparkUrl) => 10. val scheduler = new TaskSchedulerImpl(sc) 11. val masterUrls = sparkUrl.split(",").map("spark://" + _) 12. val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) 13. scheduler.initialize(backend) 14. (backend, scheduler) 15. ......
在SparkContext实例化的时候通过createTaskScheduler来创建TaskSchedulerImpl和StandaloneSchedulerBackend。然后在createTaskScheduler中调用scheduler.initialize(backend)。
initialize的方法参数把StandaloneSchedulerBackend传进来,schedulingMode模式匹配有两种方式:FIFO、FAIR。
initialize的方法中调用schedulableBuilder.buildPools()。buildPools方法根据FIFOSchedulableBuilder、FairSchedulableBuilder不同的模式重载方法实现。
1. private[spark] trait SchedulableBuilder { 2. def rootPool: Pool 3. 4. def buildPools(): Unit 5. 6. def addTaskSetManager(manager: Schedulable, properties: Properties): Unit 7. }
initialize的方法把StandaloneSchedulerBackend传进来了,但还没有启动StandaloneSchedulerBackend。在TaskSchedulerImpl的initialize方法中把StandaloneSchedulerBackend传进来,从而赋值为TaskSchedulerImpl的backend;在TaskSchedulerImpl调用start方法时会调用backend.start方法,在start方法中会最终注册应用程序。
下面来看SparkContext.scala的taskScheduler的启动。
1. val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) 2. _schedulerBackend = sched 3. _taskScheduler = ts 4. _dagScheduler = new DAGScheduler(this) 5. ...... 6. _taskScheduler.start() 7. _applicationId = _taskScheduler.applicationId() 8. _applicationAttemptId = taskScheduler.applicationAttemptId() 9. _conf.set("spark.app.id", _applicationId) 10. ......
其中调用了_taskScheduler的start方法。
1. private[spark] trait TaskScheduler { 2. ...... 3. 4. def start(): Unit 5. .....
TaskScheduler的start()方法没有具体实现。TaskScheduler子类的TaskSchedulerImpl的start()方法的源码如下。
1. override def start() { 2. backend.start() 3. 4. if (!isLocal && conf.getBoolean("spark.speculation", false)) { 5. logInfo("Starting speculative execution thread") 6. speculationScheduler.scheduleAtFixedRate(new Runnable { 7. override def run(): Unit = Utils.tryOrStopSparkContext(sc) { 8. checkSpeculatableTasks() 9. } 10. }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit. MILLISECONDS) 11. } 12. }
TaskSchedulerImpl的start通过backend.start启动了StandaloneSchedulerBackend的start方法。
StandaloneSchedulerBackend的start方法中,将command封装注册给Master,Master转过来要Worker启动具体的Executor。command已经封装好指令,Executor具体要启动进程入口类CoarseGrainedExecutorBackend。然后调用new()函数创建一个StandaloneAppClient,通过client.start启动client。
StandaloneAppClient的start方法中调用new()函数创建一个ClientEndpoint:
1. def start() { 2. //启动 rpcEndpoint; 将回调listener 3. endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint (rpcEnv))) 4. }
ClientEndpoint的源码如下。
1. private class ClientEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint 2. with Logging { 3. ...... 4. override def onStart(): Unit = { 5. try { 6. registerWithMaster(1) 7. } catch { 8. case e: Exception => 9. logWarning("Failed to connect to master", e) 10. markDisconnected() 11. stop() 12. } 13. }
ClientEndpoint是一个ThreadSafeRpcEndpoint。ClientEndpoint的onStart方法中调用registerWithMaster(1)进行注册,向Master注册程序。registerWithMaster方法如下。
1. private def registerWithMaster(nthRetry: Int) { 2. registerMasterFutures.set(tryRegisterAllMasters()) 3. registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable { 4. override def run(): Unit = { 5. if (registered.get) { 6. registerMasterFutures.get.foreach(_.cancel(true)) 7. registerMasterThreadPool.shutdownNow() 8. } else if (nthRetry >= REGISTRATION_RETRIES) { 9. markDead("All masters are unresponsive! Giving up.") 10. } else { 11. registerMasterFutures.get.foreach(_.cancel(true)) 12. registerWithMaster(nthRetry + 1) 13. } 14. } 15. }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) 16. }
程序注册后,Master通过schedule分配资源,通知Worker启动Executor,Executor启动的进程是CoarseGrainedExecutorBackend,Executor启动后又转过来向Driver注册,Driver其实是StandaloneSchedulerBackend的父类CoarseGrainedSchedulerBackend的一个消息循环体DriverEndpoint。
总结:
在SparkContext实例化的时候调用createTaskScheduler来创建TaskSchedulerImpl和StandaloneSchedulerBackend,同时在SparkContext实例化的时候会调用TaskSchedulerImpl的start,在start方法中会调用StandaloneSchedulerBackend的start,在该start方法中会创建StandaloneAppClient对象,并调用StandaloneAppClient对象的start方法,在该start方法中会创建ClientEndpoint,创建ClientEndpoint时会传入Command来指定具体为当前应用程序启动的Executor的入口类的名称为CoarseGrainedExecutorBackend,然后ClientEndpoint启动并通过tryRegisterMaster来注册当前的应用程序到Master中,Master接收到注册信息后如果可以运行程序,为该程序生产Job ID并通过schedule来分配计算资源,具体计算资源的分配是通过应用程序的运行方式、Memory、cores等配置信息决定的。最后,Master会发送指令给Worker,Worker为当前应用程序分配计算资源时会首先分配ExecutorRunner。ExecutorRunner内部会通过Thread的方式构建ProcessBuilder来启动另外一个JVM进程,这个JVM进程启动时加载的main方法所在的类的名称就是在创建ClientEndpoint时传入的Command来指定具体名称为CoarseGrainedExecutorBackend的类,此时JVM在通过ProcessBuilder启动时获得了CoarseGrainedExecutorBackend后加载并调用其中的main方法,在main方法中会实例化CoarseGrainedExecutorBackend本身这个消息循环体,而CoarseGrainedExecutorBackend在实例化时会通过回调onStart向DriverEndpoint发送RegisterExecutor来注册当前的CoarseGrainedExecutorBackend,此时DriverEndpoint收到该注册信息并保存在StandaloneSchedulerBackend实例的内存数据结构中,这样Driver就获得了计算资源。
CoarseGrainedExecutorBackend.scala的main方法如下。
1. def main(args: Array[String]) { 2. var driverUrl: String = null 3. var executorId: String = null 4. var hostname: String = null 5. var cores: Int = 0 6. var appId: String = null 7. var workerUrl: Option[String] = None 8. val userClassPath = new mutable.ListBuffer[URL]() 9. 10. var argv = args.toList 11. ...... 12. run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath) 13. System.exit(0) 14. }
CoarseGrainedExecutorBackend的main然后开始调用run方法。
1. private def run( 2. driverUrl: String, 3. executorId: String, 4. hostname: String, 5. cores: Int, 6. appId: String, 7. workerUrl: Option[String], 8. userClassPath: Seq[URL]) { 9. ...... 10. env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( 11. env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env)) 12. ......
在CoarseGrainedExecutorBackend的main方法中,通过env.rpcEnv.setupEndpoint ("Executor", new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))构建了CoarseGrainedExecutorBackend实例本身。