Flink设计与实现:核心原理与源码解析
上QQ阅读APP看书,第一时间看更新

3.2.3 Dispatcher的创建与初始化

下面我们了解一下Dispatcher组件的创建和初始化过程。如图3-6所示,Dispatcher的创建过程比较复杂,涉及的组件和服务非常多。

对于运行时的Dispatcher组件,我们需要理解以下几个基础概念。

图3-6 Dispatcher主要涉及模块示意图

·Dispatcher:负责对集群中的作业进行接收和分发处理操作,客户端可以通过与Dispatcher建立RPC连接,将作业过ClusterClient提交到集群Dispatcher服务中。Dispatcher通过JobGraph对象启动JobManager服务。

·DispatcherRunner:负责启动和管理Dispatcher组件,并支持对Dispatcher组件的Leader选举。当Dispatcher集群组件出现异常并停止时,会通过DispatcherRunner重新选择和启动新的Dispatcher服务,从而保证Dispatcher组件的高可用。

·DispatcherLeaderProcess:负责管理Dispatcher生命周期,同时提供了对JobGraph的任务恢复管理功能。如果基于ZooKeeper实现了集群高可用,DispatcherLeader-Process会将提交的JobGraph存储在ZooKeeper中,当集群停止或者出现异常时,就会通过DispatcherLeaderProcess对集群中的JobGraph进行恢复,这些JobGraph都会被存储在JobGraphStore的实现类中。

·DispatcherGatewayService:主要基于Dispatcher实现的GatewayService,用于获取DispatcherGateway。

1.创建DispatcherRunner

在DefaultDispatcherResourceManagerComponentFactory创建集群组件的过程中,需要先创建DefaultDispatcherRunnerFactory才能创建DispatcherRunner。DispatcherRunner主要提供了启动Dispatcher组件以及Leader选举等功能,如图3-7所示。

·DispatcherRunner是通过DispatcherRunnerFactory创建的,DispatcherRunnerFactory中的参数依赖DispatcherFactory,DispatcherFactory最终用于创建Dispatcher实例。

·DispatcherRunner有DefaultDispatcherRunner和DispatcherRunnerLeaderElectionLifecycleManager两种实现,前者是DispatcherRunner接口的主要实现,后者实现了DispatcherRunner的LeaderElection生命周期管理,其中包括使用LeaderElection-Service启动和停止DispatcherRunner线程。

图3-7 DispatcherRunner UML关系图

·在DispatcherRunnerFactory的创建过程中需要同步创建DispatcherLeaderProcessFactoryFactory,用于创建DispatcherLeaderProcess。同时,DispatcherFactory也是DispatcherLeaderProcessFactoryFactory的成员参数。

·DefaultDispatcherRunnerFactory提供了createSessionRunner()和createJobRunner()两个静态方法,用于创建DefaultDispatcherRunnerFactory,从而实现创建不同集群类型的DispatcherRunner。

如代码清单3-16所示,在DefaultDispatcherRunnerFactory.createSessionRunner()方法中提供了创建Session类型DefaultDispatcherRunnerFactory的逻辑,从方法中可以看出,SessionDispatcherLeaderProcessFactoryFactory会作为DefaultDispatcherRunnerFactory工厂类的参数,对于Per-Job类型的集群,此时使用的就是JobDispatcherLeaderProcessFactoryFactory参数。

代码清单3-16 DefaultDispatcherRunnerFactory.createSessionRunner()方法定义


public static DefaultDispatcherRunnerFactory createSessionRunner(Dispatcher
   Factory dispatcherFactory) {
   return new DefaultDispatcherRunnerFactory(
      SessionDispatcherLeaderProcessFactoryFactory.create(dispatcherFactory));
}

接下来我们看DefaultDispatcherRunnerFactory.createDispatcherRunner()方法的具体实现。如代码清单3-17所示,方法主要包含如下逻辑。

·调用dispatcherLeaderProcessFactoryFactory.createFactory()方法创建DispatcherLeader-ProcessFactory。

·调用DefaultDispatcherRunner.create()方法创建DispatcherRunner,此时创建的dispatcher-LeaderProcessFactory会作为参数应用到DefaultDispatcherRunner中。

代码清单3-17 DefaultDispatcherRunnerFactory.createDispatcherRunner()方法


public DispatcherRunner createDispatcherRunner(
      LeaderElectionService leaderElectionService,
      FatalErrorHandler fatalErrorHandler,
      JobGraphStoreFactory jobGraphStoreFactory,
      Executor ioExecutor,
      RpcService rpcService,
      PartialDispatcherServices partialDispatcherServices) throws Exception {
   // 创建DispatcherLeaderProcessFactory
   final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = 
      dispatcherLeaderProcessFactoryFactory.createFactory(
      jobGraphStoreFactory,
      ioExecutor,
      rpcService,
      partialDispatcherServices,
      fatalErrorHandler);
   // 调用DefaultDispatcherRunner.create()方法创建DispatcherRunner
   return DefaultDispatcherRunner.create(
      leaderElectionService,
      fatalErrorHandler,
      dispatcherLeaderProcessFactory);
}

2.创建DispatcherLeaderProcess

根据不同的集群类型,DispatcherLeaderProcess分为SessionDispatcherLeaderProcess和JobDispatcherLeaderProcess两种实现类。其中SessionDispatcherLeaderProcess用于对多个JobGraph进行恢复和提交,在高可用集群下通过JobGraphStore中存储JobGraph进行存储及恢复,当集群重新启动后会将JobGraphStore中存储的JobGraph恢复并创建相应的任务。JobDispatcherLeaderProcess用于单个JobGraph的恢复和提交,处理逻辑比较简单。

如图3-8所示,DispatcherLeaderProcess的实现类都是通过工厂方法创建的,且会涉及工厂类的嵌套。例如对Session类型集群,会事先通过SessionDispatcherLeaderProcessFac-toryFactory创建SessionDispatcherLeaderProcessFactory,然后再通过SessionDispatcherLeader-ProcessFactory创建SessionDispatcherLeaderProcess实现类。

图3-8 DispatcherLeaderProcess的创建关系

如图3-9所示,DispatcherLeaderProcess的实现有SessionDispatcherLeaderProcess和JobDispatcherLeaderProcess两种类型,它们都继承自AbstractDispatcherLeaderProcess基本实现类。

图3-9 DispatcherLeaderProcess UML关系图

·在DispatcherLeaderProcess接口中定义了start()方法,用于启动DispatcherLeader-Process服务,同时提供了获取DispatcherGateway、ShutDownFuture的方法。

·在AbstractDispatcherLeaderProcess基本实现类中,主要实现了DispatcherLeader-Process中的接口方法,并提供了onStart()和OnClose()两个抽象方法,用于定义和实现子类。

·在AbstractDispatcherLeaderProcess类中,通过内部类定义了DispatcherGateway-Service接口以及获取DispatcherGatewayService的工厂接口。

·在SessionDispatcherLeaderProcess实现类中主要实现了与Session集群相关的Dispatcher处理逻辑,主要用于对JobGraphStore中存储的JobGraph进行恢复。在非高可用集群下,JobGraphStore的实现类为StandaloneJobGraphStore,也就是不对JobGraph进行存储和管理;在高可用集群中,JobGraphStore基于ZooKeeper存储集群中的JobGraph。

·在JobDispatcherLeaderProcess实现类中包含了对单个JobGraph进行创建和提交的方法,因此JobDispatcherLeaderProcess主要涵盖了对单个JobGraph的提交逻辑,不存在JobGraphStore的概念。JobDispatcherLeaderProcess伴随作业的结束,其生命周期也会同步终止。

(1)启动SessionDispatcherLeaderProcess

如代码清单3-18所示,SessionDispatcherLeaderProcess.onStart()方法包含如下步骤。

·调用startService()方法启动JobGraphStore服务,JobGraphStore主要用于存储集群中运行的JobGraph,当系统出现异常时,可以从JobGraphStore中获取JobGraph并再次提交到Dispatcher上运行。

·调用recoverJobsAsync()方法对JobGraphStore中的方法进行恢复。

·调用createDispatcherIfRunning()方法,创建Dispatcher并将恢复的JobGraph提交到Dispatcher上运行。

·调用onErrorIfRunning()方法捕获执行过程中出现的异常并处理。

代码清单3-18 SessionDispatcherLeaderProcess.onStart()方法定义


@Override
protected void onStart() {
   // 调用startService()方法,启动jobGraphStore
   startServices();
     // 调用recoverJobsAsync()方法,对JobGraphStore中的方法进行恢复
   onGoingRecoveryOperation = recoverJobsAsync()
      .thenAccept(this::createDispatcherIfRunning)
      .handle(this::onErrorIfRunning);
}

(2)启动JobDispatcherLeaderProcess

和SessionDispatcherLeaderProcess不同,JobDispatcherLeaderProcess服务的启动过程相对简单。这是因为JobDispatcherLeaderProcess无须恢复JobGraphStore中存储的JobGraph,仅支持恢复当前作业的JobGraph。在JobDispatcherLeaderProcess.onStart()方法中会直接创建DispatcherGatewayService,并在DispatcherGatewayService中启动单个JobGraph的Dispatcher组件服务,最终完成独立作业的提交和处理,如代码清单3-19所示。

代码清单3-19 JobDispatcherLeaderProcess.onStart()方法


@Override
protected void onStart() {
   // 通过dispatcherGatewayServiceFactory创建DispatcherGatewayService
   final DispatcherGatewayService dispatcherService = 
      dispatcherGatewayServiceFactory.create(
      DispatcherId.fromUuid(getLeaderSessionId()),
      Collections.singleton(jobGraph),
      ThrowingJobGraphWriter.INSTANCE);
     //完成设定 
   completeDispatcherSetup(dispatcherService);
}

3.Dispatcher的创建和启动

当JobGraph从JobGraphStore中恢复后,会立刻创建和启动Dispatcher组件,然后将恢复出来的JobGraph提交到Dispatcher上运行。如果集群中没有需要恢复的JobGraph,此时就会忽略以上步骤,直接创建并启动Dispatcher。

如代码清单3-20所示,在SessionDispatcherLeaderProcess.createDispatcher()方法中定义了创建Dispatcher的逻辑。从方法中可以看出,本质上是通过dispatcherGatewayServiceFactory接口创建的DispatcherGatewayService,前面我们已经提到,DispatcherGatewayService对Dispatcher和DispatcherGateway进行了服务化封装。

代码清单3-20 SessionDispatcherLeaderProcess.createDispatcher()方法


private void createDispatcher(Collection<JobGraph> jobGraphs) {
   // 通过dispatcherGatewayServiceFactory创建dispatcherService
   final DispatcherGatewayService dispatcherService = 
      dispatcherGatewayServiceFactory.create(
      DispatcherId.fromUuid(getLeaderSessionId()),
      jobGraphs,
      jobGraphStore);
     // 完成对Dispatcher的后续配置,如设定dispatcherService
   completeDispatcherSetup(dispatcherService);
}

如代码清单3-21所示,DefaultDispatcherGatewayServiceFactory.create()方法包含了创建和启动Dispatcher的过程,方法中的主要逻辑如下。

·调用dispatcherFactory创建dispatcher组件。注意,参数包括rpcService、fencingToken以及从JobGraphStore中恢复的recoveredJobs集合,还有通过partialDispatcherServices和jobGraphWriter创建的PartialDispatcherServicesWithJobGraphStore。其中,DispatcherServices包含了Dispatcher组件用到的服务,Dispatcher组件会在初始化的过程中从DispatcherServices获取这些服务,比如highAvailabilityServices、heartbeatServices等。

·dispatcher创建完毕后,会调用Dispatcher.start()方法启动dispatcher组件,这里实际上调用的是RpcEndpoint.start()方法启动Dispatcher对应RPC服务,关于RPC通信的底层实现,我们将放在第7章重点介绍。

代码清单3-21 DefaultDispatcherGatewayServiceFactory.create()方法


public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
      DispatcherId fencingToken,
      Collection<JobGraph> recoveredJobs,
      JobGraphWriter jobGraphWriter) {
   final Dispatcher dispatcher;
   try {
      //调用dispatcherFactory创建dispatcher组件
      dispatcher = dispatcherFactory.createDispatcher(
         rpcService,
         fencingToken,
         recoveredJobs,
         PartialDispatcherServicesWithJobGraphStore.
            from(partialDispatcherServices, jobGraphWriter));
   } catch (Exception e) {
      throw new FlinkRuntimeException("Could not create the Dispatcher rpc 
         endpoint.", e);
   }
      //启动dispatcher
   dispatcher.start();
      //将dispatcher放置在DefaultDispatcherGatewayService中并返回
   return DefaultDispatcherGatewayService.from(dispatcher);
}