Flink Watermark 机制浅析(透彻)

Flink 为实时计算提供了三种时间,即事件时间(event time)、摄入时间(ingestion time)和处理时间(processing time)。在进行 window 计算时,使用摄入时间或处理时间的消息都是以系统的墙上时间(wall clocks)为标准,所以事件都是按序到达的。然而若是使用更为有意义的事件时间则会须要面对乱序事件问题(out-of-order events)和迟到事件问题(late events)。针对这两个问题,Flink 主要采用了以水位线(watermark)为核心的机制来应对。html

窗口与水位线

当基于事件时间的数据流进行窗口计算时,最为困难的一点是如何肯定对应当前窗口的事件已尽所有到达。好比须要统计最近5分钟打开音乐播放器的用户数,服务端怎么确保聚合计算时已经收集好全部用户最近5分钟的打开播放器日志?事实上不存在能百分百准确判断的方法,所以业界经常使用的方法是基于已经收集的消息来估算是否还有消息未到达,这就是水位线的思想。算法

水位线其实是一个时间戳,意义是早于该时间的消息已经彻底抵达计算引擎,即假设不会再有时间小于水位线的事件到达。这个假设是触发窗口计算的基础,只有水位线越过窗口对应的结束时间,窗口才会关闭和进行计算。apache

水位线的计算

理想状况下水位线应该与处理时间一致,而且处理时间与事件时间只相差常数时间甚至为零,这意味着消息产生后立刻被处理。然而水位线的计算老是存在必定的延迟(见图1),具体的延迟根据水位线实现的不一样而也有所差异。Flink 提供了常规的按期水位线以及定制化的标点水位线两种生成水位线的方式供用户选择。ide


Ideal Watermark versus Actual Watermark

图1.理想水位线与实际水位线spa

 

按期水位线

按期水位线(Periodic Watermark)按照固定时间间隔生成新的水位线,不论是否有新的消息抵达。水位线提高的时间间隔是由用户设置的,在两次水位线提高时隔内会有一部分消息流入,用户能够根据这部分数据来计算出新的水位线。举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。日志

应用按期水位线须要实现AssignerWithPeriodicWatermarks API,如下是 Flink 官网提供的按期水位线的实现例子。code

 

1htm

2blog

3事件

4

5

6

7

8

9

10

11

12

13

14

15

16

17

 

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

val maxOutOfOrderness = 3500L; // 3.5 seconds

var currentMaxTimestamp: Long;

override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {

val timestamp = element.getCreationTime()

currentMaxTimestamp = max(timestamp, currentMaxTimestamp)

timestamp;

}

override def getCurrentWatermark(): Watermark = {

// return the watermark as current highest timestamp minus the out-of-orderness bound

new Watermark(currentMaxTimestamp - maxOutOfOrderness);

}

}

其中extractTimestamp用于从消息中提取事件时间,而getCurrentWatermark用于生成新的水位线,新的水位线只有大于当前水位线才是有效的。每一个窗口都会有该类的一个实例,所以能够利用实例的成员变量保存状态,好比上例中的当前最大时间戳。

标点水位线

标点水位线(Punctuated Watermark)经过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于什么时候收到标记事件。

应用标点水位线须要实现AssignerWithPunctuatedWatermarks API,如下是 Flink 官网提供的标点水位线的实现例子。

 

1

2

3

4

5

6

7

8

9

10

 

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {

element.getCreationTime

}

override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {

if (element.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null

}

}

其中extractTimestamp用于从消息中提取事件时间,checkAndGetNextWatermark用于检查事件是否标点事件,如果则生成新的水位线。不一样于按期水位线定时调用getCurrentWatermark,标点水位线是每接受一个事件就须要调用checkAndGetNextWatermark,若返回值非 null 且新水位线大于当前水位线,则触发窗口计算。

迟到事件

虽然说水位线代表着早于它的事件不该该再出现,可是上如上文所讲,接收到水位线之前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和通常乱序事件不一样的是它们的乱序程度超出了水位线的预计,致使窗口在它们到达以前已经关闭。

迟到事件出现时窗口已经关闭并产出了计算结果,所以处理的方法有3种:

  1. 从新激活已经关闭的窗口并从新计算以修正结果。
  2. 将迟到事件收集起来另外处理。
  3. 将迟到事件视为错误消息并丢弃。

Flink 默认的处理方式是第3种直接丢弃,其余两种方式分别使用Side OutputAllowed Lateness

Side Output机制能够将迟到事件单独放入一个数据流分支,这会做为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

Allowed Lateness机制容许用户设置一个容许的最大迟到时长。Flink 会再窗口关闭后一直保存窗口的状态直至超过容许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口从新计算。由于保存窗口状态须要额外内存,而且若是窗口计算使用了 ProcessWindowFunction API 还可能使得每一个迟到事件触发一次窗口的全量计算,代价比较大,因此容许迟到时长不宜设得太长,迟到事件也不宜过多,不然应该考虑下降水位线提升的速度或者调整算法。

参考文献

1.Flink 官方文档 - Event Time
2.Flink – process watermark

相关文章
相关标签/搜索