在流式数据中,数据是连续的,一般是无限的,对流中的全部元素进行计数是不可能的,因此在流上的聚合须要由window来划定范围,例如过去五分钟内用户浏览量的计算或者最后100个元素的和。window就是一种能够把无限数据切割为有限数据块的手段。
窗口能够由时间或者数量来作区分
1.根据时间进行截取,好比每10分钟统计一次,即时间驱动的[Time Window]
2.根据消息数量进行统计,好比每100个数据统计一次,即数据驱动[Count Window]windows
时间窗口又分为滚动窗口,滑动窗口,和会话窗口
(1)滚动窗口-tumbling windows
时间对齐,窗口长度固定,没有重叠
如图:以固定的长度进行分割,好比一分钟的内的计数网络
开窗方法:session
//滚动窗口 stream.keyBy(0) .window(TumblingEventTimeWindows.of(Time.seconds(2))) .sum(1) .print();
(2)滑动窗口-sliding windows
时间对齐,窗口长度固定,有重叠,展示的是数据的变化趋势
如图:窗口大小为4,步长为2,每隔两秒统计仅4s的数据ide
开窗方法:oop
//滑动窗口 stream.keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(4))) .sum(1) .print();
(3)会话窗口-session windows
当流中达到多长时间没有新的数据到来,上一个会话窗口就是截至到新数据到来前接收到的最后一条数据,当新数据到来后,上一个窗口将会关闭,开启一个新的窗口。线程
开窗方法:翻译
stream .keyBy(0) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) .sum(1) .print();
(4)没有窗口(全局窗口)-global windows
global window + trigger 一块儿配合才能使用日志
// 单词每出现三次统计一次 stream .keyBy(0) .window(GlobalWindows.create()) //若是不加这个程序是启动不起来的 .trigger(CountTrigger.of(3)) .sum(1) .print(); 执行结果: hello,3 hello,6 hello,9
总结:效果跟CountWindow(3)很像,但又有点不像,由于若是是CountWindow(3),单词每次出现的都是3次,不会包含以前的次数,而咱们刚刚的这个每次都包含了以前的次数。code
针对stream数据中的时间,能够分为如下三种:
Event Time:事件产生的时间,它一般由事件中的时间戳描述。
Ingestion time:事件进入Flink的时间
Processing Time:事件被处理时当前系统的时间blog
案例演示:
原始日志以下
2019-11-11 10:00:01,134 INFO executor.Executor: Finished task in state 0.0
这条数据进入Flink的时间是2019-11-11 20:00:00,102
到达window处理的时间为2019-11-11 20:00:01,100
2019-11-11 10:00:01,134 是Event time
2019-11-11 20:00:00,102 是Ingestion time
2019-11-11 20:00:01,100 是Processing time
思考:
若是咱们想要统计每分钟内接口调用失败的错误日志个数,使用哪一个时间才有意义?
需求:每隔5秒计算最近10秒的单词出现的次数
自定义source,模拟:第 13 秒的时候连续发送 2 个事件,第 16 秒的时候再发送 1 个事件
输出结果:
开始发送事件的时间:16:16:40 (hadoop,2) (hadoop,3) (hadoop,1)
窗口验证过程:
需求:每隔5秒计算最近10秒的单词出现的次数
自定义source,模拟:第 13 秒的时候连续发送 2 个事件,第 16 秒的时候再发送 1 个事件
可是这里在13秒的时候正常发送了一个事件,有一个事件因为网络等其余缘由,没有成功发送,在19秒的时候才发送出去。
输出结果:
开始发送事件的时间:16:18:50 (hadoop,1) (1573287543001,1) (1573287543001,1) (hadoop,3) (1573287546016,1) (1573287543016,1) (1573287546016,1) (hadoop,2) (1573287543016,1)
使用Event Time处理
在源数据流中map获取时间时间 .assignTimestampsAndWatermarks(new EventTimeExtractor() ) EventTimeExtractor()实现AssignerWithPeriodicWatermarks接口,获取事件时间 //拿到第一个事件的 Event Time @Override public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) { return element.f1; }
执行结果:
start send event time:16:40:50
(hadoop,1)
(hadoop,3)
(hadoop,1)
如今咱们第三个window的结果已经计算准确了,可是咱们仍是没有完全的解决问题(黄色事件应该在第一个窗口中计数,可是没有)。接下来就须要咱们使用WaterMark机制来解决了。
核心实现是在窗口触发的时候延时一段时间 //window的触发时间 @Nullable @Override public Watermark getCurrentWatermark() { //window延迟5秒触发 return new Watermark(System.currentTimeMillis() - 5000); }
执行结果:
start send event time:16:54:30
(hadoop,2)
(hadoop,3)
(hadoop,1)
使用eventTime的时候如何处理乱序数据?
在使用eventTime做为处理时间的时候须要考虑乱序时间。
咱们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分状况下,流到operator的数据都是按照事件产生的时间顺序来的,可是也不排除因为网络延迟等缘由,致使乱序的产生,特别是使用kafka的话,多个分区的数据没法保证有序(单个分区是保证有序的)。因此在进行window计算的时候,咱们又不能无限期的等下去,必需要有个机制来保证一个特定的时间后,必须触发window去进行计算了。
这个特别的机制,就是watermark,watermark是用于处理乱序事件的。watermark能够
翻译为水位线
需求:获得并打印每隔 3 秒钟统计前 3 秒内的相同的 key 的全部的事件
要求:按事件时间开窗并处理乱序问题
思路:
设置flink的时间处理机制为enventTime,
事件流的事件进入后更新最新的事件时间,
最新的事件时间减掉容许最大的乱序时间为水印时间,
当水印时间大于等于窗口时间时,计算当前窗口数据
核心思想:事件流时间推进窗口的移动和计算
演示数据:
-- window 计算触发的条件 一条一条输入: 000001,1461756862000 000001,1461756866000 000001,1461756872000 000001,1461756873000 000001,1461756874000 000001,1461756876000 000001,1461756877000 输出结果: event = (000001,1461756862000)|Event Time:2016-04-27 19:34:22|Max Event Time:2016-04-27 19:34:22|Current Watermark:2016-04-27 19:34:12 water mark... event = (000001,1461756866000)|Event Time:2016-04-27 19:34:26|Max Event Time:2016-04-27 19:34:26|Current Watermark:2016-04-27 19:34:16 water mark... event = (000001,1461756872000)|Event Time:2016-04-27 19:34:32|Max Event Time:2016-04-27 19:34:32|Current Watermark:2016-04-27 19:34:22 water mark... event = (000001,1461756873000)|Event Time:2016-04-27 19:34:33|Max Event Time:2016-04-27 19:34:33|Current Watermark:2016-04-27 19:34:23 water mark... event = (000001,1461756874000)|Event Time:2016-04-27 19:34:34|Max Event Time:2016-04-27 19:34:34|Current Watermark:2016-04-27 19:34:24 water mark... event = (000001,1461756876000)|Event Time:2016-04-27 19:34:36|Max Event Time:2016-04-27 19:34:36|Current Watermark:2016-04-27 19:34:26 water mark... process start time:2021-01-04 18:26:51 window start time:2016-04-27 19:34:21 [(000001,1461756862000)|2016-04-27 19:34:22] window end time:2016-04-27 19:34:24 water mark... event = (000001,1461756877000)|Event Time:2016-04-27 19:34:37|Max Event Time:2016-04-27 19:34:37|Current Watermark:2016-04-27 19:34:27 water mark... process start time:2021-01-04 18:26:52 window start time:2016-04-27 19:34:24 [(000001,1461756866000)|2016-04-27 19:34:26] window end time:2016-04-27 19:34:27 当2016-04-27 19:34:37的事件时间更新为最大的currentMaxEventTime,此时得到的时间水印是2016-04-27 19:34:27,触发窗口[2016-04-27 19:34:24—2016-04-27 19:34:27)的计算 再输入: 000001,1461756885000 000001,1461756892000 输出: event = (000001,1461756885000)|Event Time:2016-04-27 19:34:45|Max Event Time:2016-04-27 19:34:45|Current Watermark:2016-04-27 19:34:35 water mark... process start time:2021-01-04 18:27:05 window start time:2016-04-27 19:34:30 [(000001,1461756872000)|2016-04-27 19:34:32] window end time:2016-04-27 19:34:33 water mark... event = (000001,1461756892000)|Event Time:2016-04-27 19:34:52|Max Event Time:2016-04-27 19:34:52|Current Watermark:2016-04-27 19:34:42 water mark... process start time:2021-01-04 18:30:38 window start time:2016-04-27 19:34:33 [(000001,1461756873000)|2016-04-27 19:34:33, (000001,1461756874000)|2016-04-27 19:34:34] window end time:2016-04-27 19:34:36 process start time:2021-01-04 18:30:38 window start time:2016-04-27 19:34:36 [(000001,1461756876000)|2016-04-27 19:34:36, (000001,1461756877000)|2016-04-27 19:34:37] window end time:2016-04-27 19:34:39 新的事件进入后更新了最新的时间时间,触发新的窗口计算 若是再输入: 000001,1461756862000 输出: event = (000001,1461756862000)|Event Time:2016-04-27 19:34:22|Max Event Time:2016-04-27 19:34:52|Current Watermark:2016-04-27 19:34:42 能够看到此时窗口时间已超过事件时间,会丢弃这个事件,不作处理
总结:window触发的时间
(1)丢弃
这个是默认的处理方式
(2)allowedLateness
指定容许数据延迟的时间
核心思想:在容许最大延迟的基础上再加一个容忍时间。
).assignTimestampsAndWatermarks(new EventTimeExtractor() ) .keyBy(0) .timeWindow(Time.seconds(3)) .allowedLateness(Time.seconds(2)) // 容许事件迟到 2 秒 .process(new SumProcessWindowFunction()) .print().setParallelism(1); 输入数据: 000001,1461756870000 000001,1461756883000 000001,1461756870000 000001,1461756871000 000001,1461756872000 000001,1461756884000 000001,1461756870000 000001,1461756871000 000001,1461756872000 000001,1461756885000 000001,1461756870000 000001,1461756871000 000001,1461756872000
总结:
当咱们设置容许迟到 2 秒的事件,第一次 window 触发的条件是 watermark >=
window_end_time;
第二次(或者屡次)触发的条件是 watermark < window_end_time + allowedLateness。
(3)sideOutputLateData
收集迟到的数据
输入:
000001,1461756870000 000001,1461756883000 迟到的数据 000001,1461756870000 000001,1461756871000 000001,1461756872000
一个window可能会接受到多个waterMark,咱们以最小的为准。
//把并行度设置为2 env.setParallelism(2); 输入数据: 000001,1461756870000 000001,1461756883000 000001,1461756888000 输出结果: 当前线程ID:55event = (000001,1461756883000)|19:34:43|19:34:43|19:34:33 当前线程ID:56event = (000001,1461756870000)|19:34:30|19:34:30|19:34:20 当前线程ID:56event = (000001,1461756888000)|19:34:48|19:34:48|19:34:38 处理时间:19:31:25 window start time : 19:34:30 2> [(000001,1461756870000)|19:34:30] window end time : 19:34:33
ID为56的线程有两个WaterMark:20,38那么38这个会替代20,因此ID为56的线程的WaterMark是38而后ID为55的线程的WaterMark是33,而ID为56是WaterMark是38,会在里面求一个小的值做为waterMark,就是33,这个时候会触发Window为[30-33)的窗口,那这个窗口里面就有(000001,1461756870000)这条数据。