在大多数场景下,咱们须要统计的数据流都是无界的,所以咱们没法等待整个数据流终止后才进行统计。一般状况下,咱们只须要对某个时间范围或者数量范围内的数据进行统计分析:如每隔五分钟统计一次过去一小时内全部商品的点击量;或者每发生1000次点击后,都去统计一下每一个商品点击率的占比。在 Flink 中,咱们使用窗口 (Window) 来实现这类功能。按照统计维度的不一样,Flink 中的窗口能够分为 时间窗口 (Time Windows) 和 计数窗口 (Count Windows) 。html
Time Windows 用于以时间为维度来进行数据聚合,具体分为如下四类:java
滚动窗口 (Tumbling Windows) 是指彼此之间没有重叠的窗口。例如:每隔1小时统计过去1小时内的商品点击量,那么 1 天就只能分为 24 个窗口,每一个窗口彼此之间是不存在重叠的,具体以下:git
这里咱们以词频统计为例,给出一个具体的用例,代码以下:github
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 接收socket上的数据输入
DataStreamSource<String> streamSource = env.socketTextStream("hadoop001", 9999, "\n", 3);
streamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
String[] words = value.split("\t");
for (String word : words) {
out.collect(new Tuple2<>(word, 1L));
}
}
}).keyBy(0).timeWindow(Time.seconds(3)).sum(1).print(); //每隔3秒统计一次每一个单词出现的数量
env.execute("Flink Streaming");
复制代码
测试结果以下:apache
滑动窗口用于滚动进行聚合分析,例如:每隔 6 分钟统计一次过去一小时内全部商品的点击量,那么统计窗口彼此之间就是存在重叠的,即 1天能够分为 240 个窗口。图示以下:windows
能够看到 window 1 - 4 这四个窗口彼此之间都存在着时间相等的重叠部分。想要实现滑动窗口,只须要在使用 timeWindow 方法时额外传递第二个参数做为滚动时间便可,具体以下:socket
// 每隔3秒统计一次过去1分钟内的数据
timeWindow(Time.minutes(1),Time.seconds(3))
复制代码
当用户在进行持续浏览时,可能每时每刻都会有点击数据,例如在活动区间内,用户可能频繁的将某类商品加入和移除购物车,而你只想知道用户本次浏览最终的购物车状况,此时就能够在用户持有的会话结束后再进行统计。想要实现这类统计,能够经过 Session Windows 来进行实现。ide
具体的实现代码以下:oop
// 以处理时间为衡量标准,若是10秒内没有任何数据输入,就认为会话已经关闭,此时触发统计
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 以事件时间为衡量标准
window(EventTimeSessionWindows.withGap(Time.seconds(10)))
复制代码
最后一个窗口是全局窗口, 全局窗口会将全部 key 相同的元素分配到同一个窗口中,其一般配合触发器 (trigger) 进行使用。若是没有相应触发器,则计算将不会被执行。测试
这里继续以上面词频统计的案例为例,示例代码以下:
// 当单词累计出现的次数每达到10次时,则触发计算,计算整个窗口内该单词出现的总数
window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();
复制代码
Count Windows 用于以数量为维度来进行数据聚合,一样也分为滚动窗口和滑动窗口,实现方式也和时间窗口彻底一致,只是调用的 API 不一样,具体以下:
// 滚动计数窗口,每1000次点击则计算一次
countWindow(1000)
// 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的状况
countWindow(1000,10)
复制代码
实际上计数窗口内部就是调用的咱们上一部分介绍的全局窗口来实现的,其源码以下:
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}
复制代码
Flink Windows: ci.apache.org/projects/fl…
更多大数据系列文章能够参见 GitHub 开源项目: 大数据入门指南