Flink 滑动窗口使用触发器会触发多个窗口的计算

以前有小伙伴在群里说:滑动窗口使用触发器让每条数据都触发一次计算dom

可是他并无获得预期的结果:每条数据都触发一次计算,输出一条结果,而是天天数据都输出了不少条结果ide

为何会这样呢?spa

写了个小案例,来解释这种状况code

为了方便使用自定义的 source 开发数据:orm

class StringSourceFunction extends SourceFunction[String] {

  var flag = true

  override def cancel(): Unit = {

    flag = false
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {

    while (flag) {
      val str = StringUtil.getRandomString(1).toUpperCase
      ctx.collect(str + "," + StringUtil.getRandomString(1).toUpperCase)
      Thread.sleep(1000)
    }
  }

}

就是个简单的 souce,每秒对外发出随机的 string 字符串(基本一分钟 60 条)blog

对应的计算程序以下:element

env.addSource(new StringSourceFunction)
      .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10)))
      // 每条数据触发一次计算
      //.trigger(CountTrigger.of(1))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
      override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
        // 窗口
        val window = context.window.toString
        // 简单计算下窗口里面的元素个数
        var count: Long = 0
        elements.iterator.foreach(s => count += 1)


        out.collect("time : " + sdf.format(System.currentTimeMillis()) + ", window : " + window + ", element counter : " + count)
      }
    })
      .print("")

定义了一个 一分钟的窗口,滑动间隔是10秒,一条数据就应该属于6个窗口开发

好比: 5 这条数据属于:(-50,10)(-40,20)(-30,30)(-20,40)(-10,50)(0,60) 这6 个窗口字符串

注释 trigger 看结果:get

10秒滑动间隔,就是10秒有一个滑动一次,一个窗口结束,触发一次计算,输出一个结果(前面6个窗口,由于刚启动数据不够60条)

开启了tirgger 结果就彻底不同了

能够看到,第一条数据进去的时候,触发了6次计算,由于它属于6个窗口,tirgger 会触发6次

 欢迎关注Flink菜鸟公众号,会不按期更新Flink(开发技术)相关的推文

相关文章
相关标签/搜索