Flink 认为 Batch 是 Streaming 的一个特例,因此 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了很是完善的窗口机制,这是我认为的 Flink 最大的亮点之一(其余的亮点包括消息乱序处理,和 checkpoint 机制)。本文咱们将介绍流式处理中的窗口概念,介绍 Flink 内建的一些窗口和 Window API,最后讨论下窗口在底层是如何实现的。html
在流处理应用中,数据是接二连三的,所以咱们不可能等到全部数据都到了才开始处理。固然咱们能够每来一个消息就处理一次,可是有时咱们须要作一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了咱们的网页。在这种状况下,咱们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。apache
窗口能够是时间驱动的(Time Window,例如:每30秒钟),也能够是数据驱动的(Count Window,例如:每一百个元素)。一种经典的窗口分类能够分红:翻滚窗口(Tumbling Window,无重叠),滚动窗口(Sliding Window,有重叠),和会话窗口(Session Window,活动间隙)。windows
咱们举个具体的场景来形象地理解不一样窗口的概念。假设,淘宝网会记录每一个用户每次购买的商品个数,咱们要作的是统计不一样窗口中用户购买商品的总数。下图给出了几种经典的窗口切分概述图:api
上图中,raw data stream 表明用户的购买行为流,圈中的数字表明该用户本次购买的商品个数,事件是按时间分布的,因此能够看出事件之间是有time gap的。Flink 提供了上图中全部的窗口类型,下面咱们会逐一进行介绍。session
就如名字所说的,Time Window 是根据时间对数据流进行分组的。这里咱们涉及到了流处理中的时间问题,时间问题和消息乱序问题是紧密关联的,这是流处理中现存的难题之一,咱们将在后续的 EventTime 和消息乱序处理中对这部分问题进行深刻探讨。这里咱们只须要知道 Flink 提出了三种时间的概念,分别是event time(事件时间:事件发生时的时间),ingestion time(摄取时间:事件进入流处理系统的时间),processing time(处理时间:消息被计算处理的时间)。Flink 中窗口机制和时间类型是彻底解耦的,也就是说当须要改变时间类型时不须要更改窗口逻辑相关的代码。并发
Tumbling Time Window
如上图,咱们须要统计每一分钟中用户购买的商品的总数,须要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window)。翻滚窗口能将数据流切分红不重叠的窗口,每个事件只能属于一个窗口。经过使用 DataStream API,咱们能够这样实现:ide
// Stream of (userId, buyCnt) val buyCnts: DataStream[(Int, Int)] = ... val tumblingCnts: DataStream[(Int, Int)] = buyCnts // key stream by userId .keyBy(0) // tumbling time window of 1 minute length .timeWindow(Time.minutes(1)) // compute sum over buyCnt .sum(1) |
Sliding Time Window
可是对于某些应用,它们须要的窗口是不间断的,须要平滑地进行窗口聚合。好比,咱们能够每30秒计算一次最近一分钟用户购买的商品总数。这种窗口咱们称为滑动时间窗口(Sliding Time Window)。在滑窗中,一个元素能够对应多个窗口。经过使用 DataStream API,咱们能够这样实现:函数
val slidingCnts: DataStream[(Int, Int)] = buyCnts .keyBy(0) // sliding time window of 1 minute length and 30 secs trigger interval .timeWindow(Time.minutes(1), Time.seconds(30)) .sum(1) |
Count Window 是根据元素个数对数据流进行分组的。源码分析
Tumbling Count Window
当咱们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口咱们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小为3个。经过使用 DataStream API,咱们能够这样实现:性能
// Stream of (userId, buyCnts) val buyCnts: DataStream[(Int, Int)] = ... val tumblingCnts: DataStream[(Int, Int)] = buyCnts // key stream by sensorId .keyBy(0) // tumbling count window of 100 elements size .countWindow(100) // compute the buyCnt sum .sum(1) |
Sliding Count Window
固然Count Window 也支持 Sliding Window,虽在上图中未描述出来,但和Sliding Time Window含义是相似的,例如计算每10个元素计算一次最近100个元素的总和,代码示例以下。
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts .keyBy(0) // sliding count window of 100 elements size and 10 elements trigger interval .countWindow(100, 10) .sum(1) |
在这种用户交互事件流中,咱们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。如上图所示,就是须要计算每一个用户在活跃期间总共购买的商品数量,若是用户30秒没有活动则视为会话断开(假设raw data stream是单个用户的购买行为流)。Session Window 的示例代码以下:
// Stream of (userId, buyCnts) val buyCnts: DataStream[(Int, Int)] = ... val sessionCnts: DataStream[(Int, Int)] = vehicleCnts .keyBy(0) // session window based on a 30 seconds session gap interval .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30))) .sum(1) |
通常而言,window 是在无限的流上定义了一个有限的元素集合。这个集合能够是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。Flink 的 DataStream API 提供了简洁的算子来知足经常使用的窗口操做,同时提供了通用的窗口机制来容许用户本身定义窗口分配逻辑。下面咱们会对 Flink 窗口相关的 API 进行剖析。
得益于 Flink Window API 松耦合设计,咱们能够很是灵活地定义符合特定业务的窗口。Flink 中定义一个窗口主要须要如下三个组件。
Window Assigner:用来决定某个元素被分配到哪一个/哪些窗口中去。
Trigger:触发器。决定了一个窗口什么时候可以被计算或清除,每一个窗口都会拥有一个本身的Trigger。
Evictor:能够译为“驱逐者”。在Trigger触发以后,在窗口被处理以前,Evictor(若是有Evictor的话)会用来剔除窗口中不须要的元素,至关于一个filter。
上述三个组件的不一样实现的不一样组合,能够定义出很是复杂的窗口。Flink 中内置的窗口也都是基于这三个组件构成的,固然内置窗口有时候没法解决用户特殊的需求,因此 Flink 也暴露了这些窗口机制的内部接口供用户实现自定义的窗口。下面咱们将基于这三者探讨窗口的实现机制。
下图描述了 Flink 的窗口机制以及各组件之间是如何相互工做的。
首先上图中的组件都位于一个算子(window operator)中,数据流源源不断地进入算子,每个到达的元素都会被交给 WindowAssigner。WindowAssigner 会决定元素被放到哪一个或哪些窗口(window),可能会建立新窗口。由于一个元素能够被放入多个窗口中,因此同时存在多个窗口是可能的。注意,Window
自己只是一个ID标识符,其内部可能存储了一些元数据,如TimeWindow
中有开始和结束时间,可是并不会存储窗口中的元素。窗口中的元素实际存储在 Key/Value State 中,key为Window
,value为元素集合(或聚合值)。为了保证窗口的容错性,该实现依赖了 Flink 的 State 机制(参见 state 文档)。
每个窗口都拥有一个属于本身的 Trigger,Trigger上会有定时器,用来决定一个窗口什么时候可以被计算或清除。每当有元素加入到该窗口,或者以前注册的定时器超时了,那么Trigger都会被调用。Trigger的返回结果能够是 continue(不作任何操做),fire(处理窗口数据),purge(移除窗口和窗口中的数据),或者 fire + purge。一个Trigger的调用结果只是fire的话,那么会计算窗口并保留窗口原样,也就是说窗口中的数据仍然保留不变,等待下次Trigger fire的时候再次执行计算。一个窗口能够被重复计算屡次知道它被 purge 了。在purge以前,窗口会一直占用着内存。
当Trigger fire了,窗口中的元素集合就会交给Evictor
(若是指定了的话)。Evictor 主要用来遍历窗口中的元素列表,并决定最早进入窗口的多少个元素须要被移除。剩余的元素会交给用户指定的函数进行窗口的计算。若是没有 Evictor 的话,窗口中的全部元素会一块儿交给函数进行计算。
计算函数收到了窗口的元素(可能通过了 Evictor 的过滤),并计算出窗口的结果值,并发送给下游。窗口的结果值能够是一个也能够是多个。DataStream API 上能够接收不一样类型的计算函数,包括预约义的sum()
,min()
,max()
,还有 ReduceFunction
,FoldFunction
,还有WindowFunction
。WindowFunction 是最通用的计算函数,其余的预约义的函数基本都是基于该函数实现的。
Flink 对于一些聚合类的窗口计算(如sum,min)作了优化,由于聚合类的计算不须要将窗口中的全部数据都保存下来,只须要保存一个result值就能够了。每一个进入窗口的元素都会执行一次聚合函数并修改result值。这样能够大大下降内存的消耗并提高性能。可是若是用户定义了 Evictor,则不会启用对聚合窗口的优化,由于 Evictor 须要遍历窗口中的全部元素,必需要将窗口中全部元素都存下来。
上述的三个组件构成了 Flink 的窗口机制。为了更清楚地描述窗口机制,以及解开一些疑惑(好比 purge 和 Evictor 的区别和用途),咱们将一步步地解释 Flink 内置的一些窗口(Time Window,Count Window,Session Window)是如何实现的。
Count Window 是使用三组件的典范,咱们能够在 KeyedStream
上建立 Count Window,其源码以下所示:
// tumbling count window public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) { return window(GlobalWindows.create()) // create window stream using GlobalWindows .trigger(PurgingTrigger.of(CountTrigger.of(size))); // trigger is window size } // sliding count window public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) { return window(GlobalWindows.create()) .evictor(CountEvictor.of(size)) // evictor is window size .trigger(CountTrigger.of(slide)); // trigger is slide size } |
第一个函数是申请翻滚计数窗口,参数为窗口大小。第二个函数是申请滑动计数窗口,参数分别为窗口大小和滑动大小。它们都是基于 GlobalWindows
这个 WindowAssigner 来建立的窗口,该assigner会将全部元素都分配到同一个global window中,全部GlobalWindows
的返回值一直是 GlobalWindow
单例。基本上自定义的窗口都会基于该assigner实现。
翻滚计数窗口并不带evictor,只注册了一个trigger。该trigger是带purge功能的 CountTrigger。也就是说每当窗口中的元素数量达到了 window-size,trigger就会返回fire+purge,窗口就会执行计算并清空窗口中的全部元素,再接着储备新的元素。从而实现了tumbling的窗口之间无重叠。
滑动计数窗口的各窗口之间是有重叠的,但咱们用的 GlobalWindows assinger 从始至终只有一个窗口,不像 sliding time assigner 能够同时存在多个窗口。因此trigger结果不能带purge,也就是说计算完窗口后窗口中的数据要保留下来(供下个滑窗使用)。另外,trigger的间隔是slide-size,evictor的保留的元素个数是window-size。也就是说,每一个滑动间隔就触发一次窗口计算,并保留下最新进入窗口的window-size个元素,剔除旧元素。
假设有一个滑动计数窗口,每2个元素计算一次最近4个元素的总和,那么窗口工做示意图以下所示:
图中所示的各个窗口逻辑上是不一样的窗口,但在物理上是同一个窗口。该滑动计数窗口,trigger的触发条件是元素个数达到2个(每进入2个元素就会触发一次),evictor保留的元素个数是4个,每次计算完窗口总和后会保留剩余的元素。因此第一次触发trigger是当元素5进入,第三次触发trigger是当元素2进入,并驱逐5和2,计算剩余的4个元素的总和(22)并发送出去,保留下2,4,9,7元素供下个逻辑窗口使用。
一样的,咱们也能够在 KeyedStream
上申请 Time Window,其源码以下所示:
// tumbling time window public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) { if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return window(TumblingProcessingTimeWindows.of(size)); } else { return window(TumblingEventTimeWindows.of(size)); } } // sliding time window public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) { if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return window(SlidingProcessingTimeWindows.of(size, slide)); } else { return window(SlidingEventTimeWindows.of(size, slide)); } } |
在方法体内部会根据当前环境注册的时间类型,使用不一样的WindowAssigner建立window。能够看到,EventTime和IngestTime都使用了XXXEventTimeWindows
这个assigner,由于EventTime和IngestTime在底层的实现上只是在Source处为Record打时间戳的实现不一样,在window operator中的处理逻辑是同样的。
这里咱们主要分析sliding process time window,以下是相关源码:
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; private final long size; private final long slide; private SlidingProcessingTimeWindows(long size, long slide) { this.size = size; this.slide = slide; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp) { timestamp = System.currentTimeMillis(); List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); // 对齐时间戳 long lastStart = timestamp - timestamp % slide; for (long start = lastStart; start > timestamp - size; start -= slide) { // 当前时间戳对应了多个window windows.add(new TimeWindow(start, start + size)); } return windows; } ... } public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { @Override // 每一个元素进入窗口都会调用该方法 public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { // 注册定时器,当系统时间到达window end timestamp时会回调该trigger的onProcessingTime方法 ctx.registerProcessingTimeTimer(window.getEnd()); return TriggerResult.CONTINUE; } @Override // 返回结果表示执行窗口计算并清空窗口 public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE_AND_PURGE; } ... } |
首先,SlidingProcessingTimeWindows
会对每一个进入窗口的元素根据系统时间分配到(size / slide)
个不一样的窗口,并会在每一个窗口上根据窗口结束时间注册一个定时器(相同学口只会注册一份),当定时器超时时意味着该窗口完成了,这时会回调对应窗口的Trigger的onProcessingTime
方法,返回FIRE_AND_PURGE,也就是会执行窗口计算并清空窗口。整个过程示意图以下:
如上图所示横轴表明时间戳(为简化问题,时间戳从0开始),第一条record会被分配到[-5,5)和[0,10)两个窗口中,当系统时间到5时,就会计算[-5,5)窗口中的数据,并将结果发送出去,最后清空窗口中的数据,释放该窗口资源。
Session Window 是一个需求很强烈的窗口机制,但Session也比以前的Window更复杂,因此 Flink 也是在即将到来的 1.1.0 版本中才支持了该功能。因为篇幅问题,咱们将在后续的 Session Window 的实现 中深刻探讨 Session Window 的实现。