本文翻译自官网:FlinkCEP - Complex event processing for Flink html
FlinkCEP是在Flink之上实现的复琐事件处理(CEP)库。 它使您能够检测无穷无尽的事件流中的事件模式,从而有机会掌握数据中的重要信息。java
本页描述Flink CEP中可用的API调用。 咱们首先介绍模式API,该API容许您指定要在流中检测的模式,而后介绍如何检测和处理匹配的事件序列。 而后,咱们介绍CEP库在处理事件时间的延迟时所作的假设,以及如何将做业从较旧的Flink版本迁移到Flink-1.3。 apache
若是您想直接使用,请设置一个Flink程序,并将FlinkCEP依赖项添加到您项目的pom.xml中。c#
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.11</artifactId> <version>1.9.0</version> </dependency>
Info: FlinkCEP不是二进制分发包的一部分。 在此处查看如何与它连接以执行集群。api
如今,您能够开始使用Pattern API编写第一个CEP程序。markdown
注意:您要向其应用模式匹配的DataStream中的事件必须实现适当的equals()和hashCode()方法,由于FlinkCEP使用它们来比较和匹配事件。ide
val input: DataStream[Event] = ... val pattern = Pattern.begin[Event]("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end") val patternStream = CEP.pattern(input, pattern) val result: DataStream[Alert] = patternStream.process( new PatternProcessFunction[Event, Alert]() { override def processMatch( `match`: util.Map[String, util.List[Event]], ctx: PatternProcessFunction.Context, out: Collector[Alert]): Unit = { out.collect(createAlertFrom(pattern)) } })
模式API容许您定义要从输入流中提取的复杂模式序列。函数
每一个复杂模式序列都包含多个简单模式,即寻找具备相同属性的单个事件的模式。 从如今开始,咱们将称这些简单模式为模式,以及咱们在流中搜索的最终复杂模式序列,即模式序列。 您能够将模式序列视为此类模式的图,其中根据用户指定的条件(例如,从一个模式到下一个模式)的转换。 event.getName()。equals(“ end”)。 匹配是一系列输入事件,经过一系列有效的模式转换来访问复杂模式图的全部模式。oop
注意:每一个模式都必须具备惟一的名称,稍后您将使用该名称来标识匹配的事件。 ui
注意:模式名称不能包含字符":"
。
在本节的其他部分,咱们将首先描述如何定义单个模式,而后如何将单个模式组合为复杂模式。
模式能够是单例或循环模式。 单例模式接受一个事件,而循环模式能够接受多个事件。 在模式匹配符号中,模式“ a b + c?d”(或“ a”,后跟一个或多个“ b”,可选地后跟“ c”,后跟“ d”),a,c ?和d是单例模式,而b +是循环的模式。 默认状况下,模式是单例模式,您可使用“量词”将其转换为循环模式。 每一个模式均可以具备一个或多个条件,基于该条件能够接受事件。
在FlinkCEP中,您可使用如下方法指定循环模式:pattern.oneOrMore(),用于指望一个或多个给定事件发生的模式(例如,前面提到的b +); 和pattern.times(#ofTimes),用于指望在给定类型的事件中出现特定次数的模式,例如 4次;pattern.times(#fromTimes,#toTimes)用于指望在给定类型的事件中出现特定的最小发生次数和最大发生次数的模式,例如 2-4
您可使用pattern.greedy()方法使循环模式变得贪婪(注:尽量的多匹配),但仍没法使组模式变得贪婪。 您可使用pattern.optional()方法将全部模式(是否循环)设为可选。
对于名为start的模式,如下是有效的量词:
// expecting 4 occurrences start.times(4) // expecting 0 or 4 occurrences start.times(4).optional() // expecting 2, 3 or 4 occurrences start.times(2, 4) // expecting 2, 3 or 4 occurrences and repeating as many as possible start.times(2, 4).greedy() // expecting 0, 2, 3 or 4 occurrences start.times(2, 4).optional() // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible start.times(2, 4).optional().greedy() // expecting 1 or more occurrences start.oneOrMore() // expecting 1 or more occurrences and repeating as many as possible start.oneOrMore().greedy() // expecting 0 or more occurrences start.oneOrMore().optional() // expecting 0 or more occurrences and repeating as many as possible start.oneOrMore().optional().greedy() // expecting 2 or more occurrences start.timesOrMore(2) // expecting 2 or more occurrences and repeating as many as possible start.timesOrMore(2).greedy() // expecting 0, 2 or more occurrences start.timesOrMore(2).optional() // expecting 0, 2 or more occurrences and repeating as many as possible start.timesOrMore(2).optional().greedy()
对于每种模式,您均可以指定一个传入事件必须知足的条件才能被“接受”到模式中,例如 其值应大于5,或大于先前接受的事件的平均值。 您能够经过pattern.where(),pattern.or()或pattern.until()方法在事件属性上指定条件。 这些能够是IterativeConditions或SimpleConditions。
迭代条件:这是最通用的条件类型。 这样,您能够根据先前接受的事件的属性或部分事件的统计信息来指定接受后续事件的条件。
如下是一个迭代条件的代码,若是名称以“ foo”开头,而且该模式先前接受的事件的价格加上当前价格不超过5.0 ,则接受下一个事件 为“中间”的模式的事件。 迭代条件可能很强大,尤为是与循环模式(例如循环模式)结合使用时 一个或多个()。
middle.oneOrMore() .subtype(classOf[SubEvent]) .where( (value, ctx) => { lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum value.getName.startsWith("foo") && sum + value.getPrice < 5.0 } )
注意:调用ctx.getEventsForPattern(...)会找到给定潜在匹配项的全部先前接受的事件。 该操做的成本可能会有所不一样,所以在实施时,请尽可能减小其使用。
所描述的上下文也使事件时间特性具备一种访问方式。 有关更多信息,请参见时间上下文。
简单条件:这种类型的条件扩展了前面提到的IterativeCondition类,并仅基于事件自己的属性来决定是否接受事件。
start.where(event => event.getName.startsWith("foo"))
最后,您还能够经过pattern.subtype(subClass)方法将接受事件的类型限制为初始事件类型的子类型(此处为Event)。
start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
组合条件:如上所示,您能够将子类型条件与其余条件组合。 这适用于全部条件。 您能够经过顺序调用where()任意组合条件。 最终结果将是各个条件结果的逻辑与。
要使用OR合并条件,可使用or()方法,以下所示。
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
中止条件:若是是循环模式(oneOrMore()和oneOrMore()。optional()),您还能够指定中止条件,例如 接受值大于5的事件,直到值的总和小于50。
为了更好地理解它,请看如下示例。
"(a+ until b)"
(一个或多个"a"
直到"b"
)的模式
一系列传入事件 "a1" "c" "a2" "b" "a3"
该库将输出结果:{a1 a2} {a1} {a2} {a3}
。
如您所见{a1 a2 a3}
,{a2 a3}
因为中止条件而未返回。
Pattern Operation | Description |
---|---|
where(condition) | 定义当前模式的条件。 为了匹配模式,事件必须知足条件。 多个连续的where()子句致使其条件与: pattern.where(event => ... /* some condition */) |
or(condition) | 添加与现有条件进行“或”运算的新条件。 一个事件只有经过至少一个条件才能匹配模式: pattern.where(event => ... /* some condition */) .or(event => ... /* alternative condition */) |
until(condition) | 指定循环模式的中止条件。 意味着若是发生符合给定条件的事件,则模式中将再也不接受任何事件. 仅与oneOrMore()结合使用 注意:它容许在基于事件的状况下为相应的模式清除状态. pattern.oneOrMore().until(event => ... /* some condition */) |
subtype(subClass) | 定义当前模式的子类型条件。 若是事件属于此子类型,则该事件只能与该模式匹配: pattern.subtype(classOf[SubEvent]) |
oneOrMore() | 指定此模式指望至少发生一次匹配事件. 默认状况下,使用宽松的内部连续性(在后续事件之间)。 有关内部连续性的更多信息,请参见连续. 注意:建议使用until()或within()来启用清除状态 pattern.oneOrMore() |
timesOrMore(#times) | 指定此模式指望至少出现 #times 次匹配事件. 默认状况下,使用宽松的内部连续性(在后续事件之间)。 有关内部连续性的更多信息,请参见连续. pattern.timesOrMore(2) |
times(#ofTimes) | 指定此模式指望发生匹配事件的确切次数. 默认状况下,使用宽松的内部连续性(在后续事件之间)。 有关内部连续性的更多信息,请参见连续. pattern.times(2) |
times(#fromTimes, #toTimes) | 指定此模式指望匹配事件的#fromTimes和#toTimes之间发生. 默认状况下,使用宽松的内部连续性(在后续事件之间)。 有关内部连续性的更多信息,请参见连续. pattern.times(2, 4) |
optional() | 指定此模式是可选的,即它可能根本不会发生。 这适用于全部上述量词. pattern.oneOrMore().optional() |
greedy() | 指定此模式为贪婪模式,即将重复尽量多的匹配。 这仅适用于量词,目前不支持分组模式. pattern.oneOrMore().greedy() |
既然您已经了解了单个模式的描述,那么如今该看看如何将它们组合成完整的模式序列了。
模式序列必须从初始模式开始,以下所示:
val start : Pattern[Event, _] = Pattern.begin("start")
接下来,能够经过在模式序列之间指定所需的连续性条件,将更多模式附加到模式序列中。FlinkCEP支持如下事件之间的连续性形式:
严格连续性:指望全部匹配事件严格地一个接一个地出现,而中间没有任何不匹配事件。
宽松连续性:忽略在匹配事件之间出现的不匹配事件。
非肯定性宽松连续性:进一步的宽松连续性,容许其余匹配忽略某些匹配事件。
要在连续模式之间应用它们,可使用:
next()
为严格的followedBy()
为宽松的followedByAny()
,用于不肯定的宽松邻接或
notNext()
,若是您不但愿某个事件类型直接跟随另外一个事件notFollowedBy()
,若是您不但愿事件类型介于其余两个事件类型之间。注意:模式序列不能以 notFollowedBy() 结尾。
注意:NOT模式不能在可选模式以前。
// strict contiguity val strict: Pattern[Event, _] = start.next("middle").where(...) // relaxed contiguity val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...) // non-deterministic relaxed contiguity val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...) // NOT pattern with strict contiguity val strictNot: Pattern[Event, _] = start.notNext("not").where(...) // NOT pattern with relaxed contiguity val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)
宽松连续性意味着仅将匹配第一个后续匹配事件,而对于不肯定的宽松连续性,将针对同一开始发出多个匹配项。例如,"a b"
给定事件序列的pattern "a", "c", "b1", "b2"
将给出如下结果:
"a"
和"b"之间的严格邻接:{}(不匹配),“ a”以后的“ c”会致使“ a”被丢弃。
"a"
和"b"
之间宽松连续性:{a b1}
做为轻松的连续性被视为“跳过不匹配的事件,直到下一个匹配的一个”。
“ a”和“ b”之间的不肯定肯定的宽松连续性:{a b1},{a b2},由于这是最通用的形式。
也能够定义时间约束以使模式有效。例如,您能够经过pattern.within()
方法定义一个模式应该在10秒内发生。处理和事件时间都支持时间模式。
注意:模式序列只能具备一个时间约束。若是在不一样的单独模式上定义了多个这样的约束,那么将应用最小的约束。
next.within(Time.seconds(10))
您能够在循环模式中应用与上一节中讨论的相同的邻接条件。 连续性将应用于接受到这种模式的元素之间。 为了举例说明上述内容,请使用输入“ a”的模式序列“ a b + c”(“ a”,而后是一个或多个“ b”的任意(非肯定性宽松)序列,后跟“ c”) “,” b1“,” d1“,” b2“,” d2“,” b3“,” c“将具备如下结果:
严格连续性:{a b3 c} –在“ b1”以后的“ d1”致使“ b1”被丢弃,因为“ d2”,对于“ b2”也同样。
宽松的连续性:{a b1 c},{a b1 b2 c},{a b1 b2 b3 c},{a b2 c},{a b2 b3 c},{a b3 c}-忽略“ d”。
非肯定性宽松的连续性:{a b1 c},{a b1 b2 c},{a b1 b3 c},{a b1 b2 b3 c},{a b2 c},{a b2 b3 c},{a b3 c}-请注意{ a b1 b3 c},这是“ b”之间的松弛连续性的结果。
对于循环模式(例如oneOrMore()
和times()
),默认设置为宽松连续性。若是要严格邻接,则必须使用consecutive()
调用显式指定它,若是要 非肯定性宽松邻接,则可使用该allowCombinations()
调用。
也能够将模式序列定义为begin,followedBy,followedByAny和next的条件。 模式序列在逻辑上将被视为匹配条件,而且将返回GroupPattern,而且能够应用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),continuous(), allowCombinations()到GroupPattern。
val start: Pattern[Event, _] = Pattern.begin( Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...) ) // strict contiguity val strict: Pattern[Event, _] = start.next( Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...) ).times(3) // relaxed contiguity val relaxed: Pattern[Event, _] = start.followedBy( Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...) ).oneOrMore() // non-deterministic relaxed contiguity val nonDetermin: Pattern[Event, _] = start.followedByAny( Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...) ).optional()
Pattern Operation | Description |
---|---|
begin(#name) | 定义开始模式: val start = Pattern.begin[Event]("start") |
begin(#pattern_sequence) | 定义开始模式: val start = Pattern.begin( Pattern.begin[Event]("start").where(...).followedBy("middle").where(...) ) |
next(#name) | 追加一个新模式。 匹配事件必须直接接在先前的匹配事件以后(严格连续): val next = start.next("middle") |
next(#pattern_sequence) | 追加一个新模式。 一系列匹配事件必须直接接续先前的匹配事件(严格连续): val next = start.next( Pattern.begin[Event]("start").where(...).followedBy("middle").where(...) ) |
followedBy(#name) | 追加一个新模式。 匹配事件和上一个匹配事件之间可能会发生其余事件(宽松的连续性): val followedBy = start.followedBy("middle") |
followedBy(#pattern_sequence) | 追加一个新模式。 在一系列匹配事件和上一个匹配事件之间可能会发生其余事件(宽松的连续性): val followedBy = start.followedBy( Pattern.begin[Event]("start").where(...).followedBy("middle").where(...) ) |
followedByAny(#name) | 追加一个新模式。 在匹配事件和先前的匹配事件之间可能会发生其余事件, 而且将为每一个替代匹配事件显示替代匹配(不肯定性宽松邻接): val followedByAny = start.followedByAny("middle") |
followedByAny(#pattern_sequence) | 追加一个新模式。 在匹配事件序列和先前的匹配事件之间可能会发生其余事件, 而且将为匹配事件的每一个替代序列(非肯定性宽松连续性)提供替代匹配: val followedByAny = start.followedByAny( Pattern.begin[Event]("start").where(...).followedBy("middle").where(...) ) |
notNext() | 附加新的否认模式。 匹配(否认)事件必须直接继承先前的匹配事件(严格连续性)才能放弃部分匹配: val notNext = start.notNext("not") |
notFollowedBy() | 附加新的否认模式。 即便在匹配(负)事件和上一个匹配事件(宽松的邻接)之间发生其余事件,部分匹配事件序列也将被丢弃: val notFollowedBy = start.notFollowedBy("not") |
within(time) | 定义事件序列与模式匹配的最大时间间隔。 若是未完成的事件序列超过此时间,则将其丢弃: pattern.within(Time.seconds(10)) |
对于给定的模式,能够将同一事件分配给多个成功的匹配。 要控制将事件分配给多少个匹配项,您须要指定一个名为AfterMatchSkipStrategy的跳过策略。 跳过策略有五种类型,列出以下:
请注意,在使用SKIP_TO_FIRST和SKIP_TO_LAST跳过策略时,还应指定有效的PatternName。
例如,对于给定的模式b + c和数据流b1 b2 b3 c,这四种跳过策略之间的差别以下:
Skip Strategy | Result | Description |
---|---|---|
NO_SKIP | b1 b2 b3 c b2 b3 c b3 c |
找到匹配的b1 b2 b3 c后,匹配过程将不会丢弃任何结果。 |
SKIP_TO_NEXT | b1 b2 b3 c b2 b3 c b3 c |
找到匹配的b1 b2 b3 c以后,匹配过程将不会丢弃任何结果, 由于没有其余匹配能够从b1开始. |
SKIP_PAST_LAST_EVENT | b1 b2 b3 c |
找到匹配的b1 b2 b3 c后,匹配过程将丢弃全部开始的部分匹配. |
SKIP_TO_FIRST[b ] |
b1 b2 b3 c b2 b3 c b3 c |
找到匹配的b1 b2 b3 c以后,匹配过程将尝试丢弃b1以前开始的全部部分匹配, 但没有此类匹配。 所以,什么都不会被丢弃. |
SKIP_TO_LAST[b ] |
b1 b2 b3 c b3 c |
找到匹配的b1 b2 b3 c以后,匹配过程将尝试丢弃b3以前开始的全部部分匹配。 有这样的一次匹配 b2 b3 c |
再看看另外一个示例,以更好地了解NO_SKIP和SKIP_TO_FIRST之间的区别:模式:(a | b | c)(b | c)c + .greedy d和序列:a b c1 c2 c3 d而后结果将是:
Skip Strategy | Result | Description |
---|---|---|
NO_SKIP | a b c1 c2 c3 d b c1 c2 c3 d c1 c2 c3 d |
找到匹配a b c1 c2 c3 d后,匹配过程将不会丢弃任何结果. |
SKIP_TO_FIRST[c* ] |
a b c1 c2 c3 d c1 c2 c3 d |
在找到与b c1 c2 c3 d匹配以后,匹配过程将丢弃在c1以前开始的全部部分匹配。 有一个这样的匹配b c1 c2 c3 d. |
为了更好地理解NO_SKIP和SKIP_TO_NEXT之间的区别,请看如下示例:模式:a b +和序列:a b1 b2 b3而后结果将是:
Skip Strategy | Result | Description |
---|---|---|
NO_SKIP | a b1 a b1 b2 a b1 b2 b3 |
找到匹配的b1以后,匹配过程将不会丢弃任何结果. |
SKIP_TO_NEXT | a b1 |
找到匹配的b1以后,匹配过程将丢弃全部从a开始的部分匹配。 这意味着既不能生成b1 b2也不能生成b1 b2 b3. |
要指定要使用的跳过策略,只需经过调用如下内容来建立AfterMatchSkipStrategy:
Function | Description |
---|---|
AfterMatchSkipStrategy.noSkip() |
Create a NO_SKIP skip strategy |
AfterMatchSkipStrategy.skipToNext() |
Create a SKIP_TO_NEXT skip strategy |
AfterMatchSkipStrategy.skipPastLastEvent() |
Create a SKIP_PAST_LAST_EVENT skip strategy |
AfterMatchSkipStrategy.skipToFirst(patternName) |
使用引用的模式名称patternName建立一个SKIP_TO_FIRST跳过策略 |
AfterMatchSkipStrategy.skipToLast(patternName) |
使用引用的模式名称patternName建立一个SKIP_TO_LAST跳过策略 |
而后经过调用将跳过策略应用于模式:
val skipStrategy = ...
Pattern.begin("patternName", skipStrategy)
注意:对于SKIP_TO_FIRST / LAST,有两个选项能够处理在没有元素映射到指定变量时的状况。 默认状况下,将使用NO_SKIP策略。 另外一个选择是在这种状况下引起异常。 能够经过如下方式启用此选项:
AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
在指定了所需的模式序列以后,是时候将其应用于输入流以检测潜在的匹配了。 要针对模式序列运行事件流,必须建立一个PatternStream。 给定一个输入流输入,一个模式模式和一个可选的比较器比较器(用于在EventTime的状况下对具备相同时间戳的事件或在同一时刻到达的事件进行排序),您能够经过调用如下代码来建立PatternStream:
val input : DataStream[Event] = ... val pattern : Pattern[Event, _] = ... var comparator : EventComparator[Event] = ... // optional val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)
输入流能够是键的,也能够是非键的,具体取决于您的用例。
注意:将模式应用于非键控流将致使并行度等于1的做业。
得到PatternStream以后,能够将转换应用于检测到的事件序列。 建议的实现方式是经过PatternProcessFunction。
PatternProcessFunction具备一个processMatch方法,每一个匹配事件序列都会调用该方法。 它以Map <String,List <IN >>的形式接收匹配,其中键是模式序列中每一个模式的名称,值是该模式全部可接受事件的列表(IN是您的类型 输入元素)。 给定模式的事件按时间戳排序。 返回每一个模式的接受事件列表的缘由是,当使用循环模式(例如oneToMany()和times())时,给定模式可能会接受多个事件。
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> { @Override public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception; IN startEvent = match.get("start").get(0); IN endEvent = match.get("end").get(0); out.collect(OUT(startEvent, endEvent)); } }
PatternProcessFunction容许访问Context对象。 有了它,就能够访问与时间相关的特征,例如currentProcessingTime或当前匹配的时间戳(这是分配给匹配的最后一个元素的时间戳)。 有关更多信息,请参见时间上下文。 经过这种状况,还能够将结果发送到副输出。
只要某个模式具备经过inner关键字附加的窗口长度,则可能会丢弃部分事件序列,由于它们超过了窗口长度。 要对超时的部分匹配采起行动,可使用TimedOutPartialMatchHandler接口。 该接口应该以混合样式使用。 这意味着您还可使用PatternProcessFunction实现此接口。 TimedOutPartialMatchHandler提供了额外的processTimedOutMatch方法,将为每一个超时的部分匹配调用该方法。
class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> { @Override public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception; ... } @Override public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception; IN startEvent = match.get("start").get(0); ctx.output(outputTag, T(startEvent)); } }
注意:processTimedOutMatch不能访问主输出。 可是,仍然能够经过Context对象经过侧面输出发出结果。
前面提到的PatternProcessFunction是在Flink 1.8中引入的,从那时起,它是与匹配项进行交互的推荐方法。 仍然可使用老式的API,例如select / flatSelect,该API在内部将转换为PatternProcessFunction。
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) val outputTag = OutputTag[String]("side-output") val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){ (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) => out.collect(TimeoutEvent()) } { (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) => out.collect(ComplexEvent()) } val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)
在CEP中,处理元素的顺序很重要。 为了确保在事件时间内工做时按正确的顺序处理元素,将传入的元素最初放在缓冲区中,在该缓冲区中,元素根据其时间戳按升序排序,而且当水印到达时,该缓冲区中的全部元素 小于水印的时间戳被处理。 这意味着水印之间的元素按事件时间顺序进行处理。
注意:在事件时间内工做时,cep 库假定水印正确无误。
为了确保跨水印的元素按事件时间顺序进行处理,Flink的CEP库假定水印是正确的,并将其时间戳小于最后看到的水印的时间戳视为晚元素。 后期元素不会进一步处理。 另外,您能够指定sideOutput标签来收集在最后看到的水印以后出现的后期元素,您能够像这样使用它。
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) val lateDataOutputTag = OutputTag[String]("late-data") val result: SingleOutputStreamOperator[ComplexEvent] = patternStream .sideOutputLateData(lateDataOutputTag) .select{ pattern: Map[String, Iterable[ComplexEvent]] => ComplexEvent() } val lateData: DataStream[String] = result.getSideOutput(lateDataOutputTag)
在PatternProcessFunction和IterativeCondition中,用户能够访问实现TimeContext的上下文,以下所示:
/** * Enables access to time related characteristics such as current processing time or timestamp of * currently processed element. Used in {@link PatternProcessFunction} and * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition} */ @PublicEvolving public interface TimeContext { /** * Timestamp of the element currently being processed. * * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this * will be set to the time when event entered the cep operator. */ long timestamp(); /** Returns the current processing time. */ long currentProcessingTime(); }
此上下文使用户能够访问已处理事件的时间特征(在IterativeCondition状况下为传入记录,在PatternProcessFunction状况下为匹配)。 调用TimeContext#currentProcessingTime始终会为您提供当前处理时间的值,而且此调用应优先于例如 调用System.currentTimeMillis()。
若是使用TimeContext#timestamp(),则返回值等于使用EventTime时分配的时间戳。 在ProcessingTime中,该时间等于所述事件进入cep运算符的时间点(或在PatternProcessFunction的状况下生成匹配项的时间点)。 这意味着该值在对该方法的屡次调用中将保持一致。
如下示例在事件的键控数据流上检测模式的开始,中间(名称=“错误”)->结束(名称=“严重”)。 这些事件由其ID进行键控,而且有效模式必须在10秒内发生。 整个处理过程随事件时间而定。
val env : StreamExecutionEnvironment = ... env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val input : DataStream[Event] = ... val partitionedInput = input.keyBy(event => event.getId) val pattern = Pattern.begin[Event]("start") .next("middle").where(_.getName == "error") .followedBy("end").where(_.getName == "critical") .within(Time.seconds(10)) val patternStream = CEP.pattern(partitionedInput, pattern) val alerts = patternStream.select(createAlert(_))
在Flink-1.4中,删除了CEP库与<= Flink 1.2的向后兼容性。 不幸的是,不可能恢复曾经使用1.2.x运行的CEP做业。
Flink-1.3中的CEP库附带了许多新功能,这些新功能致使API发生了一些更改。 在这里,咱们描述了您须要对旧的CEP做业进行的更改,以便可以使用Flink-1.3运行它们。 进行这些更改并从新编译做业后,您将可以从做业的旧版本获取的保存点恢复执行,即无需从新处理过去的数据。
所需的更改是:
更改条件(where(...)
子句中的条件)以扩展SimpleCondition
类,而不是实现FilterFunction
接口。
更改做为select(...)和flatSelect(...)方法的参数提供的函数,以获取与每一个模式关联的事件列表(Java中为List,Scala中为Iterable)。 这是由于经过添加循环模式,多个输入事件能够匹配单个(循环)模式。
Flink 1.1和1.2中的followedBy()隐含了不肯定的宽松连续性(请参见此处)。 在Flink 1.3中,此更改已更改,而且followBy()表示宽松邻接,而若是须要非肯定性宽松邻接,则应使用followByAny()。
欢迎关注Flink菜鸟公众号,会不按期更新Flink(开发技术)相关的推文