Flink设计与实现:核心原理与源码解析
上QQ阅读APP看书,第一时间看更新

2.2.3 StreamOperatorFactory详解

我们已经知道,StreamOperator最终会通过StreamOperatorFactory封装在Transformation结构中,并存储在StreamGraph和JobGraph结构中,直到运行时执行StreamTask时,才会调用StreamOperatorFactory.createStreamOperator()方法在StreamOperatorFactory中定义StreamOperator实例。

通过StreamOperatorFactory封装创建StreamOperator的操作,在DataStreamAPI中主要通过SimpleStreamOperatorFactory创建已经定义的Operator,而在Table API模块中主要通过CodeGenOperatorFactory从代码中动态编译并创建Operator实例。SimpleStreamOperatorFactory和CodeGenOperatorFactory都是StreamOperatorFactory的实现类。

如图2-6所示,StreamOperatorFactory接口定义了创建StreamOperator的方法,并提供了设定ChainingStrategy、InputType等属性的方法。

DataStream API中大部分转换操作都是通过SimpleOperatorFactory进行封装和创建的。SimpleStreamOperatorFactory根据算子类型的不同,拓展出了InputFormatOperatorFactory、UdfStreamOperatorFactory和OutputFormatOperatorFactory三种接口实现。

·InputFormatOperatorFactory:支持创建InputFormat类型输入的StreamSource算子,即SourceFunction为InputFormatSourceFunction类型,并提供getInputFormat()方法生成StreamGraph。

·UdfStreamOperatorFactory:支持AbstractUdfStreamOperator类型的Operator创建,并且在UdfStreamOperatorFactory中提供了获取UserFunction的方法。

·OutputFormatOperatorFactory:支持创建OutputFormat类型输出的StreamSink算子,即SinkFunction为OutputFormatSinkFunction类型,并提供getOutputFormat()方法生成StreamGraph。

图2-6 StreamOperatorFactory UML关系图

如代码清单2-12所示,从SimpleOperatorFactory.of()方法定义中可以看出,基于StreamOperator提供的of()方法对算子进行工厂类的封装,实现将Operator封装在OperatorFactory中。然后根据Operator类型的不同,创建不同的SimpleOperatorFactory实现类,例如当Operator类型为StreamSource且UserFunction定义属于InputFormatSourceFunction时,就会创建SimpleInputFormatOperatorFactory实现类,其他情况类似。

代码清单2-12 SimpleOperatorFactory.of()方法


public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) {
      if (operator == null) {
         return null;
      } else if (operator instanceof StreamSource &&
            ((StreamSource) operator).getUserFunction() instanceof 
               InputFormatSourceFunction) {
      // 如果Operator是StreamSource类型,且UserFunction类型为
         InputFormatSourceFunction
      // 返回SimpleInputFormatOperatorFactory
         return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) 
            operator);
      } else if (operator instanceof StreamSink &&
         ((StreamSink) operator).getUserFunction() instanceof 
            OutputFormatSinkFunction) {
      // 如果Operator是StreamSink类型,且UserFunction类型为
         OutputFormatSinkFunction
      // 返回SimpleOutputFormatOperatorFactory
         return new SimpleOutputFormatOperatorFactory<>((StreamSink) 
            operator);
      } else if (operator instanceof AbstractUdfStreamOperator) {
      // 如果Operator是AbstractUdfStreamOperator则返回
         SimpleUdfStreamOperatorFactory
         return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOper
            ator) operator);
      } else {
     // 其他情况返回SimpleOperatorFactory
         return new SimpleOperatorFactory<>(operator);
      }
   }

如代码清单2-13所示,在集群中执行该算子时,首先会调用SimpleOperatorFactory.createStreamOperator()方法创建StreamOperator实例。如果算子同时实现了SetupableStreamOperator接口,则会调用setup()方法对算子进行基本的设置。

代码清单2-13 SimpleOperatorFactory.createStreamOperator方法


public <T extends StreamOperator<OUT>> T createStreamOperator(StreamTask<?, ?> 
   containingTask,
      StreamConfig config, Output<StreamRecord<OUT>> output) {
   if (operator instanceof SetupableStreamOperator) {
      ((SetupableStreamOperator) operator).setup(containingTask, config, output);
   }
   return (T) operator;
}

对于StreamOperator如何在Task实例中执行,我们会在第4章进行详细介绍。接下来,我们看StreamOperator中的Function是如何定义和实现的。