转发请注明原创地址:https://www.cnblogs.com/dongxiao-yang/p/9391815.htmlhtml
背景
flink 提供了完善的窗口机制, api中支持常见的三种窗口形式,滚动窗口,滑动窗口和session窗口。下面的图片显示了三种窗口的划分区别:
滚动窗口
滑动窗口
session窗口api
Tumbing Windows:滚动窗口,窗口之间时间点不重叠。它是按照固定的时间,或固定的事件个数划分的,分别能够叫作滚动时间窗口和滚动事件窗口。
Sliding Windows:滑动窗口,窗口之间时间点存在重叠。对于某些应用,它们须要的时间是不间断的,须要平滑的进行窗口聚合。例如,能够每30s记算一次最近1分钟用户所购买的商品数量的总数,这个就是时间滑动窗口;或者每10个客户点击购买,而后就计算一下最近100个客户购买的商品的总和,这个就是事件滑动窗口。
Session Windows:会话窗口,通过一段设置时间无数据认为窗口完成。session
在默认的场景下,全部的窗口都是到达时间语义上的windown end time后触发对整个窗口元素的计算,可是在部分场景的状况下,业务方须要在窗口时间没有结束的状况下也能够得到当前的聚合结果,好比每隔五分钟获取当前小时的sum值,这种状况下,官方提供了对于上述窗口的定制化计算器ContinuousEventTimeTrigger和ContinuousProcessingTimeTriggersocket
下面是一个使用ContinuousProcessingTimeTrigger的简单例子:ide
public class ContinueTriggerDemo { public static void main(String[] args) throws Exception { // TODO Auto-generated method stub String hostName = "localhost"; Integer port = Integer.parseInt("8001"); ; // set up the execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); // 从指定socket获取输入数据 DataStream<String> text = env.socketTextStream(hostName, port); text.flatMap(new LineSplitter()) //数据语句分词 .keyBy(0) // 流按照单词分区 .window(TumblingProcessingTimeWindows.of(Time.seconds(120)))// 设置一个120s的滚动窗口 .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))//窗口每统计一次当前计算结果 .sum(1)// count求和 .map(new Mapdemo())//输出结果加上时间戳 .print(); env.execute("Java WordCount from SocketTextStream Example"); } /** * Implements the string tokenizer that splits sentences into words as a * user-defined FlatMapFunction. The function takes a line (String) and * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, * Integer>). */ public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } } public static final class Mapdemo implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> { @Override public Tuple3<String, String, Integer> map(Tuple2<String, Integer> value) throws Exception { // TODO Auto-generated method stub DateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String s = format2.format(new Date()); return new Tuple3<String, String, Integer>(value.f0, s, value.f1); } } }
在本地启动端口 :nc -lk 8001 并启动flink程序
输入数据:spa
aa
aa
bb
观察程序数据结果日志日志
5> (aa,2018-07-30 16:08:20,2) 5> (bb,2018-07-30 16:08:20,1) 5> (aa,2018-07-30 16:08:40,2) 5> (bb,2018-07-30 16:08:40,1) 5> (aa,2018-07-30 16:09:00,2) 5> (bb,2018-07-30 16:09:00,1) 5> (aa,2018-07-30 16:09:20,2) 5> (bb,2018-07-30 16:09:20,1) 5> (aa,2018-07-30 16:09:40,2) 5> (bb,2018-07-30 16:09:40,1)
在上述输入后继续输入code
aa
日志结果统计为orm
5> (aa,2018-07-30 16:10:00,3) 5> (bb,2018-07-30 16:10:00,1)
根据日志数据可见,flink轻松实现了一个窗口时间长度为120s并每20s向下游发送一次窗口当前聚合结果的功能。htm