对于Flink来讲,Watermark是个很难绕过去的概念,有的翻译为水位线,有的翻译为水印,都是同一个东西,网络
watermark是一种衡量Event Time进展的机制,它是数据自己的一个隐藏属性。一般基于Event Time的数据,自身都包含一个timestamp.watermark是用于处理乱序事件的,而正确的处理乱序事件,一般用watermark机制结合window来实现。ide
流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分状况下,流到operator的数据都是按照事件产生的时间顺序来的,可是也不排除因为网络、背压等缘由,致使乱序的产生(out-of-order或者说late element)。翻译
可是对于late element,咱们又不能无限期的等下去,必需要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。code
以当前最大的时间戳,减去固定的延迟做为watermark的时间戳,
也是就是每条数据出来都会算下,全部数据里面最大的那个值,再减去固定的延迟blog
我的理解不必定对: 以这个为例:watermark为2,当看到5的时候,watermark是5-2=3,假设桶设置的4,那么0-4的桶就不会关闭,继续往下走
下一个数字3,发现最大数仍是5,那5-2=3,0-4的桶仍是不动,当走到6的时候,最大是6,那6-2=4,这时候0-4的桶就会关掉了,以此类推
watermark是一条特殊的数据记录
watermark必须单调递增,以确保任务的事件时间时钟在向前推动,而不是在后退
watermark与数据的时间戳相关排序
上游向下游传递的时候会把watermark广播出去,
下游可能会接收到多个上游的watermark数据,会在内部创建一个分区watermark,以最小的数据做为最终的watermark
好比上游有3个数据源,输出的watermark分别为4,3,5 那么在下游会把3个数据所有接收到,最终输出最小的为本身的watermark也就是3事件
下面这个例子,有4个上游数据,watermark分为是4,7,6,6,分区数据watermark数据是2,4,3,6
第一张图: 当上游第1,2,3个数据都没来的时候,全部分区数据最小的是2,因此输出当前事件时间时间为2
第二张图:上游第1个数据来了,也就是4数据把原来的2覆盖了,这时候数据变成了4,4,3,6 最小的数据变成3,因此输出当前事件时间为3
第三张图:上游第2个数据7来了,也就是7把原来的4覆盖了,这时候数据变成了4,7,3,6 最小的数据仍是3,因此输出当前事件时间仍是3
第四张图:上游第3个数据6来了,也就是6把3给覆盖了,这时候数据变成了4,7,6,6 最小的数据变成了4,因此输出当前事件时间是4element
Event Time的使用必定要指定数据源中的时间戳
调用assignTimestampAndWatermarks方法,传入一个BoundedOutOfOrdernessTimestampExtractor,就能够指定watermark开发
//先转换成样例类类型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割数据,获取结果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是由于默认分割后是字符串类别 }) // .assignAscendingTimestamps(_.timestamp ) //这种是当时间确定是按照时间排序的,没有乱序的状况,升序提取时间戳(若是数据中timestamp为秒,能够*1000L转为毫秒) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReadingTest5](Time.seconds(3)) { // 指定乱序最大等3 override def extractTimestamp(t: SensorReadingTest5): Long = t.timestamp * 1000L //指定watermark的字段 })
在Flink中,watermark由应用开发人员生成,这一般须要对对应的领域有必定的了解
若是watermark设置的延迟过久,收到结果的速度可能就会很慢,解决办法是在水位线到达以前输出一个近似结果
而若是watermark到达得太早,则可能收到错误结果,不过flink处理迟到数据的机制能够解决这个问题字符串
AssignerWithPeriodicWatermarks周期性地分配timestamp和生成watermark(可能依赖于元素或者纯粹基于处理时间)。
watermark产生的事件间隔(每n毫秒)是经过ExecutionConfig.setAutoWatermarkInterval(...)来定义的,每当分配器的getCurrentWatermark()方法呗调用时,若是返回的watermark是非空而且大于上一个watermark的话,一个新的watermark将会被发射。
间断式的生成watermark。和周期性水印不同,这种方式不是固定时间的,而是能够根据须要对每条数据进行筛选和处理
对于乱序数据处理,flink提供3重保障
一、watermark: 能够设置小一点hold住大部分状况,提供近似正确的结果
二、.allowedLateness(Time.minutes(1)) //容许处理迟到数据1分钟
三、.sideOutputLateData(new OutputTag(String, Double, Long)) //侧输出流,先输出到一个旁路,打上标签,保证数据不会丢