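Scenario (incremental aggregation): each time a record enters the window, the aggregate is updated once. When the window's time is up, the final result is emitted.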
Commonly used aggregation operators: reduce(reduceFunction), aggregate(aggregateFunction), sum(), min(), max()
(1) The reduce operator
Scenario: sum the elements in each window
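import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Incremental window aggregation
 */
public class SocketWindowIncrAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator =
                dataStreamSource.map(number -> Integer.valueOf(number.trim()));
        // 10 s tumbling processing-time window; timeWindowAll() is deprecated since Flink 1.12,
        // where the default time characteristic became event time
        AllWindowedStream<Integer, TimeWindow> windowedStream = integerSingleOutputStreamOperator
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        windowedStream.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer last, Integer current) throws Exception {
                // last = running sum so far, current = newly arrived element
                System.out.println("process last:" + last + " current:" + current);
                return last + current;
            }
        }).print().setParallelism(1);
        env.execute(SocketWindowIncrAgg.class.getSimpleName());
    }
}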
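The following plain-Java simulation makes the incremental model concrete without a Flink cluster. It is an illustrative sketch, not Flink's implementation. `IncrementalWindowSim` and its methods are hypothetical names, the timestamps are supplied by hand, and windows are aligned at multiples of the window size, as tumbling windows with zero offset are. Each record is folded into a single running value per window, just as `reduce(last, current)` does above, so no elements are buffered.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical plain-Java simulation (no Flink dependency) of incremental
 * aggregation in tumbling windows: one running value per window.
 */
final class IncrementalWindowSim {

    /** Start of the tumbling window (zero offset) that contains ts, in ms. */
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    /** timestamps[i] is the arrival time of values[i]; returns window start -> sum. */
    static Map<Long, Integer> tumblingSums(long[] timestamps, int[] values, long sizeMs) {
        Map<Long, Integer> running = new TreeMap<>(); // ordered by window start
        for (int i = 0; i < values.length; i++) {
            long w = windowStart(timestamps[i], sizeMs);
            // Like reduce(last, current): fold the new element into the running value
            running.merge(w, values[i], Integer::sum);
        }
        return running;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 4_000, 9_999, 10_000, 15_000};
        int[] vs = {1, 2, 3, 4, 5};
        System.out.println(tumblingSums(ts, vs, 10_000)); // {0=6, 10000=9}
    }
}
```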
(2) The aggregate operator
Requirement: compute the average of the data in each window
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Incremental window aggregation:
 * compute the average with the aggregate operator
 */
public class SocketWindowAvgAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator =
                dataStreamSource.map(number -> Integer.valueOf(number.trim()));
        // Explicit processing-time assigner (timeWindowAll() is deprecated since Flink 1.12)
        AllWindowedStream<Integer, TimeWindow> windowedStream = integerSingleOutputStreamOperator
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        windowedStream.aggregate(new MyAggregate()).print().setParallelism(1);
        env.execute(SocketWindowAvgAgg.class.getSimpleName());
    }

    /**
     * IN:  input type (Integer)
     * ACC: custom intermediate state, Tuple2<Integer, Integer> where
     *      f0 = number of elements, f1 = running sum
     * OUT: output type (Double, the average)
     */
    private static class MyAggregate implements AggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {
        /** Initialize the accumulator. */
        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            return new Tuple2<>(0, 0);
        }

        /** Called for every element: count + 1, sum + element. */
        @Override
        public Tuple2<Integer, Integer> add(Integer element, Tuple2<Integer, Integer> accumulator) {
            accumulator.f0 += 1;
            accumulator.f1 += element;
            System.out.println("input:" + element + "|accumulator:" + accumulator);
            return new Tuple2<>(accumulator.f0, accumulator.f1);
        }

        /** Compute the result when the window fires. */
        @Override
        public Double getResult(Tuple2<Integer, Integer> accumulator) {
            System.out.println("call getResult:" + accumulator);
            return (double) accumulator.f1 / accumulator.f0;
        }

        /** Merge two partial accumulators. */
        @Override
        public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a1, Tuple2<Integer, Integer> b1) {
            System.out.println("call merge:" + a1 + " " + b1);
            return Tuple2.of(a1.f0 + b1.f0, a1.f1 + b1.f1);
        }
    }
}
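The accumulator keeps a count and a sum instead of a running average. The plain-Java sketch below shows why. It mirrors the four methods of `MyAggregate` using a hypothetical stand-in class with a `long[]` accumulator, not Flink code, and demonstrates that merging two partial accumulators gives the correct average while averaging two averages does not. Flink calls `merge` when it must combine partial accumulators, for example when session windows merge. With the tumbling window above, the `call merge` line should normally not appear.

```java
/**
 * Hypothetical plain-Java mirror (no Flink dependency) of MyAggregate:
 * acc[0] = element count, acc[1] = running sum.
 */
final class AvgAccumulatorSim {

    static long[] createAccumulator() { return new long[]{0, 0}; }

    static long[] add(int element, long[] acc) {
        return new long[]{acc[0] + 1, acc[1] + element};
    }

    static double getResult(long[] acc) {
        return (double) acc[1] / acc[0];
    }

    /** Counts and sums are merged separately, so the average stays exact. */
    static long[] merge(long[] a, long[] b) {
        return new long[]{a[0] + b[0], a[1] + b[1]};
    }

    /** Folds all elements into one fresh accumulator. */
    static long[] fold(int... elements) {
        long[] acc = createAccumulator();
        for (int e : elements) acc = add(e, acc);
        return acc;
    }

    public static void main(String[] args) {
        long[] left = fold(1, 2, 3);  // count 3, sum 6
        long[] right = fold(10);      // count 1, sum 10
        System.out.println(getResult(merge(left, right)));            // 4.0
        System.out.println((getResult(left) + getResult(right)) / 2); // 6.0 (wrong)
    }
}
```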
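Full aggregation: computation starts only after all data belonging to the window has arrived. This makes requirements such as sorting the data in a window possible.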
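Commonly used operators: apply(windowFunction) and process(processWindowFunction). A ProcessWindowFunction provides more context than a WindowFunction, such as per-window and global state and side outputs. The relationship is similar to that between map and RichMapFunction. On a non-keyed windowAll stream, the corresponding types are AllWindowFunction and ProcessAllWindowFunction.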
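import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Full window computation:
 * extend the abstract class ProcessAllWindowFunction and override process()
 */
public class SocketWindowFullAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator =
                dataStreamSource.map(number -> Integer.valueOf(number.trim()));
        AllWindowedStream<Integer, TimeWindow> windowedStream = integerSingleOutputStreamOperator
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        windowedStream.process(new MyProcess()).print().setParallelism(1);
        env.execute(SocketWindowFullAgg.class.getSimpleName());
    }

    /**
     * An abstract class can only be extended.
     * IN: input value type, OUT: output value type, W: window type
     */
    private static class MyProcess extends ProcessAllWindowFunction<Integer, Integer, TimeWindow> {
        @Override
        public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
            // Called once when the window fires, with all buffered elements
            System.out.println("process start:" + elements);
            int sum = 0;
            Iterator<Integer> integerIterator = elements.iterator();
            while (integerIterator.hasNext()) {
                Integer number = integerIterator.next();
                sum += number;
            }
            out.collect(sum);
        }
    }
}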
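The following plain-Java sketch illustrates the full-window model. `FullWindowSim` is a hypothetical class with no Flink dependency, and its timestamps are supplied by hand. Elements are buffered per window, and the whole batch is processed only when the window is complete, which is what allows sorting. In Flink, the buffering is done by the window operator's state, and the batch arrives as the `Iterable` passed to `process`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java simulation (no Flink dependency) of full-window processing. */
final class FullWindowSim {

    /** Buffers every element under the start of its tumbling window. */
    static Map<Long, List<Integer>> bufferByWindow(long[] timestamps, int[] values, long sizeMs) {
        Map<Long, List<Integer>> buffers = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            long start = timestamps[i] - Math.floorMod(timestamps[i], sizeMs);
            buffers.computeIfAbsent(start, k -> new ArrayList<>()).add(values[i]);
        }
        return buffers;
    }

    /** Stand-in for process(context, elements, out): sorts the whole window. */
    static List<Integer> processWindow(Iterable<Integer> elements) {
        List<Integer> sorted = new ArrayList<>();
        elements.forEach(sorted::add);
        sorted.sort(null); // null comparator = natural ordering
        return sorted;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 2_000, 3_000, 12_000};
        int[] vs = {5, 1, 3, 7};
        bufferByWindow(ts, vs, 10_000).forEach((start, elems) ->
                System.out.println(start + " -> " + processWindow(elems)));
        // 0 -> [1, 3, 5]
        // 10000 -> [7]
    }
}
```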
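Two streams can be joined over a window. The window types covered here are tumbling windows, sliding windows, and session windows. The interval join shown last is key-based and does not use windows.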
Usage:
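stream.join(otherStream)         // join the two streams
    .where(<KeySelector>)        // key of the first stream, used as the join field
    .equalTo(<KeySelector>)      // key of the second stream, used as the join field
    .window(<WindowAssigner>)    // window type
    .apply(<JoinFunction>)       // operation applied to each joined pair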
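Core idea: the call chain reads "join, then window," but the semantics are different. Elements from the two streams that have the same key and fall into the same window are paired (an inner join), and each pair is passed to the JoinFunction.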
Example:
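import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window join: join two streams over a tumbling window
 */
public class SocketTumblingWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost", 9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost", 9998);
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number.trim()));
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number.trim()));
        DataStream<String> result = dataStream01.join(dataStream02)
                // Join key: the number itself, so equal numbers are paired
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return integer;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return integer;
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Integer, Integer, String>() {
                    @Override
                    public String join(Integer first, Integer second) throws Exception {
                        return first + "," + second;
                    }
                });
        result.print().setParallelism(1);
        env.execute(SocketTumblingWindowJoin.class.getSimpleName());
    }
}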
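The core idea of a window join can be simulated in plain Java. In this sketch, the `WindowJoinSim` class and its `Event` type are hypothetical, timestamps are given explicitly, and the key is the number itself, as in `SocketTumblingWindowJoin`. A left element and a right element are paired only when they have the same key and land in the same tumbling window, and every matching combination is emitted. Flink does not guarantee the order of the pairs; the nested loops here just make the output deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical plain-Java simulation (no Flink dependency) of a tumbling-window inner join. */
final class WindowJoinSim {

    static final class Event {
        final long ts;
        final int value;
        Event(long ts, int value) { this.ts = ts; this.value = value; }
    }

    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    /** Emits "first,second" for every pair with equal key (the value) in the same window. */
    static List<String> join(List<Event> left, List<Event> right, long sizeMs) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.value == r.value;
                boolean sameWindow = windowStart(l.ts, sizeMs) == windowStart(r.ts, sizeMs);
                if (sameKey && sameWindow) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(new Event(1_000, 1), new Event(2_000, 2), new Event(11_000, 1));
        List<Event> right = Arrays.asList(new Event(3_000, 1), new Event(4_000, 1), new Event(12_000, 2));
        // The key-2 elements are in different windows, so they are not joined
        System.out.println(join(left, right, 10_000)); // [1,1, 1,1]
    }
}
```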
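import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window join: join two streams over a sliding window
 */
public class SocketSlideWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost", 9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost", 9998);
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number.trim()));
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number.trim()));
        DataStream<String> result = dataStream01.join(dataStream02)
                // Constant key 1: all left/right pairs in the same window are joined
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })
                .window(SlidingProcessingTimeWindows.of(
                        Time.seconds(10),  // size
                        Time.seconds(5)))  // slide
                .apply(new JoinFunction<Integer, Integer, String>() {
                    @Override
                    public String join(Integer first, Integer second) throws Exception {
                        return first + "," + second;
                    }
                });
        result.print().setParallelism(1);
        env.execute(SocketSlideWindowJoin.class.getSimpleName());
    }
}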
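With a 10 s size and a 5 s slide, every element belongs to two overlapping windows. The sliding join above can therefore emit the same pair more than once, since every element there has the constant key 1. The sketch below computes window membership in plain Java. `SlidingAssignSim` is a hypothetical helper, and its loop follows the usual sliding-window rule (windows start at multiples of the slide), assuming a zero window offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java sketch (no Flink dependency) of sliding-window assignment. */
final class SlidingAssignSim {

    /** Start times of all windows [start, start + size) that contain ts (zero offset). */
    static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slideMs);
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    /** Number of windows containing both timestamps, i.e. how often the pair is emitted. */
    static int sharedWindows(long tsLeft, long tsRight, long sizeMs, long slideMs) {
        List<Long> right = windowStarts(tsRight, sizeMs, slideMs);
        int shared = 0;
        for (long start : windowStarts(tsLeft, sizeMs, slideMs)) {
            if (right.contains(start)) shared++;
        }
        return shared;
    }

    public static void main(String[] args) {
        System.out.println(windowStarts(7_000, 10_000, 5_000));         // [5000, 0]
        System.out.println(sharedWindows(6_000, 8_000, 10_000, 5_000)); // 2
        System.out.println(sharedWindows(1_000, 8_000, 10_000, 5_000)); // 1
    }
}
```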
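import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window join: join two streams over a session window
 */
public class SocketSessionWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost", 9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost", 9998);
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number.trim()));
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number.trim()));
        DataStream<String> result = dataStream01.join(dataStream02)
                // Constant key 1: all left/right pairs in the same session are joined
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })
                // A session closes after 5 s without new elements
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
                .apply(new JoinFunction<Integer, Integer, String>() {
                    @Override
                    public String join(Integer first, Integer second) throws Exception {
                        return first + "," + second;
                    }
                });
        result.print().setParallelism(1);
        env.execute(SocketSessionWindowJoin.class.getSimpleName());
    }
}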
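Session windows have no fixed size. Each element opens a window `[ts, ts + gap)`, and overlapping windows are merged. Flink builds the window join on a co-group over the union of both inputs, so elements from either stream extend the same sessions. The plain-Java sketch below shows only the merging step. It uses a hypothetical `SessionMergeSim` class, sorted hand-written timestamps, and a 5 s gap as in the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical plain-Java sketch (no Flink dependency) of session-window merging. */
final class SessionMergeSim {

    /** Timestamps must be sorted ascending; returns sessions as [start, end) pairs. */
    static List<long[]> sessions(long[] sortedTs, long gapMs) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTs) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                // [ts, ts + gap) overlaps (or touches) the current session: extend it
                last[1] = Math.max(last[1], ts + gapMs);
            } else {
                // Gap exceeded: start a new session
                result.add(new long[]{ts, ts + gapMs});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 3_000, 7_000, 20_000};
        for (long[] s : sessions(ts, 5_000)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // [1000, 12000)
        // [20000, 25000)
    }
}
```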
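import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Interval join: for each key, join elements of the two streams
 * whose timestamps lie within an interval of each other
 */
public class SocketIntervalWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost", 9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost", 9998);
        // Interval joins require event time and element timestamps (assumes Flink 1.12+,
        // where event time is the default). Socket lines carry no timestamp, so this demo
        // uses the wall-clock time at parsing as the event time.
        WatermarkStrategy<Integer> wallClock = WatermarkStrategy.<Integer>forMonotonousTimestamps()
                .withTimestampAssigner((element, recordTimestamp) -> System.currentTimeMillis());
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01
                .map(number -> Integer.valueOf(number.trim()))
                .assignTimestampsAndWatermarks(wallClock);
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02
                .map(number -> Integer.valueOf(number.trim()))
                .assignTimestampsAndWatermarks(wallClock);
        DataStream<String> result = dataStream01
                // keyBy(0) only works on tuple types; key each Integer by its own value
                .keyBy(number -> number)
                .intervalJoin(dataStream02.keyBy(number -> number))
                // right.ts must lie in [left.ts - 2 s, left.ts + 2 s]
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Integer, Integer, String>() {
                    @Override
                    public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + "," + right);
                    }
                });
        result.print().setParallelism(1);
        env.execute(SocketIntervalWindowJoin.class.getSimpleName());
    }
}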
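The interval condition itself is simple arithmetic. The following plain-Java predicate, a hypothetical helper and not Flink API, spells out what `between(Time.seconds(-2), Time.seconds(2))` means. With the default inclusive bounds, a right element matches a left element with the same key when `left.ts - 2 s <= right.ts <= left.ts + 2 s`. Unlike the window joins, no window boundary is involved, so two elements 1 s apart match even if they straddle a 10 s boundary.

```java
/** Hypothetical plain-Java sketch (no Flink dependency) of the interval-join condition. */
final class IntervalJoinSim {

    /** True when keys are equal and leftTs + lower <= rightTs <= leftTs + upper (inclusive). */
    static boolean matches(int leftKey, long leftTs, int rightKey, long rightTs,
                           long lowerMs, long upperMs) {
        return leftKey == rightKey
                && rightTs >= leftTs + lowerMs
                && rightTs <= leftTs + upperMs;
    }

    public static void main(String[] args) {
        long lower = -2_000, upper = 2_000;
        System.out.println(matches(5, 10_000, 5, 8_000, lower, upper));  // true  (exactly -2 s)
        System.out.println(matches(5, 10_000, 5, 12_500, lower, upper)); // false (+2.5 s)
        System.out.println(matches(5, 10_000, 6, 10_000, lower, upper)); // false (different key)
        System.out.println(matches(5, 9_500, 5, 10_500, lower, upper));  // true  (crosses 10 s)
    }
}
```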