Flink自定义窗口Trigger

自定义Trigger

class Trigger2 extends Trigger[SensorReading, TimeWindow] {

    // 每来一条数据,就会执行onElement方法
    override def onElement(element: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
      // 经过Trigger.TriggerContext对象建立一个状态变量,有则获取,无则建立
      val sumValue: ValueState[Int] = ctx.getPartitionedState(new ValueStateDescriptor[Int]("sumValue", classOf[Int]))
      // 使用从当前数据提取的时间戳,来建立一个计时器
      ctx.registerEventTimeTimer(timestamp + 1000L) // 建立定时器
      // TriggerResult.CONTINUE表示这条数据达到以后,对应的窗口什么都不作
      TriggerResult.CONTINUE
    }

    override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
    // 定义watermark到达计时器时间时要执行的操做
    override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
      // 表示窗口触发操做,可是不会关闭窗口
      TriggerResult.FIRE

    }
    // 清除状态
    override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
      val sumValue: ValueState[Int] = ctx.getPartitionedState(new ValueStateDescriptor[Int]("sumValue", classOf[Int]))
      sumValue.clear()
    }
  }
  • Trigger中能够定义状态,状态的做用域是当前的窗口,也就是说状态只属于一个单独窗口,processsWindowFunction函数中的状态定义也是如此
  • 触发器中定义的状态变量必定要手动清理,不然在窗口关闭以后,Flink应用不会去清理触发器中以前全部关闭窗口对应的状态