生成Timestamp和Watermark 的三个重载方法介绍可参见上一篇博客: Flink assignAscendingTimestamps 生成水印的三个重载方法html
以前想研究下Flink是怎么处理乱序的数据,看了相关的源码,加上测试,发现获得了与预期彻底不相同的结果。java
预期是:乱序到达的数据,flink能够基于数据的事件时间,自动整理数据,依次计算输出ide
结果是:在assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]指派timestamp和watermark的状况下,乱序到达的数据:迟到的数据直接从侧边输出了,超前的数据直接结束当前的窗口,开启超前数据对应的窗口,后面到达的正常数据,直接做为迟到数据处理了函数
在获得上面的结果的过程当中,仔细的研究了一下生产Timestamp和Watermark相关的源码。post
Flink DataStream API 目前只能经过 assignTimestampsAndWatermarks方法建立时间戳和水印有两种生成模式:
性能
一、基于事件时间建立每一个事件的Timestamp 和 基于事件时间周期性的建立Watermark(默认周期为200ms)测试
二、基于事件时间建立每一个事件的Timestamp 和 基于事件时间每一个事件都建立一个Watermark(若是新的Watermark大于当前的Watermark,才会发出)this
事件时间下,事件的Timestamp的建立都是直接依赖于事件携带的事件时间,而Watermark则是基于事件时间生成Watermark,因此有周期性建立Watermark和标记的Watermark(With Punctuated Watermarks)的区分(官网中基于Kafka 的分区时间做为Watermark 也是周期性的生成Watermark,只不过传入的事件时间改成事件在kafka中的timestamp了) url
一、周期性的建立Watermarkspa
周期性的建立Watermark的有两种方法(kafka的分区时间的忽略):
assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]
assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]
1.1 assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] 对应源码
调用方法以下:
.assignAscendingTimestamps(element => { // 方便打断点debug println("xxxxxx : " + element.createTime) sdf.parse(element.createTime).getTime })
周期性的建立Watermark 是在 TimestampsAndPeriodicWatermarksOperator 中生成、发出,对应的时间来源是调用不一样的生成timestamp 和 Watermark 的实现类
TimestampsAndPeriodicWatermarksOperator 相应代码以下:
/* 处理事件元素: 获取对应的事件时间的时间戳,替换事件默认的时间戳(若是数据源是kafka,时间戳就是数据在kafka中的时间戳) */ @Override public void processElement(StreamRecord<T> element) throws Exception { final long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); output.collect(element.replace(element.getValue(), newTimestamp)); } /* 处理时间(Watermark) : 获取当前时间对应的上一次的事件时间,生成新的watermark,新的watermark的时间戳大于当前的watermark,就发出新的watermark */ @Override public void onProcessingTime(long timestamp) throws Exception { // 从这里能够看到,每200ms 打印一次 System.out.println("timestamp : " + timestamp + ", system.current : " + System.currentTimeMillis()); // register next timer Watermark newWatermark = userFunction.getCurrentWatermark(); if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); // emit watermark output.emitWatermark(newWatermark); } long now = getProcessingTimeService().getCurrentProcessingTime();
// 注册timer ,周期性的调用,下面会展开 getProcessingTimeService().registerTimer(now + watermarkInterval, this); }
在这种生成timestamp 和 Watermark 的状况下,userFunction 对应的类是:AscendingTimestampExtractor
对应源码以下:
@Override public final long extractTimestamp(T element, long elementPrevTimestamp) { // 调用 assignAscendingTimestamps 的参数函数 final long newTimestamp = extractAscendingTimestamp(element); if (newTimestamp >= this.currentTimestamp) {
// 这是为了下面生成Watermark的方法,总能获得 大于等于 当前Watermark的 时间戳 this.currentTimestamp = newTimestamp; return newTimestamp; } else { violationHandler.handleViolation(newTimestamp, this.currentTimestamp); return newTimestamp; } } @Override public final Watermark getCurrentWatermark() { return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1); }
timestamp的生成: TimestampsAndPeriodicWatermarksOperator#processElement 方法,调用AscendingTimestampExtractor#extractTimestamp 再调用 用户代码中具体生成timestamp 的方法,最终生成事件对应的timestamp,替换原有的timestamp
Watermark的生成:TimestampsAndPeriodicWatermarksOperator#onProcessingTime 方法,调用 AscendingTimestampExtractor#getCurrentWatermark, 返回生成timestamp 时的 currentTimestamp -1 ,生成 Watermark,若是生成的Watermark的timestamp 大于当前的 Watermark的timestamp 就发出新的Watermark
1.2 assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]
调用方法以下:
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) { override def extractTimestamp(element: LateDataEvent): Long = { println("current timestamp : " + sdf.parse(element.createTime).getTime) sdf.parse(element.createTime).getTime } })
在这种生成timestamp 和 Watermark 的状况下,userFunction 对应的类是:BoundedOutOfOrdernessTimestampExtractor
对应源码
@Override public final Watermark getCurrentWatermark() { // this guarantees that the watermark never goes backwards. long potentialWM = currentMaxTimestamp - maxOutOfOrderness; if (potentialWM >= lastEmittedWatermark) { lastEmittedWatermark = potentialWM; } return new Watermark(lastEmittedWatermark); } @Override public final long extractTimestamp(T element, long previousElementTimestamp) { long timestamp = extractTimestamp(element); if (timestamp > currentMaxTimestamp) {
// 这是为了上面上次Watermark的方法总能获取到 大于等于 当前Watermark的时间戳 currentMaxTimestamp = timestamp; } return timestamp; }
基本上与上面相同,只是这种状况下,生成Watermark会 减去相应的 maxOutOfOrderness (容许延迟时间,就是代码中BoundedOutOfOrdernessTimestampExtractor对应的参数)
之因此说是周期性的,是由于生成Watermark的方法是周期性调用的:
// 注册timer 按期执行
getProcessingTimeService().registerTimer(now + watermarkInterval, this); // 对应 watermarkInterval 来自与系统配置 watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); // 对应配置, 默认 200ms env.getConfig.setAutoWatermarkInterval(400)
看代码可知,生成timestamp和Watermark是两条线,timestamp 是每一个事件消息都会生成,而Watermark 是周期的
二、标记的Watermark(With Punctuated Watermarks)
这种Watermark的生成只有一种,对应代码以下:
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() { // check extractTimestamp emitted watermark is non-null and large than previously 生成当前事件的Watermark override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp) } // generate next watermark 生成当前事件的timestamp override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = { val eventTime = sdf.parse(element.createTime).getTime eventTime } })
对应上面生成添加时间戳到事件中和发出Watermark 在 TimestampsAndPunctuatedWatermarksOperator中具体以下:
@Override public void processElement(StreamRecord<T> element) throws Exception { final T value = element.getValue(); // extractTimestamp 方法就是assignTimestampsAndWatermarks 中的 extractTimestamp 生成事件的时间戳 final long newTimestamp = userFunction.extractTimestamp(value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); output.collect(element.replace(element.getValue(), newTimestamp)); // checkAndGetNextWatermark 方法就是assignTimestampsAndWatermarks 中的 checkAndGetNextWatermark,检查Watermark final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
// 新的Watermark大于当前的Watermark才会发出 if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) { currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark); } }
这里能够看出,每条数据都会生成 tiemstamp 和 Watermark(不必定会发出,若是数据都是正常的,Watermark的消息会和事件的消息同样多,因此会影响性能)
搞定。
欢迎关注Flink菜鸟公众号,会不按期更新Flink(开发技术)相关的推文