本文参考Apache Beam官方编程手册
能够结合官方的Mobile Game 代码阅读本文。apache
在默认状况下,Apache Beam是不分窗的,也就是采用GlobalWindow,而若是同时也不设置自定义的触发器,那么Beam会在全部数据都收集到以后才开始对数据进行处理。这一般只能适用于有限数据且对实时性要求不高的状况。当输入为无限流数据,咱们能够
1)设置合适的窗口大小(根据时间戳),在窗口末端进行数据处理;
2)设置触发器,当条件知足时触发,进行数据处理;
3)同时设置窗口和触发器。
时间戳说明:Beam的数据都是保存在PCollection中。当读入数据时,PCollection为每一个元素都自动生成一个内置的时间戳,对于无限输入,数据的时间戳不一样。而对于有限输入,因为是同时读入,全部的元素的时间戳都是同样的,这时候分窗是没有意义的(都在一个窗口)。而咱们能够手动为每一个元素设置时间戳,一般采用数据中已有的时间属性(好比日志中通常都会带有事件时间)。能够在DoFn中为数据带上时间戳,如:编程
@ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element(), new Instant(XXX)); }
1)全局窗口
就是默认不分窗的状况。
apply(Windows.<TYPE>into(new GlobalWindows()));
2)固定时间大小窗口
最多见的分窗方式,按照时间戳把数据处理窗口分为固定长度。
apply(Windows.<TYPE>into(FixedWindows.of(Duration.standardMinutes(XX))))app
3)滑动窗口
须要设置2个参数,窗口大小和窗口产生周期。窗口之间有重叠,一般用于计算平均数的状况(暂没用过)ide
4)会话窗口
通常用于相同key数据聚合,同一个key的数据之间时间间隔较大的会被分到不一样的窗口。ui
**spa
**
当使用用户自定义的时间戳时,先处理的数据并不老是时间戳较小的,有可能出现时间戳小的数据在后面才产生的状况。Beam一般会给窗口设定一个处理期限时间(图中纵轴),当超过这个时间的数据被视为超时数据,而这些期限时间的连线即水位线。日志
系统会根据实际状况进行预测生成水位线,在默认状况下不对超时数据进行处理,而咱们能够经过设置触发器对超时数据进行额外处理。code
1)时间时间触发器
根据时间戳进行触发。blog
.triggering(AfterWatermark.pastEndOfWindow()//水位线到达时触发一次 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(FIVE_MINUTES))//水位线以前,每次触发后第一个数据来到以后的5分钟时再触发 .withLateFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TEN_MINUTES)))//水位线以后,每次触发后第一个数据来到以后的10分钟时再触发
以上分别对水位线上中下的3种数据进行不一样的处理。须要注意的是withEarlyFirings和withLateFirings方法生成的触发器是连续的而不是一次性的。
2)处理时间触发器three
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)
如方法的字面意思,仅在第一个数据到达后的5分钟时触发一次。
3)数据驱动型触发器
AfterPane.elementCountAtleast(XX)
当处理到XX个时触发一次。须要注意的是当数据个数小于XX时永远不会触发数据处理。
4)混合触发器
将多个触发器混合起来,好比1)中的代码就是3个触发器混合。其余的还有
①Repeatedly.forever(一次性触发器)
将一次性触发器变为连续型触发器,触发后再次等待触发。例如与AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)一块儿用能够实现每一个数据到达后的5分钟进行处理,常常用于全局窗口,能够用orFinally(触发器)来设置中止条件。
②AfterEach.inOrder(触发器1,触发器2...)
当触发器1知足后等待触发器2...知道全部触发器知足后开始数据处理。
③AfterFirst(触发器1,触发器2..)和AfterAll(触发器1,触发器2..)
这2个分别为或,与的逻辑。
④orFinally
见①
Accumulating Mode:
If our trigger is set to .accumulatingFiredPanes, the trigger emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:
First trigger firing: [5, 8, 3] Second trigger firing: [5, 8, 3, 15, 19, 23] Third trigger firing: [5, 8, 3, 15, 19, 23, 9, 13, 10]
Discarding Mode:
If our trigger is set to .discardingFiredPanes, the trigger emits the following values on each firing:
First trigger firing: [5, 8, 3] Second trigger firing: [15, 19, 23] Third trigger firing: [9, 13, 10]
.withAllowedLateness(Duration.XXXX(XXX))可设置容许超时多长时间的数据。