Flink assignAscendingTimestamps 生成水印的三个重载方法

先简单介绍一下Timestamp 和Watermark 的概念: json

1. Timestamp和Watermark都是基于事件的时间字段生成的
2. Timestamp和Watermark是两个不一样的东西,而且一旦生成都跟事件数据没有关系了(全部即便事件中再也不包含生成Timestamp和Watermark的字段也不要紧)
3. 事件数据和 Timestamp 一一对应(事件在流中传递以StreamRecord对象表示,value 和 timestamp 是它的两个成员变量)
4. Watermark 在生成以后与事件数据没有直接关系,Watermark 做为一个消息,和事件数据同样在流中传递(Watermark 和StreamRecord 具备相同的父类:StreamElement)
5. Timestamp 与 Watermark 在生成以后,会在下游window算子中作比较,判断事件数据是不是过时数据
6. 只有window算子才会用Watermark判断事件数据是否过时

Flink 在流上手动生成水印有三个重载的方法(忽略过时的一个)ide

 

  1. assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]性能

此方法是数据流的快捷方式,其中已知元素时间戳在每一个并行流中单调递增。在这种状况下,系统能够经过跟踪上升时间戳自动且完美地生成水印。spa

val input = env.addSource(source)
.map(json => {
        val id = json.get("id").asText()
        val createTime = json.get("createTime").asText()
        val amt = json.get("amt").asText()
        LateDataEvent("key", id, createTime, amt)
      })
      // flink auto create timestamp & watermark
      .assignAscendingTimestamps(element => sdf.parse(element.createTime).getTime)

注:这种方法建立时间戳与水印最简单,返回一个long类型的数字就能够了3d

 

2.assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] code

基于给定的水印生成器生成水印,即便没有新元素到达也会按期检查给定水印生成器的新水印,以指定容许延迟时间
val input = env.addSource(source)
      .map(json => {
        val id = json.get("id").asText()
        val createTime = json.get("createTime").asText()
        val amt = json.get("amt").asText()
        LateDataEvent("key", id, createTime, amt)
      })
      // assign timestamp & watermarks periodically(按期生成水印)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) {
       override def extractTimestamp(element: LateDataEvent): Long = {
         println("want watermark : " + sdf.parse(element.createTime).getTime)
         sdf.parse(element.createTime).getTime
       }
     })
      

3.assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]对象

此方法仅基于流元素建立水印,对于经过[[AssignerWithPunctuatedWatermarks#extractTimestamp(Object,long)]]处理的每一个元素,
调用[[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]]方法,若是返回的水印值大于之前的
水印,会发出新的水印,
此方法能够彻底控制水印的生成,可是要注意,每秒生成数百个水印会影响性能


val input = env.addSource(source)
      .map(json => {
        val id = json.get("id").asText()
        val createTime = json.get("createTime").asText()
        val amt = json.get("amt").asText()
        LateDataEvent("key", id, createTime, amt)
      })
      // assign timestamp & watermarks every event
      .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
      // check extractTimestamp emitted watermark is non-null and large than previously
      override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
        new Watermark(extractedTimestamp)
      }
      // generate next watermark
      override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
        val eventTime = sdf.parse(element.createTime).getTime
        eventTime
      }
    })

注:本文基于所有事件时间blog

 欢迎关注Flink菜鸟公众号,会不按期更新Flink(开发技术)相关的推文
相关文章
相关标签/搜索