本文根据Apache Flink 实战&进阶篇系列直播课程整理而成,由哈啰出行大数据实时平台资深开发刘博分享。经过一些简单的实际例子,从概念原理,到如何使用,再到功能的扩展,但愿可以给打算使用或者已经使用的同窗一些帮助。数据库
主要的内容分为以下三个部分:服务器
CEP的意思是复琐事件处理,例如:起床-->洗漱-->吃饭-->上班等一系列串联起来的事件流造成的模式称为CEP。若是发现某一次起床后没有刷牙洗脸亦或是吃饭就直接上班,就能够把这种非正常的事件流匹配出来进行分析,看看今天是否是起晚了。网络
下图中列出了几个例子:架构
Flink CEP内部是用NFA(非肯定有限自动机)来实现的,由点和边组成的一个状态图,以一个初始状态做为起点,通过一系列的中间状态,达到终态。点分为起始状态、中间状态、最终状态三种,边分为take、ignore、proceed三种。运维
下面以一个打车的例子来展现状态是如何流转的,规则见下图所示。函数
以乘客制定行程做为开始,匹配乘客的下单事件,若是这个订单超时尚未被司机接单的话,就把行程事件和下单事件做为结果集往下游输出。大数据
假如消息到来顺序为:行程-->其余-->下单-->其余。优化
状态流转以下:阿里云
一、开始时状态处于行程状态,即等待用户制定行程。spa
二、当收到行程事件时,匹配行程状态的条件,把行程事件放到结果集中,经过take边将状态往下转移到下单状态。
三、因为下单状态上有一条ignore边,因此能够忽略收到的其余事件,直到收到下单事件时将其匹配,放入结果集中,而且将当前状态往下转移到超时未接单状态。这时候结果集当中有两个事件:制定行程事件和下单事件。
四、超时未接单状态时,若是来了一些其余事件,一样能够被ignore边忽略,直到超时事件的触发,将状态往下转移到最终状态,这时候整个模式匹配成功,最终将结果集中的制定行程事件和下单事件输出到下游。
上面是一个匹配成功的例子,若是是不成功的例子会怎么样?
假如当状态处于超时未接单状态时,收到了一个接单事件,那么就不符合超时未被接单的触发条件,此时整个模式匹配失败,以前放入结果集中的行程事件和下单事件会被清理。
本节将详细介绍Flink CEP的程序结构以及API。
主要分为两部分:定义事件模式和匹配结果处理。
官方示例以下:
程序结构分为三部分:首先须要定义一个模式(Pattern),即第2行代码所示,接着把定义好的模式绑定在DataStream上(第25行),最后就能够在具备CEP功能的DataStream上将匹配的结果进行处理(第27行)。
下面对关键部分作详细讲解:
定义模式:上面示例中,分为了三步,首先匹配一个ID为42的事件,接着匹配一个体积大于等于10的事件,最后等待收到一个name等于end的事件。
匹配结果输出:此部分,须要重点注意select函数(第30行,注:本文基于Flink 1.7版本)里边的Map类型的pattern参数,Key是一个pattern的name,它的取值是模式定义中的Begin节点start,或者是接下来next里面的middle,或者是第三个步骤的end。后面的map中的value是每一步发生的匹配事件。因在每一步中是可使用循环属性的,能够匹配发生屡次,因此map中的value是匹配发生屡次的全部事件的一个集合。
上图中,蓝色方框表明的是一个个单独的模式;浅黄色的椭圆表明的是这个模式上能够添加的属性,包括模式能够发生的循环次数,或者这个模式是贪婪的仍是可选的;橘色的椭圆表明的是模式间的关系,定义了多个模式之间是怎么样串联起来的。经过定义模式,添加相应的属性,将多个模式串联起来三步,就能够构成了一个完整的Flink CEP程序。
下面是示例代码:
定义模式主要有以下5个部分组成:
pattern:前一个模式
next/followedBy/...:开始一个新的模式
start:模式名称
where:模式的内容
filter:核心处理逻辑
接下来介绍一下怎样设置模式的属性。模式的属性主要分为循环属性和可选属性。
循环属性能够定义模式匹配发生固定次数(times),匹配发生一次以上(oneOrMore),匹配发生屡次以上。(timesOrMore)。
可选属性能够设置模式是贪婪的(greedy),即匹配最长的串,或设置为可选的(optional),有则匹配,无则忽略。
因为模式的匹配事件存放在状态中进行管理,因此须要设置一个全局的有效期(within)。 若不指定有效期,匹配事件会一直保存在状态中不会被清除。至于有效期能开多大,要依据具体使用场景和数据量来衡量,关键要看匹配的事件有多少,随着匹配的事件增多,新到达的消息遍历以前的匹配事件会增长CPU、内存的消耗,而且随着状态变大,数据倾斜也会愈来愈严重。
主要分为三种:严格连续性(next/notNext),宽松连续性(followedBy/notFollowedBy),和非肯定宽松连续性(followedByAny)。
三种模式匹配的差异见下表所示:
总结以下:
除了前面提到的模式定义和模式间的联系,还能够把相连的多个模式组合在一块儿当作一个模式组,相似于视图,能够在这个模式视图上进行相关操做。
上图这个例子里面,首先匹配了一个登陆事件,而后接下来匹配浏览,下单,购买这三个事件反复发生三次的用户。
若是没有模式组的话,代码里面浏览,下单,购买要写三次。有了模式组,只需把浏览,下单,购买这三个事件当作一个模式组,把相应的属性加上times(3)就能够了。
处理匹配的结果主要有四个接口: PatternFlatSelectFunction,PatternSelectFunction,PatternFlatTimeoutFunction和PatternTimeoutFunction。
从名字上能够看出,输出能够分为两类:select和flatSelect指定输出一条仍是多条,timeoutFunction和不带timeout的Function指定可不能够对超时事件进行旁路输出。
下图是输出的综合示例代码:
当一个事件到来时,若是这个事件同时符合多个输出的结果集,那么这个事件是如何保存的?
Flink CEP经过Dewey计数法在多个结果集共享同一个事件副本,以实现对事件副本进行资源共享。
本章主要介绍一些Flink CEP的扩展,讲述如何作到超时机制的精确管理,以及规则的动态加载与更新。
原生Flink CEP中超时触发的功能能够经过within+outputtag结合来实现,可是在复杂的场景下处理存在问题,以下图所示,在下单事件后还有一个预付款事件,想要获得下单而且预付款后超时未被接单的订单,该如何表示呢?
参照下单后超时未被接单的作法,把下单而且预付款后超时未被接单规则表示为下单.followedBy(预付款).followedBy(接单).within(time),那么这样实现会存在问题吗?
这种作法的计算结果是会存在脏数据的,由于这个规则不只匹配到了下单而且预付款后超时未被接单的订单(想要的结果),一样还匹配到了只有下单行为后超时未被接单的订单(脏数据,没有预付款)。缘由是由于超时within是控制在整个规则上,而不是某一个状态节点上,因此不论当前的状态是处在哪一个状态节点,超时后都会被旁路输出。
那么就须要考虑可否经过时间来直接对状态转移作到精确的控制,而不是经过规则超时这种曲线救国的方式。 因而乎,在经过消息触发状态的转移以外,须要增长经过时间触发状态的转移的支持。要实现此功能,须要在原来的状态以及状态转移中,增长时间属性的概念。以下图所示,经过wait算子来获得waiting状态,而后在waiting状态上设置一个十秒的时间属性以定义一个十秒的时间窗口。
wait算子对应NFA中的ignore状态,将在没有到达时间窗口结束时间时自旋,在ComputationState中记录wait的开始时间,在NFA的doProcess中,将到来的数据与waiting状态处理,若是到了waiting的结束时间,则进行状态转移。
上图中红色方框中为waiting状态设置了两条ignore边:
1.waitingStatus.addIgnore(lastSink,waitingCondition),waitingCondition中的逻辑是获取当前的时间(支持事件时间),判断有没有超过设置的waiting阈值,若是超过就把状态向后转移。
2.waitingStatus.addIgnore(waitingCondition),waitingCondition中若是未达到设置的waiting阈值,就会自旋在当前的waiting状态不变。
线上运行的CEP中确定常常遇到规则变动的状况,若是每次变动时都将任务重启、从新发布是很是不优雅的。尤为在营销或者风控这种对实时性要求比较高的场景,若是规则窗口过长(一两个星期),状态过大,就会致使重启时间延长,期间就会形成一些想要处理的异常行为不能及时发现。
那么要怎么样作到规则的动态更新和加载呢?
梳理一下总体架构,Flink CEP是运行在Flink Job里的,而规则库是放在外部存储中的。首先,须要在运行的Job中能及时发现外部存储中规则的变化,即须要在Job中提供访问外部库的能力。 其次,须要将规则库中变动的规则动态加载到CEP中,即把外部规则的描述解析成Flink CEP所能识别的pattern结构体。最后,把生成的pattern转化成NFA,替换历史NFA,这样对新到来的消息,就会使用新的规则进行匹配。
下图就是一个支持将外部规则动态注入、更新的接口。
这个接口里面主要实现了四个方法:
新规则动态加载到Flink CEP的Job中,替换掉原来的NFA以后,还须要对历史匹配的结果集进行清理。在AbstractKeyedCEPPatternOperator中实现刷新NFA,注意,历史状态是否须要清理和业务相关:
使用Flink CEP,熟知其原理是很重要的,特别是NFA的状态转移流程,而后再去看源码中的状态图的构建就会很清晰了。
本文做者:巴蜀真人
本文为阿里云内容,未经容许不得转载。