2.2.3 StreamOperatorFactory详解
我们已经知道,StreamOperator最终会通过StreamOperatorFactory封装在Transformation结构中,并存储在StreamGraph和JobGraph结构中,直到运行时执行StreamTask时,才会调用StreamOperatorFactory.createStreamOperator()方法在StreamOperatorFactory中定义StreamOperator实例。
通过StreamOperatorFactory封装创建StreamOperator的操作,在DataStreamAPI中主要通过SimpleStreamOperatorFactory创建已经定义的Operator,而在Table API模块中主要通过CodeGenOperatorFactory从代码中动态编译并创建Operator实例。SimpleStreamOperatorFactory和CodeGenOperatorFactory都是StreamOperatorFactory的实现类。
如图2-6所示,StreamOperatorFactory接口定义了创建StreamOperator的方法,并提供了设定ChainingStrategy、InputType等属性的方法。
DataStream API中大部分转换操作都是通过SimpleOperatorFactory进行封装和创建的。SimpleStreamOperatorFactory根据算子类型的不同,拓展出了InputFormatOperatorFactory、UdfStreamOperatorFactory和OutputFormatOperatorFactory三种接口实现。
·InputFormatOperatorFactory:支持创建InputFormat类型输入的StreamSource算子,即SourceFunction为InputFormatSourceFunction类型,并提供getInputFormat()方法生成StreamGraph。
·UdfStreamOperatorFactory:支持AbstractUdfStreamOperator类型的Operator创建,并且在UdfStreamOperatorFactory中提供了获取UserFunction的方法。
·OutputFormatOperatorFactory:支持创建OutputFormat类型输出的StreamSink算子,即SinkFunction为OutputFormatSinkFunction类型,并提供getOutputFormat()方法生成StreamGraph。
图2-6 StreamOperatorFactory UML关系图
如代码清单2-12所示,从SimpleOperatorFactory.of()方法定义中可以看出,基于StreamOperator提供的of()方法对算子进行工厂类的封装,实现将Operator封装在OperatorFactory中。然后根据Operator类型的不同,创建不同的SimpleOperatorFactory实现类,例如当Operator类型为StreamSource且UserFunction定义属于InputFormatSourceFunction时,就会创建SimpleInputFormatOperatorFactory实现类,其他情况类似。
代码清单2-12 SimpleOperatorFactory.of()方法
public static <OUT> SimpleOperatorFactory<OUT> of(StreamOperator<OUT> operator) { if (operator == null) { return null; } else if (operator instanceof StreamSource && ((StreamSource) operator).getUserFunction() instanceof InputFormatSourceFunction) { // 如果Operator是StreamSource类型,且UserFunction类型为 InputFormatSourceFunction // 返回SimpleInputFormatOperatorFactory return new SimpleInputFormatOperatorFactory<OUT>((StreamSource) operator); } else if (operator instanceof StreamSink && ((StreamSink) operator).getUserFunction() instanceof OutputFormatSinkFunction) { // 如果Operator是StreamSink类型,且UserFunction类型为 OutputFormatSinkFunction // 返回SimpleOutputFormatOperatorFactory return new SimpleOutputFormatOperatorFactory<>((StreamSink) operator); } else if (operator instanceof AbstractUdfStreamOperator) { // 如果Operator是AbstractUdfStreamOperator则返回 SimpleUdfStreamOperatorFactory return new SimpleUdfStreamOperatorFactory<OUT>((AbstractUdfStreamOper ator) operator); } else { // 其他情况返回SimpleOperatorFactory return new SimpleOperatorFactory<>(operator); } }
如代码清单2-13所示,在集群中执行该算子时,首先会调用SimpleOperatorFactory.createStreamOperator()方法创建StreamOperator实例。如果算子同时实现了SetupableStreamOperator接口,则会调用setup()方法对算子进行基本的设置。
代码清单2-13 SimpleOperatorFactory.createStreamOperator方法
public <T extends StreamOperator<OUT>> T createStreamOperator(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) { if (operator instanceof SetupableStreamOperator) { ((SetupableStreamOperator) operator).setup(containingTask, config, output); } return (T) operator; }
对于StreamOperator如何在Task实例中执行,我们会在第4章进行详细介绍。接下来,我们看StreamOperator中的Function是如何定义和实现的。