Flink设计与实现:核心原理与源码解析
上QQ阅读APP看书,第一时间看更新

2.3.1 RichFunction详解

RichFunction接口实际上对Function进行了补充和拓展,提供了控制函数生命周期的open()和close()方法,所有实现了RichFunction的子类都能够获取RuntimeContext对象。而RuntimeContext包含了算子执行过程中所有运行时的上下文信息,例如Accumulator、BroadcastVariable和DistributedCache等变量。

1.RuntimeContext上下文

如图2-8所示,RuntimeContext接口定义了非常丰富的方法,例如创建和获取Accumulator、BroadcastVariable变量的方法以及在状态操作过程中使用到的getState()和getListState()等方法。

图2-8 RuntimeContext UML关系图

不同类型的Operator创建的RuntimeContext也有一定区别,因此在Flink中提供了不同的RuntimeContext实现类,以满足不同Operator对运行时上下文信息的获取。其中AbstractRuntimeUDFContext主要用于获取提供UDF函数的相关运行时上下文信息,且AbstractRuntimeUDFContext又分别被RuntimeUDFContext、DistributedRuntimeUDFContext以及StreamingRuntimeContext三个子类继承和实现。RuntimeUDFContext主要用于CollectionExecutor;DistributedRuntimeUDFContext则主要用于BatchTask、DataSinkTask以及DataSourceTask等离线场景。流式数据处理中使用最多的是StreamingRuntimeContext。

当然还有其他场景使用到的RuntimeContext实现类,例如CepRuntimeContext、SavepointRuntimeContext以及IterationRuntimeContext,这些RuntimeContext实现类主要服务于相应类型的数据处理场景,在这里我们就不再详细展开介绍,有兴趣的读者可以参考相关代码实现。

2.自定义RichMapFunction实例

以下我们通过自定义实现了一个RichMapFunction接口的实例,借此了解RichFunction的主要功能和作用。如代码清单2-14所示,在CustomMapper.open()方法中,首先调用getRuntimeContext()方法获取RuntimeContext,这里的RuntimeContext实际上就是前面提到的StreamingRuntimeContext对象。接下来使用RuntimeContext提供的接口方法获取运行时上下文信息。例如获取MetricGroup创建Counter指标累加器以及调用getState()方法创建ValueState。最后创建好的Metric和ValueState都可以应用在map()转换操作中。

从这里我们可以看出,正因为有了RuntimeContext的设计和实现,使得Function接口实现类可以获取运行时执行过程中的上下文信息,从而实现了更加复杂的统计运算。

代码清单2-14 自定义RichMapFunction实现


public class CustomMapper extends RichMapFunction<String, String> {
   private transient Counter counter;
   private ValueState<Long> state;
   @Override
   public void open(Configuration config) {
      this.counter = getRuntimeContext()
         .getMetricGroup()
         .counter("myCounter");
      state = getRuntimeContext().getState(
         new ValueStateDescriptor<Long>("count", LongSerializer.INSTANCE, 0L));
   }
   @Override
   public String map(String value) throws Exception {
      this.counter.inc();
      long count = state.value() + 1;
         state.update(count);
      return value;
   }
}