时间戳和Watermark生成html
本文翻译自Generating Timestamp / Watermarksapache
------------------------------------------------------------------api
本文是Flink在使用事件时间(Event Time)时相关内容,有关事件时间、处理时间和提取时间的介绍,请见event time introduction。app
流程序须要设置时间特征为Event time,才能在程序中使用事件时间。ide
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);性能
为了使事件时间能够正常使用,Flink须要知道时间的时间戳,即流中的每一个element都须要被赋予它本身的时间戳。Flink一般会从element的一些域访问/提取时间戳(That happens usually by accessing/extracting the timestamp from some field in the element.)。spa
时间戳的赋值一般与Watermark的生成紧密相关,其中Watermark生成负责通知系统事件时间的增加状况。翻译
时间戳赋值和Watermark生成的方式有两种:orm
1. 直接在数据流源处进行htm
2. 经过一个Timestamp assigner / watermark generator:在Flink中,Timestamp assigner一样会定义watermark的发送行为
注意:时间戳和Watermark都是使用从Java历元(epoch) “1970-01-01 T00.00.00Z”开始的毫秒数定义的
流的源能够在它们生产的element中直接赋值时间戳以及发送Watermark。在此状况下,咱们不须要Timestamp Assigner。
要在Source方法中向element直接赋值时间戳,Source方法必须在SourceContext上调用方法collectWithTimestamp(…)。要在Source中生成Watermark,Source必须调用emitWatermark(Watermark)方法。
在下例的(非检查点的)Source方法中,方法直接向element赋值时间戳,而且根据特殊事件生成Watermark:
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
注意:若是流程序在已经拥有时间戳的流上继续使用TimestampAssigner,流中element的原有时间戳将被TimestampAssigner重写。相似地,Watermark也会一样被重写。
Timestamp Assigner接收一个流而且产生一个带有时间戳赋值element和Watermark的新的流。若是原有的流已经拥有了时间戳或Watermark,则Timestamp Assigner将会重写它们。
一般在紧接着数据源以后会定义Timestamp Assigner,但这并非严格要求的。例如在通用的模式中,会在Timestamp Assigner以前进行parse(MapFunction)和filter(FilterFunction)操做。不论在什么状况下,Timestamp Assigner都须要在第一个使用事件时间的Operation(如第一个窗口Operation)以前定义。而在流Job中使用Kafka做为数据源是一个特殊状况,Flink容许在数据源(或数据消费者consumer)内部定义Timestamp Assigner和Watermark emitter,更多相关信息请见Kafka Connector documentation。
注意:本节余下内容呈现了一个开发者建立本身的Timestamp Assigner 和 watermark emitter所须要实现的主要接口。有关Flink自带的预先实现的extractor,请见Pre-defined Timestamp Extractors / Watermark Emitters
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
周期性Watermark
AssignerWithPeriodicWatermark赋值时间戳并周期性生成(生成方式有多是依靠流的element,或者纯粹基于处理时间)。
生成Watermark的时间周期区间(每n毫秒)的大小能够经过ExecutionConfig.setAutoWatermarkInterval(…)设置。每一次生成时,都将会调用Assigner的getCurrentWatermark()方法,若是返回的Watermark是非null且大于前一个Watermark,则会发送一个新的Watermark。
下面是两个生成周期性Watermark和Timestamp Assigner的例子
/**
* This generator generates watermarks assuming that elements come out of order to a certain degree only.
* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
* elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a certain amount.
* It assumes that elements arrive in Flink after at most a certain time.
*/
public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current time minus the maximum time lag
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
}
带标点(punctuated)Watermark
为了在某事件下就产生Watermark,咱们须要使用AssignerWithPunctuatedWatermarks。在该类中,Flink会先调用extractTimestamp(…)方法来给element赋值一个时间戳,而后针对该element即刻调用checkAndGetNextWatermark(…)方法来返回一个非null的Watermark。
checkAndGetNextWatermark(…)方法将得到在extractTimestamp(…)方法中得到的时间戳,并决定是否生成Watermark。一旦checkAndGetNextWatermark(…)方法返回一个非null的Watermark,而且该Watermark大于最近的上一个Watermark,则发送该新的Watermark。
public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
return element.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}
}
注意:在每一个事件上都生成一个Watermark是可能存在的,可是因为每一个Watermark都会致使下游的计算开销,过多的Watermark会下降程序的性能