flink中支持多种窗口,包括:时间窗口,session窗口,统计窗口等等,能想到的基本均可以实现java
时间窗口(Time Windows)
最简单经常使用的窗口形式是基于时间的窗口,flink支持三种种时间窗口:apache
翻滚时间窗口的窗口是固定的,好比设定一个1分钟的时间窗口,该时间窗口将只计算当前1分钟内的数据,而不会管前1分钟或后1分钟的数据。
时间是对齐的,数据不会同时出如今2个窗口内,不会重叠api
滑动窗口,顾名思义,该时间窗口是滑动的。因此,从概念上讲,这里有两个方面的概念须要理解:session
窗口:须要定义窗口的大小
滑动:须要定义在窗口中滑动的大小,但理论上讲滑动的大小不能超过窗口大小
滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成
窗口长度是固定的,能够有重叠的部分socket
由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到新数据就会生成新的窗口
主要特色就是: 时间无对齐maven
window() 方法接收的输入参数是一个WindowAssigner WindowAssigner 负责将每条输入的数据分发到正确的window中 Flink提供了通用的WindowAssigner 滚动窗口(tumbling window) 滑动窗口(sliding window) 会话窗口(session window) 全局窗口(global window) 建立不一样类型的窗口 滚动时间窗口(tumbling time window) timeWindow(Time.seconds(15)) 滑动时间窗口(sliding time window) .timeWindow(Time.seconds(15),Time.seconds(5)) 会话窗口(session window) .window(EventTimeSessionWindows.withGap(Time.minutes(10)) 窗口函数(window function) window function 定义了要对窗口中收集的数据作的计算操做,能够分为两类; 增量聚合函数(incrementalggergation functions) 每条数据来了就会进行计算,保持一个简单的状态 ReduceFunction, AggregateFunction 全窗口函数(full windowfunctions) 先把窗口全部数据收集起来,等到计算的时候会遍历全部数据 ProcessWindowFunction 其余一些经常使用的API .trigger()---------触发器 定义window何时关闭,触发计算并输出结果 .evicotr()---------移除器 定义移除某些数据的逻辑 .allowedLateness() ------容许处理迟到的数据 .sideOutputLateData() -----将迟到的数据放入侧输出流 .getSideOutput() ----获取侧输出流
理论说半天其实仍是萌的,上个栗子ide
新建一个scala Object WindowTest.scala函数
package com.mafei.apitest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} import org.apache.flink.streaming.api.windowing.time.Time object WindowTest { def main(args: Array[String]): Unit = { //建立执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //以事件时间做为窗口聚合 //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) //以数据进入flink的时间做为窗口时间 // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //以Flink实际处理时间做为窗口时间 //若是发现没有输出,那多是由于数据太少,不到15s都处理完成了,能够换成socket或者kafka来进行测试 val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") env.setParallelism(1) inputStream.print() //先转换成样例类类型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割数据,获取结果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一个传感器类的数据,参数中传toLong和toDouble是由于默认分割后是字符串类别 }) //每15秒统计一次,窗口内各传感器全部温度的最小值,以及最小的时间戳 val resultStream = dataStream .map(data=>(data.id,data.temperature,data.timestamp)) .keyBy(_._1) //按照二元组的第一个元素(id)分组 // .window(TumblingEventTimeWindows.of(Time.seconds(15))) //滚动时间窗口 // .window(SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(3))) //滑动时间窗口,15秒一个窗口,每次日后划3秒 // .window(EventTimeSessionWindows.withGap(Time.seconds(15))) //会话窗口,超过15秒算下一个会话 // .countWindow(15) //滚动计数窗口 .timeWindow(Time.seconds(15)) //每15秒统计一次,滚动时间窗口 // .minBy(1) //第二个元素作最小值的统计,若是只是获取全部温度的最小值,直接用这个方法就能够了。。 .reduce((curRes,newData)=>(curRes._1, curRes._2.min(newData._2),newData._3)) resultStream.print() env.execute() } } //上面reduce代码若是用这个自定义的方式也是同样能够实现,效果是同样的 class MyReducer extends ReduceFunction[SensorReadingTest5]{ override def reduce(t: SensorReadingTest5, t1: SensorReadingTest5): SensorReadingTest5 = SensorReadingTest5(t.id, t1.timestamp,t.temperature.min(t1.temperature)) }
准备一个sensor.txt 放到指定目录下内容: 测试
sensor1,1603766281,1 sensor2,1603766282,42 sensor3,1603766283,43 sensor4,1603766240,40.1 sensor4,1603766284,20 sensor4,1603766249,40.2
最终代码的结构,和运行效果scala