本文翻译自flink官网:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.htmlhtml
Assigning Timestamps程序员
Source Functions with Timestamps and Watermarksapache
Timestamp Assigners / Watermark Generatorside
Timestamps per Kafka Partition性能
本章对应执行在事件时间的程序。事件事件、处理时间、摄取时间(收到数据的时间)的介绍能够跳转到 introduction to event time 。spa
要处理事件时间,流程序须要相应地设置时间特性。翻译
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
为了处理事件时间,Flink须要知道事件的时间戳,这意味着流中的每一个元素都须要分配其事件时间戳。这一般经过从元素中的某个字段访问/提取时间戳来完成。code
为了处理事件时间,Flink须要知道事件的时间戳,这意味着流中的每一个元素都须要指定其事件时间戳(用事件的时间戳来指定)。这一般经过从元素中的某个字段访问/提取时间戳来完成。orm
时间戳分配与生成水印密切相关,水印告诉系统事件时间的进展。htm
有两种方法能够分配时间戳并生成水印:
直接在数据流源中
经过时间戳分配器/水印生成器:在Flink中,时间戳分配器还定义要发出的水印
注意:两周时间戳和水印都指定为毫秒,而且自Java纪元 1970-01-01 00:00:00 开始
stream source 能够直接为它们生成的元素分配时间戳,它们也能够发出水印。完成此操做后,不须要时间戳分配器。请注意,若是使用时间戳分配器,则将覆盖source提供的任什么时候间戳和水印。
要直接为source中的元素分配时间戳,source必须在SourceContext上使用collectWithTimestamp(...)方法。要生成水印,source必须调用emitWatermark(水印)功能。
下面是一个简单的(没有checkpoint)源代码示例,用于分配时间戳并生成水印:
override def run(ctx: SourceContext[MyType]): Unit = { while (/* condition */) { val next: MyType = getNext() ctx.collectWithTimestamp(next, next.eventTimestamp) if (next.hasWatermarkTime) { ctx.emitWatermark(new Watermark(next.getWatermarkTime)) } } }
时间戳分配器获取流并生成带有带时间戳元素和水印的新流。若是原始流已经有时间戳和/或水印,则时间戳分配器会覆盖它们。
时间戳分配器一般在数据源以后当即指定,但并不是严格要求这样作。例如,常见的模式是在时间戳分配器以前解析(MapFunction)和过滤(FilterFunction)。在任何状况下,须要在事件时间的第一个操做以前指定时间戳分配器(例如第一个窗口操做)。做为一种特殊状况,当使用Kafka做为流式传输做业的source时,Flink容许在source(或消费者)内指定时间戳分配器/水印发射器。有关如何执行此操做的更多信息,请参阅Kafka Connector文档。
注意:本节的其他部分介绍了程序员必须实现的主要接口,以便建立本身的时间戳提取器/水印发射器。要查看Flink附带的预先实现的提取器,请参阅预约义的时间戳提取器/水印发射器页面。
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream: DataStream[MyEvent] = env.readFile( myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, FilePathFilter.createDefaultFilter()) val withTimestampsAndWatermarks: DataStream[MyEvent] = stream .filter( _.severity == WARNING ) .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks()) withTimestampsAndWatermarks .keyBy( _.getGroup ) .timeWindow(Time.seconds(10)) .reduce( (a, b) => a.add(b) ) .addSink(...)
AssignerWithPeriodicWatermarks按期分配时间戳并生成水印(可能取决于流元素,或纯粹基于处理时间)。
生成水印的间隔(每n毫秒)由ExecutionConfig.setAutoWatermarkInterval(...)定义。每次调用分配器的getCurrentWatermark()方法,若是返回的水印非空且大于前一个水印,则会发出新的水印。
这里咱们展现了两个使用周期性水印生成的时间戳分配器的简单示例。请注意,Flink附带了一个BoundedOutOfOrdernessTimestampExtractor,相似于下面显示的BoundedOutOfOrdernessGenerator,您能够在这里阅读。
/** * This generator generates watermarks assuming that elements arrive out of order, * but only to a certain degree. The latest elements for a certain timestamp t will arrive * at most n milliseconds after the earliest elements for timestamp t. */ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { val maxOutOfOrderness = 3500L // 3.5 seconds var currentMaxTimestamp: Long = _ override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { val timestamp = element.getCreationTime() currentMaxTimestamp = max(timestamp, currentMaxTimestamp) timestamp } override def getCurrentWatermark(): Watermark = { // return the watermark as current highest timestamp minus the out-of-orderness bound new Watermark(currentMaxTimestamp - maxOutOfOrderness) } } /** * This generator generates watermarks that are lagging behind processing time by a fixed amount. * It assumes that elements arrive in Flink after a bounded delay. */ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { val maxTimeLag = 5000L // 5 seconds override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { element.getCreationTime } override def getCurrentWatermark(): Watermark = { // return the watermark as current time minus the maximum time lag new Watermark(System.currentTimeMillis() - maxTimeLag) } }
要每一个时间均可能生成新水印时生成水印(To generate watermarks whenever a certain event indicates that a new watermark might be generated,注:只会生成大于上一次水印的新水印),请使用AssignerWithPunctuatedWatermarks。对于此类,Flink将首先调用extractTimestamp(...)方法为元素分配时间戳,而后当即调用该元素上的checkAndGetNextWatermark(...)方法。
checkAndGetNextWatermark(...)方法传递在extractTimestamp(...)方法中分配的时间戳,并能够决定是否要生成水印。每当checkAndGetNextWatermark(...)方法返回非空水印,而且该水印大于最新的先前水印时,将发出该新水印。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] { override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { element.getCreationTime } override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = { if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null } }
注意:能够在每一个事件上生成水印。然而,由于每一个水印在下游引发一些计算,因此过多的水印会下降性能。
Timestamps per Kafka Partition
当使用Apache Kafka做为数据源时,每一个Kafka分区可能具备简单的事件时间模式(升序时间戳或有界无序)。可是,当从Kafka消费流时,多个分区一般并行消耗,交错来自分区的事件并破坏每一个分区模式(这是Kafka的消费者客户端工做的固有方式)。
在这种状况下,您可使用Flink的Kafka分区感知水印生成。使用该功能,根据Kafka分区在Kafka消费者内部生成水印,而且每一个分区水印的合并方式与在流shuffle上合并水印的方式相同。
例如,若是事件时间戳严格按每一个Kafka分区升序,则使用升序时间戳水印生成器生成每分区水印将产生完美的总体水印。
下图显示了如何使用per-Kafka分区水印生成,以及在这种状况下水印如何经过流数据流传播。
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props) kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] { def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp }) val stream: DataStream[MyType] = env.addSource(kafkaSource)
欢迎关注Flink菜鸟公众号,会不按期更新Flink(开发技术)相关的推文