2.2.2 OneInputStreamOperator与TwoInputStreamOperator
StreamOperator根据输入流的数量分为两种类型,即支持单输入流的OneInputStreamOperator以及支持双输入流的TwoInputStreamOperator,我们可以将其称为一元输入算子和二元输入算子。下面介绍OneInputStreamOperator和TwoInputStreamOperator的区别。
1.OneInputStreamOperator的实现
OneInputStreamOperator定义了单输入流的StreamOperator,常见的实现类有StreamMap、StreamFilter等算子。OneInputStreamOperator接口主要包含以下方法,专门用于处理接入的单输入数据流,如代码清单2-8所示。
代码清单2-8 OneInputStreamOperator接口定义的主要方法
// 处理输入数据元素的方法 void processElement(StreamRecord<IN> element) throws Exception; // 处理Watermark的方法 void processWatermark(Watermark mark) throws Exception; // 处理延时标记的方法 void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
我们以StreamFilter算子为例,介绍OneInputStreamOperator的实现,如代码清单2-9所示。
·StreamFilter算子在继承AbstractUdfStreamOperator的同时,实现了OneInputStreamOperator接口。
·在StreamFilter算子构造器中,内部的Function类型为FilterFunction,并设定上下游算子的链接策略为ChainingStrategy.ALWAYS,也就是该类型的Operator通常都会与上下游的Operator连接在一起,形成OperatorChain。
·在StreamFilter中实现了OneInputStreamOperator的processElement()方法,通过该方法定义了具体的数据元素处理逻辑。实际上就是使用定义的filterFunction对接入的数据进行筛选,然后通过output.collect(element)方法将符合的条件输出到下游算子中。
代码清单2-9 StreamFilter Class的定义和实现
public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> { private static final long serialVersionUID = 1L; // 初始化FilterFunction并设定ChainingStrategy.ALWAYS public StreamFilter(FilterFunction<IN> filterFunction) { super(filterFunction); chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void processElement(StreamRecord<IN> element) throws Exception { // 执行userFunction.filter()方法 if (userFunction.filter(element.getValue())) { output.collect(element); } } }
2.TwoInputStreamOperator的实现
TwoInputStreamOperator定义了双输入流类型的StreamOperator接口实现,常见的实现类有CoStreamMap、HashJoinOperator等算子。代码清单2-10是TwoInputStreamOperator接口定义的主要方法,在实现对两个数据流转换操作的同时,还定义了两条数据流中Watermark和LatencyMarker的处理逻辑。
代码清单2-10 TwoInputStreamOperator接口定义的主要方法
// 处理输入源1的数据元素方法 void processElement1(StreamRecord<IN1> element) throws Exception; // 处理输入源2的数据元素方法 void processElement2(StreamRecord<IN2> element) throws Exception; // 处理输入源1的Watermark方法 void processWatermark1(Watermark mark) throws Exception; // 处理输入源2的Watermark方法 void processWatermark2(Watermark mark) throws Exception; // 处理输入源1的LatencyMarker方法 void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception; // 处理输入源2的LatencyMarker方法 void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;
如代码清单2-11所示,我们以CoStreamMap为例,介绍TwoInputStreamOperator算子的具体实现。从CoStreamMap算子定义中可以看出,CoStreamMap继承AbstractUdfStreamOperator的同时,实现了TwoInputStreamOperator接口。其中在processElement1()和processElement2()两个方法的实现中,分别调用了用户定义的CoMapFunction的map1()和map2()方法对输入的数据元素Input1和Input2进行处理。经过函数处理后的结果会通过output.collect()接口推送到下游的Operator中。
代码清单2-11 CoStreamMap Class定义和实现
public class CoStreamMap<IN1, IN2, OUT> extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>> implements TwoInputStreamOperator<IN1, IN2, OUT> { private static final long serialVersionUID = 1L; public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) { super(mapper); } @Override public void processElement1(StreamRecord<IN1> element) throws Exception { output.collect(element.replace(userFunction.map1(element.getValue()))); } @Override public void processElement2(StreamRecord<IN2> element) throws Exception { output.collect(element.replace(userFunction.map2(element.getValue()))); } }