3.2.3 Dispatcher的创建与初始化
下面我们了解一下Dispatcher组件的创建和初始化过程。如图3-6所示,Dispatcher的创建过程比较复杂,涉及的组件和服务非常多。
对于运行时的Dispatcher组件,我们需要理解以下几个基础概念。
图3-6 Dispatcher主要涉及模块示意图
·Dispatcher:负责对集群中的作业进行接收和分发处理操作,客户端可以通过与Dispatcher建立RPC连接,将作业过ClusterClient提交到集群Dispatcher服务中。Dispatcher通过JobGraph对象启动JobManager服务。
·DispatcherRunner:负责启动和管理Dispatcher组件,并支持对Dispatcher组件的Leader选举。当Dispatcher集群组件出现异常并停止时,会通过DispatcherRunner重新选择和启动新的Dispatcher服务,从而保证Dispatcher组件的高可用。
·DispatcherLeaderProcess:负责管理Dispatcher生命周期,同时提供了对JobGraph的任务恢复管理功能。如果基于ZooKeeper实现了集群高可用,DispatcherLeader-Process会将提交的JobGraph存储在ZooKeeper中,当集群停止或者出现异常时,就会通过DispatcherLeaderProcess对集群中的JobGraph进行恢复,这些JobGraph都会被存储在JobGraphStore的实现类中。
·DispatcherGatewayService:主要基于Dispatcher实现的GatewayService,用于获取DispatcherGateway。
1.创建DispatcherRunner
在DefaultDispatcherResourceManagerComponentFactory创建集群组件的过程中,需要先创建DefaultDispatcherRunnerFactory才能创建DispatcherRunner。DispatcherRunner主要提供了启动Dispatcher组件以及Leader选举等功能,如图3-7所示。
·DispatcherRunner是通过DispatcherRunnerFactory创建的,DispatcherRunnerFactory中的参数依赖DispatcherFactory,DispatcherFactory最终用于创建Dispatcher实例。
·DispatcherRunner有DefaultDispatcherRunner和DispatcherRunnerLeaderElectionLifecycleManager两种实现,前者是DispatcherRunner接口的主要实现,后者实现了DispatcherRunner的LeaderElection生命周期管理,其中包括使用LeaderElection-Service启动和停止DispatcherRunner线程。
图3-7 DispatcherRunner UML关系图
·在DispatcherRunnerFactory的创建过程中需要同步创建DispatcherLeaderProcessFactoryFactory,用于创建DispatcherLeaderProcess。同时,DispatcherFactory也是DispatcherLeaderProcessFactoryFactory的成员参数。
·DefaultDispatcherRunnerFactory提供了createSessionRunner()和createJobRunner()两个静态方法,用于创建DefaultDispatcherRunnerFactory,从而实现创建不同集群类型的DispatcherRunner。
如代码清单3-16所示,在DefaultDispatcherRunnerFactory.createSessionRunner()方法中提供了创建Session类型DefaultDispatcherRunnerFactory的逻辑,从方法中可以看出,SessionDispatcherLeaderProcessFactoryFactory会作为DefaultDispatcherRunnerFactory工厂类的参数,对于Per-Job类型的集群,此时使用的就是JobDispatcherLeaderProcessFactoryFactory参数。
代码清单3-16 DefaultDispatcherRunnerFactory.createSessionRunner()方法定义
public static DefaultDispatcherRunnerFactory createSessionRunner(Dispatcher Factory dispatcherFactory) { return new DefaultDispatcherRunnerFactory( SessionDispatcherLeaderProcessFactoryFactory.create(dispatcherFactory)); }
接下来我们看DefaultDispatcherRunnerFactory.createDispatcherRunner()方法的具体实现。如代码清单3-17所示,方法主要包含如下逻辑。
·调用dispatcherLeaderProcessFactoryFactory.createFactory()方法创建DispatcherLeader-ProcessFactory。
·调用DefaultDispatcherRunner.create()方法创建DispatcherRunner,此时创建的dispatcher-LeaderProcessFactory会作为参数应用到DefaultDispatcherRunner中。
代码清单3-17 DefaultDispatcherRunnerFactory.createDispatcherRunner()方法
public DispatcherRunner createDispatcherRunner( LeaderElectionService leaderElectionService, FatalErrorHandler fatalErrorHandler, JobGraphStoreFactory jobGraphStoreFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices) throws Exception { // 创建DispatcherLeaderProcessFactory final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory( jobGraphStoreFactory, ioExecutor, rpcService, partialDispatcherServices, fatalErrorHandler); // 调用DefaultDispatcherRunner.create()方法创建DispatcherRunner return DefaultDispatcherRunner.create( leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory); }
2.创建DispatcherLeaderProcess
根据不同的集群类型,DispatcherLeaderProcess分为SessionDispatcherLeaderProcess和JobDispatcherLeaderProcess两种实现类。其中SessionDispatcherLeaderProcess用于对多个JobGraph进行恢复和提交,在高可用集群下通过JobGraphStore中存储JobGraph进行存储及恢复,当集群重新启动后会将JobGraphStore中存储的JobGraph恢复并创建相应的任务。JobDispatcherLeaderProcess用于单个JobGraph的恢复和提交,处理逻辑比较简单。
如图3-8所示,DispatcherLeaderProcess的实现类都是通过工厂方法创建的,且会涉及工厂类的嵌套。例如对Session类型集群,会事先通过SessionDispatcherLeaderProcessFac-toryFactory创建SessionDispatcherLeaderProcessFactory,然后再通过SessionDispatcherLeader-ProcessFactory创建SessionDispatcherLeaderProcess实现类。
图3-8 DispatcherLeaderProcess的创建关系
如图3-9所示,DispatcherLeaderProcess的实现有SessionDispatcherLeaderProcess和JobDispatcherLeaderProcess两种类型,它们都继承自AbstractDispatcherLeaderProcess基本实现类。
图3-9 DispatcherLeaderProcess UML关系图
·在DispatcherLeaderProcess接口中定义了start()方法,用于启动DispatcherLeader-Process服务,同时提供了获取DispatcherGateway、ShutDownFuture的方法。
·在AbstractDispatcherLeaderProcess基本实现类中,主要实现了DispatcherLeader-Process中的接口方法,并提供了onStart()和OnClose()两个抽象方法,用于定义和实现子类。
·在AbstractDispatcherLeaderProcess类中,通过内部类定义了DispatcherGateway-Service接口以及获取DispatcherGatewayService的工厂接口。
·在SessionDispatcherLeaderProcess实现类中主要实现了与Session集群相关的Dispatcher处理逻辑,主要用于对JobGraphStore中存储的JobGraph进行恢复。在非高可用集群下,JobGraphStore的实现类为StandaloneJobGraphStore,也就是不对JobGraph进行存储和管理;在高可用集群中,JobGraphStore基于ZooKeeper存储集群中的JobGraph。
·在JobDispatcherLeaderProcess实现类中包含了对单个JobGraph进行创建和提交的方法,因此JobDispatcherLeaderProcess主要涵盖了对单个JobGraph的提交逻辑,不存在JobGraphStore的概念。JobDispatcherLeaderProcess伴随作业的结束,其生命周期也会同步终止。
(1)启动SessionDispatcherLeaderProcess
如代码清单3-18所示,SessionDispatcherLeaderProcess.onStart()方法包含如下步骤。
·调用startService()方法启动JobGraphStore服务,JobGraphStore主要用于存储集群中运行的JobGraph,当系统出现异常时,可以从JobGraphStore中获取JobGraph并再次提交到Dispatcher上运行。
·调用recoverJobsAsync()方法对JobGraphStore中的方法进行恢复。
·调用createDispatcherIfRunning()方法,创建Dispatcher并将恢复的JobGraph提交到Dispatcher上运行。
·调用onErrorIfRunning()方法捕获执行过程中出现的异常并处理。
代码清单3-18 SessionDispatcherLeaderProcess.onStart()方法定义
@Override protected void onStart() { // 调用startService()方法,启动jobGraphStore startServices(); // 调用recoverJobsAsync()方法,对JobGraphStore中的方法进行恢复 onGoingRecoveryOperation = recoverJobsAsync() .thenAccept(this::createDispatcherIfRunning) .handle(this::onErrorIfRunning); }
(2)启动JobDispatcherLeaderProcess
和SessionDispatcherLeaderProcess不同,JobDispatcherLeaderProcess服务的启动过程相对简单。这是因为JobDispatcherLeaderProcess无须恢复JobGraphStore中存储的JobGraph,仅支持恢复当前作业的JobGraph。在JobDispatcherLeaderProcess.onStart()方法中会直接创建DispatcherGatewayService,并在DispatcherGatewayService中启动单个JobGraph的Dispatcher组件服务,最终完成独立作业的提交和处理,如代码清单3-19所示。
代码清单3-19 JobDispatcherLeaderProcess.onStart()方法
@Override protected void onStart() { // 通过dispatcherGatewayServiceFactory创建DispatcherGatewayService final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create( DispatcherId.fromUuid(getLeaderSessionId()), Collections.singleton(jobGraph), ThrowingJobGraphWriter.INSTANCE); //完成设定 completeDispatcherSetup(dispatcherService); }
3.Dispatcher的创建和启动
当JobGraph从JobGraphStore中恢复后,会立刻创建和启动Dispatcher组件,然后将恢复出来的JobGraph提交到Dispatcher上运行。如果集群中没有需要恢复的JobGraph,此时就会忽略以上步骤,直接创建并启动Dispatcher。
如代码清单3-20所示,在SessionDispatcherLeaderProcess.createDispatcher()方法中定义了创建Dispatcher的逻辑。从方法中可以看出,本质上是通过dispatcherGatewayServiceFactory接口创建的DispatcherGatewayService,前面我们已经提到,DispatcherGatewayService对Dispatcher和DispatcherGateway进行了服务化封装。
代码清单3-20 SessionDispatcherLeaderProcess.createDispatcher()方法
private void createDispatcher(Collection<JobGraph> jobGraphs) { // 通过dispatcherGatewayServiceFactory创建dispatcherService final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create( DispatcherId.fromUuid(getLeaderSessionId()), jobGraphs, jobGraphStore); // 完成对Dispatcher的后续配置,如设定dispatcherService completeDispatcherSetup(dispatcherService); }
如代码清单3-21所示,DefaultDispatcherGatewayServiceFactory.create()方法包含了创建和启动Dispatcher的过程,方法中的主要逻辑如下。
·调用dispatcherFactory创建dispatcher组件。注意,参数包括rpcService、fencingToken以及从JobGraphStore中恢复的recoveredJobs集合,还有通过partialDispatcherServices和jobGraphWriter创建的PartialDispatcherServicesWithJobGraphStore。其中,DispatcherServices包含了Dispatcher组件用到的服务,Dispatcher组件会在初始化的过程中从DispatcherServices获取这些服务,比如highAvailabilityServices、heartbeatServices等。
·dispatcher创建完毕后,会调用Dispatcher.start()方法启动dispatcher组件,这里实际上调用的是RpcEndpoint.start()方法启动Dispatcher对应RPC服务,关于RPC通信的底层实现,我们将放在第7章重点介绍。
代码清单3-21 DefaultDispatcherGatewayServiceFactory.create()方法
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create( DispatcherId fencingToken, Collection<JobGraph> recoveredJobs, JobGraphWriter jobGraphWriter) { final Dispatcher dispatcher; try { //调用dispatcherFactory创建dispatcher组件 dispatcher = dispatcherFactory.createDispatcher( rpcService, fencingToken, recoveredJobs, PartialDispatcherServicesWithJobGraphStore. from(partialDispatcherServices, jobGraphWriter)); } catch (Exception e) { throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e); } //启动dispatcher dispatcher.start(); //将dispatcher放置在DefaultDispatcherGatewayService中并返回 return DefaultDispatcherGatewayService.from(dispatcher); }