2.4.2 TimerService时间服务
对于需要依赖时间定时器进行数据处理的算子来讲,需要借助TimerService组件实现对定时器的管理,其中定时器执行的具体处理逻辑主要通过回调函数定义。每个StreamOperator在创建和初始化的过程中,都会通过InternalTimeServiceManager创建TimerService实例,这里的InternalTimeServiceManager管理了Task内所有和时间相关的服务,并向所有Operator提供创建和获取TimerService的方法。
1.TimerService的设计与实现
我们先来看下TimerService的设计与实现,如图2-14所示,在DataStream API中提供了TimerService接口,用于获取和操作时间相关的信息。TimerService接口的默认实现有SimpleTimerService,在Flink Table API模块的AbstractProcessStreamOperator.ContextImpl内部类中也实现了TimerService接口。从图中可以看出,SimpleTimerService会将InternalTimerService接口作为内部成员变量,因此在SimpleTimerService中提供的方法基本上都是借助InternalTimerService实现的。
InternalTimerService实际上是TimerService接口的内部版本,而TimerService接口是专门供用户使用的外部接口。InternalTimerService需要按照Key和命名空间进行划分,并提供操作时间和定时器的内部方法,因此不仅是SimpleTimerService通过InternalTimerService操作和获取时间信息以及定时器,其他还有如WindowOperator、IntervalJoinOperator等内置算子也都会通过InternalTimerService提供的方法执行时间相关的操作。
图2-14 TimerService UML关系图
2.TimerService应用举例
接下来我们以KeyedProcessFunction实现类DeduplicateKeepLastRowFunction为例,详细说明在自定义函数中如何通过调用和操作TimerService服务实现时间信息的获取和定时器的注册。
如代码清单2-28所示,KeyedProcessOperator.open()方法主要包括如下逻辑。
·调用getInternalTimerService()方法创建和获取InternalTimerService实例。实际上最终调用的是AbstractStreamOperator.getInternalTimerService()方法获取InternalTimervService实例。
·基于InternalTimerService实例创建SimpleTimerService实例。
·将创建好的SimpleTimerService封装在ContextImpl和OnTimerContextImpl上下文对象中,此时KeyedProcessFunction的实现类就可以通过上下文获取SimpleTimerService实例了。
代码清单2-28 KeyedProcessOperator.open()方法定义
public void open() throws Exception { super.open(); collector = new TimestampedCollector<>(output); InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); TimerService timerService = new SimpleTimerService(internalTimerService); context = new ContextImpl(userFunction, timerService); onTimerContext = new OnTimerContextImpl(userFunction, timerService); }
DeduplicateKeepLastRowFunction继承并实现了KeyedProcessFunction接口,如代码清单2-29所示,在DeduplicateKeepLastRowFunction.processElement()方法定义中可以看出,调用Context.timerService()方法获取TimerService实现类,然后调用TimerService.currentProcessingTime()方法获取当前的处理时间,接下来调用registerProcessingCleanupTimer()方法注册状态数据清理定时器。
代码清单2-29 DeduplicateKeepLastRowFunction.processElement()方法定义
public void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) throws Exception { if (generateRetraction) { long currentTime = ctx.timerService().currentProcessingTime(); // 注册状态数据清理定时器 registerProcessingCleanupTimer(ctx, currentTime); } processLastRow(input, generateRetraction, state, out); }
对于registerProcessingCleanupTimer()方法,实际上就是调用timerService.registerProcessingTimeTimer(cleanupTime)注册基于处理时间的定时器。
系统时间到达Timer指定的时间后,TimerService会调用和触发注册的定时器,然后调用DeduplicateKeepLastRowFunction.onTimer()方法。从onTimer()方法定义中可以看出,调用cleanupState()方法完成了对指定状态数据的清理操作,如代码清单2-30所示。
代码清单2-30 DeduplicateKeepLastRowFunction.onTimer()方法定义
public void onTimer(long timestamp, OnTimerContext ctx, Collector<BaseRow> out) throws Exception { if (stateCleaningEnabled) { cleanupState(state); } }
3.InternalTimerService详解
TimerService实际上将InternalTimerService进行了封装,然后供StreamOperator中的KeyedProcessFunction调用,接下来我们看InternalTimerService的具体实现。
如图2-15所示,从InternalTimerService的UML关系图中可以看出,InternalTimerService接口实现了如下方法。
·currentProcessingTime():获取当前的处理时间。
·currentWatermark():获取当前算子基于事件时间的Watermark。
·registerProcessingTimeTimer(...):注册基于处理时间的定时器。
·deleteProcessingTimeTimer(...):删除基于处理时间的定时器。
·registerEventTimeTimer(...):注册基于事件时间的定时器。
·deleteEventTimeTimer(...):删除基于事件时间的定时器。
图2-15 InternalTimerService UML关系图
InternalTimerService接口具有InternalTimerServiceImpl的默认实现类,在InternalTimerServiceImpl中,实际上包含了两个比较重要的成员变量,分别为processingTimeService和KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>队列。其中processingTimeService是基于系统处理时间提供的TimerService,也就是说,基于ProcessingTimeService的实现类可以注册基于处理时间的定时器。TimerHeapInternalTimer队列主要分为processingTimeTimersQueue和eventTimeTimersQueue两种类型,用于存储相应类型的定时器队列。TimerHeapInternalTimer基于Heap堆内存存储定时器,并通过HeapPriorityQueueSet结构存储注册好的定时器。
在InternalTimerServiceImpl中,会记录currentWatermark信息,用于表示当前算子的最新Watermark,实际上InternalTimerServiceImpl实现了基于Watermark的时钟,此时算子会递增更新InternalTimerServiceImpl中Watermark对应的时间戳。此时InternalTimerService会判断eventTimeTimersQueue队列中是否有定时器、是否满足触发条件,如果满足则将相应的TimerHeapInternalTimer取出,并执行对应算子中的onEventTime()回调方法,此时就和ProcessFunction中的onTimer()方法联系在一起了。
这里我们以IntervalJoinOperator为例说明内部算子如何直接调用InternalTimerService注册定时器。
如代码清单2-31所示,在IntervalJoinOperator.processElement()方法中,实际上会调用internalTimerService.registerEventTimeTimer()方法注册基于事件时间的定时器,专门用于数据清理任务。随后internalTimerService会根据指定的cleanupTime完成对窗口中历史状态数据的清理。
代码清单2-31 IntervalJoinOperator.processElement方法
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp; if (isLeft) { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); } else { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); }
如代码清单2-32所示,当StreamOperator算子中的Watermark更新时,就会通过InternalTimeServiceManager通知所有的InternalTimerService实例,这里实际上就是调用InternalTimerServiceImpl.advanceWatermark()方法实现的。从advanceWatermark()方法中可以看出,首先会通过最新的时间更新currentWatermark,然后从eventTimeTimersQueue队列中获取事件时间定时器,最后判断timer.getTimestamp()是否小于接入的time变量,如果小于,则说明当前算子的时间大于定时器中设定的时间,此时就会执行triggerTarget.onEventTime (timer)方法,这里的triggerTarget实际上就是StreamOperator的具体实现类。
代码清单2-32 InternalTimerServiceImpl.advanceWatermark()方法定义
public void advanceWatermark(long time) throws Exception { currentWatermark = time; InternalTimer<K, N> timer; while ((timer = eventTimeTimersQueue.peek()) != null && timer. getTimestamp() <= time) { eventTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onEventTime(timer); } }
接下来我们看看在算子中如何通过调用AbstractStreamOperator.getInternalTimerService()方法创建和获取InternalTimerService实例。
如代码清单2-33所示,在AbstractStreamOperator.getInternalTimerService()方法中,实际上会调用InternalTimeServiceManager.getInternalTimerService()方法获取InternalTimerService实例。在一个Operator中可以同时创建多个TimerService实例,且必须具有相应的KeySerializer和NamespaceSerializer序列化类,如果不需要区分Namespace类型,也可以使用VoidNamespaceSerializer。
除了name和timerSerializer参数外,getInternalTimerService()方法还需要传递triggerable回调函数作为参数。当触发定时器时会调用Triggerable接口的onEventTime()或onProcessingTime()方法,以触发定时调度需要执行的逻辑,这里的Triggerable接口实现类实际上就是StreamOperator接口的实现类。
代码清单2-33 InternalTimeServiceManager.getInternalTimerService()方法
//获取InternalTimerService public <N> InternalTimerService<N> getInternalTimerService( String name, TimerSerializer<K, N> timerSerializer, Triggerable<K, N> triggerable) { InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService (name, timerSerializer); timerService.startTimerService( timerSerializer.getKeySerializer(), timerSerializer.getNamespaceSerializer(), triggerable); return timerService; }
如代码清单2-34所示,在getInternalTimerService()方法中实际上会调用registerOrGetTimerService()方法注册和获取InternalTimerService实例。在InternalTimeServiceManager.registerOrGetTimerService中可以看出,会事先根据名称从timerServices的HashMap获取已经注册的InternalTimerService,如果没有获取到,则实例化InternalTimerServiceImpl类,创建新的TimerService。
代码清单2-34 InternalTimeServiceManager.registerOrGetTimerService()方法定义
// 注册及获取TimerService <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) { InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) //先从timerServices中获取创建好的TimerService timerServices.get(name); // 如果没有获取到就创建新的timerService if (timerService == null) { timerService = new InternalTimerServiceImpl<>( localKeyGroupRange, keyContext, processingTimeService, createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer), createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer)); timerServices.put(name, timerService); } return timerService; }