2.3.1 RichFunction详解
RichFunction接口实际上对Function进行了补充和拓展,提供了控制函数生命周期的open()和close()方法,所有实现了RichFunction的子类都能够获取RuntimeContext对象。而RuntimeContext包含了算子执行过程中所有运行时的上下文信息,例如Accumulator、BroadcastVariable和DistributedCache等变量。
1.RuntimeContext上下文
如图2-8所示,RuntimeContext接口定义了非常丰富的方法,例如创建和获取Accumulator、BroadcastVariable变量的方法以及在状态操作过程中使用到的getState()和getListState()等方法。
图2-8 RuntimeContext UML关系图
不同类型的Operator创建的RuntimeContext也有一定区别,因此在Flink中提供了不同的RuntimeContext实现类,以满足不同Operator对运行时上下文信息的获取。其中AbstractRuntimeUDFContext主要用于获取提供UDF函数的相关运行时上下文信息,且AbstractRuntimeUDFContext又分别被RuntimeUDFContext、DistributedRuntimeUDFContext以及StreamingRuntimeContext三个子类继承和实现。RuntimeUDFContext主要用于CollectionExecutor;DistributedRuntimeUDFContext则主要用于BatchTask、DataSinkTask以及DataSourceTask等离线场景。流式数据处理中使用最多的是StreamingRuntimeContext。
当然还有其他场景使用到的RuntimeContext实现类,例如CepRuntimeContext、SavepointRuntimeContext以及IterationRuntimeContext,这些RuntimeContext实现类主要服务于相应类型的数据处理场景,在这里我们就不再详细展开介绍,有兴趣的读者可以参考相关代码实现。
2.自定义RichMapFunction实例
以下我们通过自定义实现了一个RichMapFunction接口的实例,借此了解RichFunction的主要功能和作用。如代码清单2-14所示,在CustomMapper.open()方法中,首先调用getRuntimeContext()方法获取RuntimeContext,这里的RuntimeContext实际上就是前面提到的StreamingRuntimeContext对象。接下来使用RuntimeContext提供的接口方法获取运行时上下文信息。例如获取MetricGroup创建Counter指标累加器以及调用getState()方法创建ValueState。最后创建好的Metric和ValueState都可以应用在map()转换操作中。
从这里我们可以看出,正因为有了RuntimeContext的设计和实现,使得Function接口实现类可以获取运行时执行过程中的上下文信息,从而实现了更加复杂的统计运算。
代码清单2-14 自定义RichMapFunction实现
public class CustomMapper extends RichMapFunction<String, String> { private transient Counter counter; private ValueState<Long> state; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .counter("myCounter"); state = getRuntimeContext().getState( new ValueStateDescriptor<Long>("count", LongSerializer.INSTANCE, 0L)); } @Override public String map(String value) throws Exception { this.counter.inc(); long count = state.value() + 1; state.update(count); return value; } }