自定义Trigger
class Trigger2 extends Trigger[SensorReading, TimeWindow] {
// 每来一条数据,就会执行onElement方法
override def onElement(element: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
// 经过Trigger.TriggerContext对象建立一个状态变量,有则获取,无则建立
val sumValue: ValueState[Int] = ctx.getPartitionedState(new ValueStateDescriptor[Int]("sumValue", classOf[Int]))
// 使用从当前数据提取的时间戳,来建立一个计时器
ctx.registerEventTimeTimer(timestamp + 1000L) // 建立定时器
// TriggerResult.CONTINUE表示这条数据达到以后,对应的窗口什么都不作
TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
// 定义watermark到达计时器时间时要执行的操做
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
// 表示窗口触发操做,可是不会关闭窗口
TriggerResult.FIRE
}
// 清除状态
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
val sumValue: ValueState[Int] = ctx.getPartitionedState(new ValueStateDescriptor[Int]("sumValue", classOf[Int]))
sumValue.clear()
}
}
- Trigger中能够定义状态,状态的做用域是当前的窗口,也就是说状态只属于一个单独窗口,processsWindowFunction函数中的状态定义也是如此
- 触发器中定义的状态变量必定要手动清理,不然在窗口关闭以后,Flink应用不会去清理触发器中以前全部关闭窗口对应的状态