从1.5.0开始,Flink提供了一种新的State类型,称为Broadcast State。在这篇文章中,咱们将解释什么是Broadcast State,并展现如何将其应用于评估事件流上的动态模式的应用的示例。咱们将向您介绍处理步骤和源代码,以实现此应用。html
Broadcast State可用于以特定方式组合和联合处理两个事件流。第一个流的事件被广播到一个算子的全部并行实例,该算子将它们保存为状态。另外一个流的事件不广播,而是发送给同一个算子的单个实例,并与广播流的事件一块儿处理。对于须要链接低吞吐量和高吞吐量流或须要动态更新处理逻辑的应用来讲,新的broadcast state很是适合。咱们将使用一个具体示例来解释broadcast state,并在本文的其他部分更详细地展现其API。java
想象一下,一个电子商务网站捕获全部用户的交互做为用户行为流。运营网站的公司有兴趣分析交互,以增长收入,改善用户体验,并检测和防止恶意行为。该网站实现了一个流应用,该应用检测用户事件流上的模式。可是,公司但愿避免每次模式改变时修改和从新部署应用。相反,当应用接收到来自模式流的新模式时,它会摄取第二个模式流并更新其活动模式。在下面,咱们将逐步讨论这个应用,并展现它如何利用Flink中的broadcast state特性。apache
咱们的示例应用包含两个数据流。第一个流提供用户在网站上的行为,如上图的左上方所示。用户交互事件包括行为的类型(用户登陆、用户注销、添加到购物车或完成支付)和用户的id,该id由颜色编码(不一样颜色表明不一样用户)。咱们插图中的用户行为事件流包含用户1001的注销行为,用户1003的支付行为和用户1002的加入购物车行为。微信
第二个流提供应用将评估的行为模式。模式由两个连续的行为组成。在上面的图中,模式流包含如下两个:并发
这些模式有助于企业更好地分析用户行为、检测恶意行为和改善网站体验。例如,若是项目被添加到购物车而没有后续购买,网站团队能够采起适当的行动,以更好地理解用户不完成购买的缘由,并启动特定的程序来提升网站转化(例如提供折扣码、限时免费送货优惠等)。ide
在右侧,该图显示了算子的三个并行任务,它们摄取模式和用户行为流,评估行为流上的模式,并向下游发出模式匹配。为了简单起见,在咱们的示例中,算子只计算一个模式,只包含两个后续行为。当从模式流接收到新模式时,替换当前活动的模式。原则上,还能够实现一个算子来同时评估更复杂的模式或多个模式,这些模式能够单独添加或删除。函数
咱们将描述模式匹配应用如何处理用户行为流和模式流。网站
首先,将模式发送给算子。该模式被广播到算子的全部三个并行任务。任务将模式存储在其broadcast state中。因为broadcast state只应该使用广播数据更新,因此全部任务的状态都是相同的。this
接下来,在用户id上对第一个用户行为进行分区,并将其发送给下游算子。分区确保同一个用户的全部行为都由同一个任务处理。上图显示了应用在第一个模式以后的状态,以及算子消耗了前三个行为事件。编码
当任务接收到新的用户行为时,它会经过查看用户的最新和先前行为来评估当前活动模式。对于每一个用户,运算符将前面的操为存储在keyed state。因为图中的任务到目前为止只接收到每一个用户的一个行为(咱们刚刚启动应用),所以不须要对模式进行评估。最后,处于用户keyed state的前一个行为被更新为最新的行为,以便可以在同一用户的下一个行为到达时查找它。
在处理前三个行为以后,下一个事件(用户1001的注销行为)被发送处处理用户1001的事件的任务。当任务接收到行为时,它从broadcast state和用户1001的前一个行为中查找当前模式。因为模式与两个行为匹配,任务将发出模式匹配事件。最后,该任务经过使用最新行为覆盖前一个事件来更新其keyed state。
当一个新模式到达模式流时,它将被广播到全部任务,每一个任务经过用新模式替换当前模式来更新其broadcast state。
一旦用新模式更新broadcast state,匹配逻辑就会像之前同样继续,即用户行为事件按key进行分区,并由负责的任务进行评估。
到目前为止,咱们从概念上讨论了这个应用,并解释了它如何使用broadcast state来评估事件流上的动态模式。接下来,咱们将展现如何使用Flink的Datastream API和broadcast state特性来实现示例应用。
让咱们从应用的输入数据开始。咱们有两个数据流,行为流和模式流。在这一点上,咱们并不关流从何而来。这些流多是从Kafka、Kinesis或任何其余系统中摄取的。行为和模式是Pojos,每一个字段有两个:
DataStream<Action> actions = ???
DataStream<Pattern> patterns = ???
复制代码
Action
和Pattern
Pojos有两个字段:
Action: Long userId, String action
Pattern: String firstAction, String secondAction
第一步,咱们在流上使用userId
属性进行keyBy操做。
KeyedStream<Action, Long> actionsByUser = actions
.keyBy((KeySelector<Action, Long>) action -> action.userId);
复制代码
接下来,咱们准备broadcast state。broadcast state始终表示为MapState,这是Flink提供的最通用的状态原语。
MapStateDescriptor<Void, Pattern> bcStateDescriptor =
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
复制代码
因为咱们一次仅评估和存储单个模式,咱们将broadcast state配置为具备键类型Void和值类型Pattern的MapState。模式始终存储在MapState中,并将null做为键。
BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);
复制代码
对于broadcast state应该使用MapStateDescriptor
,咱们在patterns流上调用broadcast()
方法将它转换为BroadcastStream流bcedPatterns.
DataStream<Tuple2<Long, Pattern>> matches = actionsByUser
.connect(bcedPatterns)
.process(new PatternEvaluator());
复制代码
咱们获得了keyed以后的actionsByUser流与广播流bcedPatterns,咱们调用connect()
方法将他们链接在一块儿而后在流上应用PatternEvaluator
。PatternEvaluator
实现了KeyedBroadcastProcessFunction
接口。它应用咱们前面讨论过的模式匹配逻辑,并发送包含用户id和匹配模式的记录的Tuple2<Long, Pattern>
。
public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {
// handle for keyed state (per user)
ValueState<String> prevActionState;
// broadcast state descriptor
MapStateDescriptor<Void, Pattern> patternDesc;
@Override
public void open(Configuration conf) {
// initialize keyed state
prevActionState = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastAction", Types.STRING));
patternDesc =
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
}
/** * Called for each user action. * Evaluates the current pattern against the previous and * current action of the user. */
@Override
public void processElement( Action action, ReadOnlyContext ctx, Collector<Tuple2<Long, Pattern>> out) throws Exception {
// get current pattern from broadcast state
Pattern pattern = ctx
.getBroadcastState(this.patternDesc)
// access MapState with null as VOID default value
.get(null);
// get previous action of current user from keyed state
String prevAction = prevActionState.value();
if (pattern != null && prevAction != null) {
// user had an action before, check if pattern matches
if (pattern.firstAction.equals(prevAction) &&
pattern.secondAction.equals(action.action)) {
// MATCH
out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
}
}
// update keyed state and remember action for next pattern evaluation
prevActionState.update(action.action);
}
/** * Called for each new pattern. * Overwrites the current pattern with the new pattern. */
@Override
public void processBroadcastElement( Pattern pattern, Context ctx, Collector<Tuple2<Long, Pattern>> out) throws Exception {
// store the new pattern by updating the broadcast state
BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
// storing in MapState with null as VOID default value
bcState.put(null, pattern);
}
}
复制代码
这个KeyedBroadcastProcessFunction
接口提供了处理记录和发出结果的三种方法。
processBroadcastElement()
: 在广播流的每一个记录调进来的时候用。在PatternEvaluator
函数,咱们简单地将接收到的Pattern
使用null
键(记住,咱们只在MapState
).processElement()
: 在keyed stream的每一个记录进来的时候调用。它提供对Broadcast State的只读访问,以防止对跨函数并行实例的不一样broadcast state的修改。这PatternEvaluator的processElement()
方法从broadcast state检索当前模式,从keyed state检索用户的先前行为。若是二者都存在,它将检查前面和当前的行为是否与模式匹配,若是匹配话,它会发出模式匹配记录。最后,它将keyed state更新为当前用户行为。onTimer()
: 在以前注册过的计时器触发时调用。计时器能够在processElement
方法中注册,用于执行计算或清除未来的状态。为了保持代码的简洁性咱们没有在咱们的示例中实现这个方法。可是,当用户在一段时间内没有活动时,可使用它来删除用户的最后一个行为,以免因为不活动的用户而致使state的增加。你可能已经注意到KeyedBroadcastProcessFunction
的process方法。context 对象容许使用其余功能,如:
TimerService
,它容许访问记录的时间戳、当前watermark,而且能够注册计时器processElement()
方法中可用),以及一种将函数应用于每一个注册key的keyed state的方法(仅在processBroadcastElement()
方法中可用)这个KeyedBroadcastProcessFunction
就像其余ProcessFunction同样彻底能够访问Flink中的state和时间特性,所以能够用来实现复杂的逻辑。broadcast state被设计成一个通用的特性,能够适应不一样的场景和用例。虽然咱们只讨论了一个至关简单和受限的应用,但您能够经过多种方式使用broadcast state来实现应用的需求。
在这篇文章中,咱们向您介绍了一个示例应用,以解释Flink的broadcast state是什么,以及如何使用它来评估事件流上的动态模式。咱们还讨论了API,并展现了示例应用的源码。
欢迎关注个人微信公众号