做者:邱从贤apache
Apache Flink(如下简称 Flink) 是一个自然支持无限流数据处理的分布式计算框架,在 Flink 中 Window 能够将无限流切分红有限流,是处理有限流的核心组件,如今 Flink 中 Window 能够是时间驱动的(Time Window),也能够是数据驱动的(Count Window)。api
下面的代码是在 Flink 中使用 Window 的两个示例性能优化
从第一部分咱们已经知道 Window 的一些基本概念,以及相关 API,下面咱们以一个实际例子来看看怎么使用 Window 相关的 API。微信
代码来自 flink-examples:网络
上面的例子中咱们首先会对每条数据进行时间抽取,而后进行 keyby,接着依次调用 window(),evictor(), trigger() 以及 maxBy()。下面咱们重点来看 window(), evictor() 和 trigger() 这几个方法。session
Window 方法接收的输入是一个WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 Window 中(一条数据可能同时分发到多个 Window 中),Flink 提供了几种通用的 WindowAssigner:tumbling window(窗口间的元素无重复),sliding window(窗口间的元素可能重复),session window 以及 global window。若是须要本身定制数据分发策略,则能够实现一个 class,继承自 WindowAssigner。框架
Tumbling Window机器学习
Sliding Window分布式
Session Windowide
Global Window
Evictor 主要用于作一些数据的自定义操做,能够在执行用户代码以前,也能够在执行用户代码以后,更详细的描述能够参考 org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter 两个方法。Flink 提供了以下三种通用的 evictor:
CountEvictor 保留指定数量的元素
DeltaEvictor 经过执行用户给定的 DeltaFunction 以及预设的 threshold,判断是否删除一个元素。
TimeEvictor设定一个阈值 interval,删除全部再也不 max_ts - interval 范围内的元素,其中 max_ts 是窗口内时间戳的最大值。
Evictor 是可选的方法,若是用户不选择,则默认没有。
Trigger 用来判断一个窗口是否须要被触发,每一个 WindowAssigner 都自带一个默认的 Trigger,若是默认的 Trigger 不能知足你的需求,则能够自定义一个类,继承自 Trigger 便可,咱们详细描述下 Trigger 的接口以及含义:
onElement() 每次往 window 增长一个元素的时候都会触发
onEventTime() 当 event-time timer 被触发的时候会调用
onProcessingTime() 当 processing-time timer 被触发的时候会调用
onMerge() 对两个 trigger 的 state 进行 merge 操做
clear() window 销毁的时候被调用
上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有以下几种可能的选择:
CONTINUE 不作任何事情
FIRE 触发 window
PURGE 清空整个 window 的元素并销毁窗口
FIRE_AND_PURGE 触发窗口,而后销毁窗口
了解完上面的内容后,对于时间驱动的窗口,咱们还有两个概念须要澄清:Time 和 Watermark。
咱们知道在分布式环境中 Time 是一个很重要的概念,在 Flink 中 Time 能够分为三种 Event-Time,Processing-Time 以及 Ingestion-Time,三者的关系咱们能够从下图中得知:
Event Time、Ingestion Time、Processing Time
Event-Time 表示事件发生的时间,Processing-Time 则表示处理消息的时间(墙上时间),Ingestion-Time 表示进入到系统的时间。
在 Flink 中咱们能够经过下面的方式进行 Time 类型的设置
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 设置使用 ProcessingTime
了解了 Time 以后,咱们还须要知道 Watermark 相关的概念。
咱们能够考虑一个这样的例子:某 App 会记录用户的全部点击行为,并回传日志(在网络很差的状况下,先保存在本地,延后回传)。A 用户在 11:02 对 App 进行操做,B 用户在 11:03 操做了 App,可是 A 用户的网络不太稳定,回传日志延迟了,致使咱们在服务端先接受到 B 用户 11:03 的消息,而后再接受到 A 用户 11:02 的消息,消息乱序了。
那咱们怎么保证基于 event-time 的窗口在销毁的时候,已经处理完了全部的数据呢?这就是 watermark 的功能所在。watermark 会携带一个单调递增的时间戳 t,watermark(t) 表示全部时间戳不大于 t 的数据都已经到来了,将来小于等于 t 的数据不会再来,所以能够放心地触发和销毁窗口了。下图中给了一个乱序数据流中的 Watermark 例子
上面的 Watermark 让咱们可以应对乱序的数据,可是真实世界中咱们无法获得一个完美的 Watermark 数值 — 要么无法获取到,要么耗费太大,所以实际工做中咱们会使用近似 watermark — 生成 watermark(t) 以后,还有较小的几率接受到时间戳 t 以前的数据,在 Flink 中将这些数据定义为 “late elements”, 一样咱们能够在 Window 中指定是容许延迟的最大时间(默认为 0),可使用下面的代码进行设置
设置allowedLateness
以后,迟来的数据一样能够触发窗口,进行输出,利用 Flink 的 side output 机制,咱们能够获取到这些迟到的数据,使用方式以下:
须要注意的是,设置了 allowedLateness 以后,迟到的数据也可能触发窗口,对于 Session window 来讲,可能会对窗口进行合并,产生预期外的行为。
在讨论 Window 内部实现的时候,咱们再经过下图回顾一下 Window 的生命周期
每条数据过来以后,会由 WindowAssigner 分配到对应的 Window,当 Window 被触发以后,会交给 Evictor(若是没有设置 Evictor 则跳过),而后处理 UserFunction。其中 WindowAssigner,Trigger,Evictor 咱们都在上面讨论过,而 UserFunction 则是用户编写的代码。
整个流程还有一个问题须要讨论:Window 中的状态存储。咱们知道 Flink 是支持 Exactly Once 处理语义的,那么 Window 中的状态存储和普通的状态存储又有什么不同的地方呢?
首先给出具体的答案:从接口上能够认为没有区别,可是每一个 Window 会属于不一样的 namespace,而非 Window 场景下,则都属于 VoidNamespace ,最终由 State/Checkpoint 来保证数据的 Exactly Once 语义,下面咱们从 org.apache.flink.streaming.runtime.operators.windowing.WindowOperator 摘取一段代码进行阐述
从上面咱们能够知道,Window 中的的元素一样是经过 State 进行维护,而后由 Checkpoint 机制保证 Exactly Once 语义。
至此,Time、Window 相关的全部内容都已经讲解完毕,主要包括为何要有 Window; Window 中的三个核心组件:WindowAssigner、Trigger 和 Evictor;Window 中怎么处理乱序数据,乱序数据是否容许延迟,以及怎么处理迟到的数据;最后咱们梳理了整个 Window 的数据流程,以及 Window 中怎么保证 Exactly Once 语义。
▼ Apache Flink 社区推荐 ▼
Apache Flink 及大数据领域顶级盛会 Flink Forward Asia 2019 重磅开启,目前正在征集议题,限量早鸟票优惠ing。了解 Flink Forward Asia 2019 的更多信息,请查看:
developer.aliyun.com/special/ffa…
首届 Apache Flink 极客挑战赛重磅开启,聚焦机器学习与性能优化两大热门领域,40万奖金等你拿,加入挑战请点击:
tianchi.aliyun.com/markets/tia…
关注 Flink 官方社区微信公众号,了解更多 Flink 资讯!