2.3.3 ProcessFunction的定义与实现
在Flink API抽象栈中,最底层的是Stateful Function Process接口,代码实现对应的是ProcessFunction接口。通过实现ProcessFunction接口,能够灵活地获取底层处理数据和信息,例如状态数据的操作、定时器的注册以及事件触发周期的控制等。
根据数据元素是否进行了KeyBy操作,可以将ProcessFunction分为KeyedProcessFunction和ProcessFunction两种类型,其中KeyedProcessFunction使用相对较多,常见的实现类有TopNFunction、GroupAggFunction等函数;ProcessFunction的主要实现类是LookupJoinRunner,主要用于实现维表的关联等操作。Table API模块相关的Operator直接实现自ProcessFunction接口。
如图2-12所示,KeyedProcessFunction主要继承了AbstractRichFunction抽象类,且在内部同时创建了Context和OnTimerContext两个内部类,其中Context主要定义了从数据元素中获取Timestamp和从运行时中获取TimerService等信息的方法,另外还有用于旁路输出的output()方法。OnTimerContext则继承自Context抽象类,主要应用在KeyedProcessFunction的OnTimer()方法中。在KeyedProcessFunction中通过processElement方法读取数据元素并处理,会在processElement()方法中根据实际情况创建定时器,此时定时器会被注册到Context的TimerService定时器队列中,当满足定时器触发的时间条件后,会通过调用OnTimer()方法执行定时器中的计算逻辑,例如对状态数据的异步清理操作。
图2-12 KeyedProcessFunction UML关系图
我们通过Table API中的DeduplicateKeepLastRowFunction实例看下KeyedProcessFunction的具体实现。DeduplicateKeepLastRowFunction主要用于对接入的数据去重,并保留最新的一行记录。
如代码清单2-19所示,在DeduplicateKeepLastRowFunction.processElement()方法中定义了对输入当前Function数据元素的处理逻辑。在方法中调用TimerService获取当前的处理时间,然后基于该处理时间调用registerProcessingCleanupTimer()方法注册状态数据清理的定时器,当处理时间到达注册时间后,就会调用定时器进行数据处理。
代码清单2-19 DeduplicateKeepLastRowFunction.processElement()方法定义
public void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) throws Exception { if (generateRetraction) { long currentTime = ctx.timerService().currentProcessingTime(); // 注册状态数据清理的 Timer registerProcessingCleanupTimer(ctx, currentTime); } processLastRow(input, generateRetraction, state, out); }
在registerProcessingCleanupTimer()方法中,调用CleanupState.registerProcessingCleanupTimer()方法注册状态数据清理的定时器。如代码清单2-20所示,CleanupState.registerProcessingCleanupTimer()方法主要包含如下逻辑。
·通过cleanupTimeState状态获取最新一次清理状态的注册时间curCleanupTime。
·判断当前curCleanupTime是否为空,且currentTime+minRetentionTime总和是否大于curCleanupTime。只有满足以上两个条件才会触发注册状态数据清理的定时器,这里的minRetentionTime是用户指定的状态保留最短时间。
·如果以上条件都满足,则调用TimerService注册ProcessingTimeTimer,在满足定时器的时间条件后触发定时器。
·如果curCleanupTime不为空,即之前的TimerService还包含过期的定时器,则调用timerService.deleteProcessingTimeTimer()方法删除过期的定时器。
更新cleanupTimeState中的curCleanupTime指标。
代码清单2-20 CleanupState.registerProcessingCleanupTimer()方法定义
default void registerProcessingCleanupTimer( ValueState<Long> cleanupTimeState, long currentTime, long minRetentionTime, long maxRetentionTime, TimerService timerService) throws Exception { // 获取最新的Cleanup事件 long curCleanupTime = cleanupTimeState.value(); // 判断curCleanupTime是否为空,满足触发条件则注册定时器 if (curCleanupTime == null || (currentTime + minRetentionTime) > curCleanupTime) { // 获取最新的cleanupTime long cleanupTime = currentTime + maxRetentionTime; // 注册Timer并记录最新的清理时间 timerService.registerProcessingTimeTimer(cleanupTime); // 删除过期的定时器 if (curCleanupTime != null) { timerService.deleteProcessingTimeTimer(curCleanupTime); } cleanupTimeState.update(cleanupTime); } }
触发注册的Timer后,调用DeduplicateKeepLastRowFunction.onTimer()方法处理数据。
如代码清单2-21所示,在onTimer()方法中,主要调用cleanupState()方法对状态进行清理。逻辑相对简单,实际上就是ProcessFunction的定义和实现。当然除了对状态数据的清理,也可以通过定时器完成其他类型的定时操作,实现更加复杂的计算逻辑,定时器最终都会注册在TimerService内部的队列中。
代码清单2-21 DeduplicateKeepLastRowFunction.onTimer()方法定义
@Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<BaseRow> out) throws Exception { if (stateCleaningEnabled) { cleanupState(state); } }
除了上面介绍的,还有其他类型的ProcessFunction实现,但不管是哪种类型的实现,基本都是这些功能的组合或变体。在DataStream API中实际上基于ProcessFunction定义了很多可以直接使用的方法。虽然ProcessFunction接口更加灵活,但使用复杂度也相对较高,因此除非无法通过现成算子实现复杂的计算逻辑,通常情况下用户是不需要自定义实现ProcessFunction处理数据的。