Flink设计与实现:核心原理与源码解析
上QQ阅读APP看书,第一时间看更新

2.3.3 ProcessFunction的定义与实现

在Flink API抽象栈中,最底层的是Stateful Function Process接口,代码实现对应的是ProcessFunction接口。通过实现ProcessFunction接口,能够灵活地获取底层处理数据和信息,例如状态数据的操作、定时器的注册以及事件触发周期的控制等。

根据数据元素是否进行了KeyBy操作,可以将ProcessFunction分为KeyedProcessFunction和ProcessFunction两种类型,其中KeyedProcessFunction使用相对较多,常见的实现类有TopNFunction、GroupAggFunction等函数;ProcessFunction的主要实现类是LookupJoinRunner,主要用于实现维表的关联等操作。Table API模块相关的Operator直接实现自ProcessFunction接口。

如图2-12所示,KeyedProcessFunction主要继承了AbstractRichFunction抽象类,且在内部同时创建了Context和OnTimerContext两个内部类,其中Context主要定义了从数据元素中获取Timestamp和从运行时中获取TimerService等信息的方法,另外还有用于旁路输出的output()方法。OnTimerContext则继承自Context抽象类,主要应用在KeyedProcessFunction的OnTimer()方法中。在KeyedProcessFunction中通过processElement方法读取数据元素并处理,会在processElement()方法中根据实际情况创建定时器,此时定时器会被注册到Context的TimerService定时器队列中,当满足定时器触发的时间条件后,会通过调用OnTimer()方法执行定时器中的计算逻辑,例如对状态数据的异步清理操作。

图2-12 KeyedProcessFunction UML关系图

我们通过Table API中的DeduplicateKeepLastRowFunction实例看下KeyedProcessFunction的具体实现。DeduplicateKeepLastRowFunction主要用于对接入的数据去重,并保留最新的一行记录。

如代码清单2-19所示,在DeduplicateKeepLastRowFunction.processElement()方法中定义了对输入当前Function数据元素的处理逻辑。在方法中调用TimerService获取当前的处理时间,然后基于该处理时间调用registerProcessingCleanupTimer()方法注册状态数据清理的定时器,当处理时间到达注册时间后,就会调用定时器进行数据处理。

代码清单2-19 DeduplicateKeepLastRowFunction.processElement()方法定义


public void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) 
   throws Exception {
   if (generateRetraction) {
      long currentTime = ctx.timerService().currentProcessingTime();
      // 注册状态数据清理的 Timer
      registerProcessingCleanupTimer(ctx, currentTime);
   }
   processLastRow(input, generateRetraction, state, out);
}

在registerProcessingCleanupTimer()方法中,调用CleanupState.registerProcessingCleanupTimer()方法注册状态数据清理的定时器。如代码清单2-20所示,CleanupState.registerProcessingCleanupTimer()方法主要包含如下逻辑。

·通过cleanupTimeState状态获取最新一次清理状态的注册时间curCleanupTime。

·判断当前curCleanupTime是否为空,且currentTime+minRetentionTime总和是否大于curCleanupTime。只有满足以上两个条件才会触发注册状态数据清理的定时器,这里的minRetentionTime是用户指定的状态保留最短时间。

·如果以上条件都满足,则调用TimerService注册ProcessingTimeTimer,在满足定时器的时间条件后触发定时器。

·如果curCleanupTime不为空,即之前的TimerService还包含过期的定时器,则调用timerService.deleteProcessingTimeTimer()方法删除过期的定时器。

更新cleanupTimeState中的curCleanupTime指标。

代码清单2-20 CleanupState.registerProcessingCleanupTimer()方法定义


default void registerProcessingCleanupTimer(
      ValueState<Long> cleanupTimeState,
      long currentTime,
      long minRetentionTime,
      long maxRetentionTime,
      TimerService timerService) throws Exception {
   // 获取最新的Cleanup事件
   long curCleanupTime = cleanupTimeState.value();
   // 判断curCleanupTime是否为空,满足触发条件则注册定时器
   if (curCleanupTime == null || (currentTime + minRetentionTime) >
      curCleanupTime) {
      // 获取最新的cleanupTime 
      long cleanupTime = currentTime + maxRetentionTime;
      // 注册Timer并记录最新的清理时间
      timerService.registerProcessingTimeTimer(cleanupTime);
      // 删除过期的定时器
      if (curCleanupTime != null) {
         timerService.deleteProcessingTimeTimer(curCleanupTime);
      }
      cleanupTimeState.update(cleanupTime);
   }
}

触发注册的Timer后,调用DeduplicateKeepLastRowFunction.onTimer()方法处理数据。

如代码清单2-21所示,在onTimer()方法中,主要调用cleanupState()方法对状态进行清理。逻辑相对简单,实际上就是ProcessFunction的定义和实现。当然除了对状态数据的清理,也可以通过定时器完成其他类型的定时操作,实现更加复杂的计算逻辑,定时器最终都会注册在TimerService内部的队列中。

代码清单2-21 DeduplicateKeepLastRowFunction.onTimer()方法定义


@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<BaseRow> 
   out) throws Exception {
   if (stateCleaningEnabled) {
      cleanupState(state);
   }
}

除了上面介绍的,还有其他类型的ProcessFunction实现,但不管是哪种类型的实现,基本都是这些功能的组合或变体。在DataStream API中实际上基于ProcessFunction定义了很多可以直接使用的方法。虽然ProcessFunction接口更加灵活,但使用复杂度也相对较高,因此除非无法通过现成算子实现复杂的计算逻辑,通常情况下用户是不需要自定义实现ProcessFunction处理数据的。