从版本1.5.0开始,Apache Flink具备一种称为广播状态的新型状态。 在这篇文章中,咱们解释了广播状态是什么,并展现了如何将其应用于评估事件流上的动态模式的应用程序的示例。 咱们将引导您完成开发步骤和代码,以实现此应用程序。html
什么是广播状态apache
广播状态能够用于以特定的方式组合和联合两个事件流。第一个事件流被广播给算子的全部并行实例,这些实例将他们维持在状态中。 其它事件流将不会被广播,可是会被发给同一个算子的个别实例,并与广播流事件一块儿处理。新的广播状态很是适合须要加入低吞吐量和高吞吐量流或须要动态更新其处理逻辑的应用程序。咱们将使用后一个用例的具体示例来解释广播状态,并在本文的其他部分更详细地展现其API。并发
广播状态的动态模式评估ide
想象一下一个电子商务网站捕获全部用户的交互做为用户行为流。运营该网站的公司对于分析交互以增长收入,改善用户体验,以及检测和防止恶意行为很感兴趣。该网站实现了一个流应用程序,用于检测用户事件流上的模式。可是,公司但愿每次模式更改时都避免修改和从新部署应用程序。相反,应用程序在从模式流接收新行为时获取第二个模式流并更新其活动模式。在下文中,咱们将逐步讨论此应用程序,并展现它如何利用Apache Flink中的广播状态功能。函数
咱们的示例应用程序获取了两个数据流。第一个流在网站上提供用户操做,并在上图的左上方显示。用户交互事件包括操做的类型(用户登陆,用户注销,添加到购物车或完成支付)和用户的ID,他们都被各类颜色进行编码。在咱们的图示中的用户动做事件流包含用户1001的注销动做,其后是用户1003的支付完成事件,以及用户1002的“添加到购物车”动做。网站
第二个流的操做模式将会经过应用进行评估。模式由两个连续的动做组成。this
在上图中,模式流包含如下两个:编码
模式#1:用户登陆并当即注销并无浏览电子商务网站上的其余页面。lua
模式#2:用户将项目添加到购物车并在不完成购买的状况下注销。设计
这些模式有助于企业更好地分析用户行为,检测恶意行为并改善网站体验。例如,若是项目被添加到购物车而没有后续购买,网站团队能够采起适当的措施来更好地了解用户未完成购买的缘由并启动特定程序以改善网站环境( 如提供折扣,限时免费送货优惠等)。
在右侧,该图显示了一个算子的三个并行任务,即侵入模式和用户操做流,评估操做流上的模式,并在下游发出模式匹配。为了简单起见,在咱们例子中的算子仅仅评估具备两个后续操做的单个模式。当从模式流接收到新模式时,当前活动模式会被替换。实质上,这个算子还能够同时评估更复杂的模式或多个模式,这些模式能够单独添加或移除。
咱们将描述匹配应用程序的模式如何处理用户操做和模式流。
首先一个模式被发送给一个算子。这个模式将会被广播给全部算子的三个并行任务。任务将会将这个模式存储在广播状态中。因为广播状态只应使用广播数据进行更新,所以全部任务的状态始终预期相同。
接下来,第一个用户的操做将会根据用户的id进行分区,而且会被发送到相应算子的任务中。这个分区可以确保同一个用户的全部操做都会被同一个任务处理。上图显示了该算子处理了第一个模式和前三个操做事件后应用程序的状态。
当一个任务收到了一个新的用户操做,它会经过查看用户的最新和先前操做来评估当前活动的模式。对于每一个用户,算子会将先前的操做储存在key state中。因为上图中的任务到目前为止仅仅收到了每一个用户的一个操做(咱们刚刚启动了应用程序),所以不须要评估该模式。最后,存储在key state中的用户的先前操做将会被更新为最新动做,以便可以在同一用户的下一个动做到达时查找它。
在前三个动做被处理以后,下一个事件(用户1001的注销操做)是被发送处处理用户1001的事件的任务。当用户获取动做时,它从广播状态和用户1001的先前动做中查找当前模式。模式匹配两个动做以后,任务提交模式匹配事件。 最后,任务经过使用最新操做覆盖上一个事件来更新其key state。
当一个新模式到达模式流时,它被广播到全部任务,而且每一个任务经过用新模式替换当前模式来更新其广播状态。
一旦广播状态被一种新的模式更新后,匹配逻辑可以如先前那样继续,换句话说,用户的操做事件将会按key进行分区,而且由负责的任务进行评估。
如何使用广播状态实现应用程序?
到目前为止,咱们在概念上讨论了该应用程序并解释了它如何使用广播状态来评估事件流上的动态模式。 接下来,咱们将展现如何使用Flink的DataStream API和广播状态功能实现示例应用程序。
让咱们从应用程序的输入数据开始。 咱们有两个数据流,操做和模式。 在这一点上,咱们并不关心流来自何处。 能够从Apache Kafka或Kinesis或任何其余系统获取流。 动做和模式是拥有两个字段的Pojos:
DataStream<Action> actions = ??? DataStream<Pattern> patterns = ???
动做和模式是拥有两个字段的Pojos
Action: Long userId, String action
Pattern: String firstAction, String secondAction
做为第一步,咱们将操做流上的userId属性。
KeyedStream<Action, Long> actionsByUser = actions .keyBy((KeySelector<Action, Long>) action -> action.userId);
接下来,我将开始尝试广播状态。广播状态通常以MapState为表明,这是Flink提供的最通用的状态原语。
MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
因为咱们的应用程序一次仅评估和存储单个Pattern,所以咱们将广播状态配置为具备键类型Void和值类型Pattern的MapState。 Pattern始终存储在MapState中,并将null做为键。
BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);
使用MapStateDescriptor
做为广播状态,咱们对模式流应用broadcast()转换并接收广播流bcedPatterns
。
DataStream<Tuple2<Long, Pattern>> matches = actionsByUser .connect(bcedPatterns) .process(new PatternEvaluator());
在咱们得到keyed actionsByUser流和广播的bcedPatterns流以后,咱们链接两个流并在链接的流上应用PatternEvaluator。 PatternEvaluator是一个实现KeyedBroadcastProcessFunction接口的自定义函数。 它应用咱们以前讨论过的模式匹配逻辑,并发出包含用户ID和匹配模式的Tuple2 <Long,Pattern>记录。
public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> { // handle for keyed state (per user) ValueState<String> prevActionState; // broadcast state descriptor MapStateDescriptor<Void, Pattern> patternDesc; @Override public void open(Configuration conf) { // initialize keyed state prevActionState = getRuntimeContext().getState( new ValueStateDescriptor<>("lastAction", Types.STRING)); patternDesc = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)); } /** * Called for each user action. * Evaluates the current pattern against the previous and * current action of the user. */ @Override public void processElement( Action action, ReadOnlyContext ctx, Collector<Tuple2<Long, Pattern>> out) throws Exception { // get current pattern from broadcast state Pattern pattern = ctx .getBroadcastState(this.patternDesc) // access MapState with null as VOID default value .get(null); // get previous action of current user from keyed state String prevAction = prevActionState.value(); if (pattern != null && prevAction != null) { // user had an action before, check if pattern matches if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(action.action)) { // MATCH out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern)); } } // update keyed state and remember action for next pattern evaluation prevActionState.update(action.action); } /** * Called for each new pattern. * Overwrites the current pattern with the new pattern. */ @Override public void processBroadcastElement( Pattern pattern, Context ctx, Collector<Tuple2<Long, Pattern>> out) throws Exception { // store the new pattern by updating the broadcast state BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc); // storing in MapState with null as VOID default value bcState.put(null, pattern); } }
KeyedBroadcastProcessFunction接口提供了三种处理记录和发出结果的方法。
processBroadcastElement()
被广播流上的每一个记录调用。在咱们的 PatternEvaluator
函数中, 咱们简单的使用null
健将接收到的 Pattern
记录放入广播状态(记住,咱们只在MapState
中存储单个模式)。processElement()
被 keyed stream上的每条记录调用。 它提供对广播状态的只读访问,以防止经过函数的并行实例修改不一样广播状态中的结果。PatternEvaluator
的processElement()
方法从广播状态检索当前模式,从keyed状态检索用户的上一个动做。若是两个都存在,它将会检查以前的模式和如今操做是否和模式匹配,若是相匹配,将会发送匹配的记录。最后,它会更新当前用户操做的keyed state。onTimer()
将会在先前注册的计时器触发时被调用。定时器能够在processElement
方法中注册,并用于执行计算或未来清理状态。为了保持代码的简洁,在咱们的示例中没有实现该方法。可是,当用户在一段时间内未处于活动状态时,它可用于删除用户的最后一个操做,以免因为非活动用户而致使状态增加您可能已经注意到KeyedBroadcastProcessFunction的处理方法的上下文对象。
上下文对象提供对其余功能的访问,例如:
TimerService
,能够访问记录的时间戳,当前的水印,以及哪些能够注册定时器,processElement()
中可用)和, processBroadcastElement()
中可用)KeyedBroadcastProcessFunction能够像任何其余ProcessFunction同样彻底访问Flink状态和时间功能,所以可用于实现复杂的应用程序逻辑。广播状态被设计为一种适用于不一样场景和用例的通用功能。虽然咱们只讨论了一个至关简单且受限制的应用程序,但您能够经过多种方式使用广播状态来实现应用程序的要求。
结论
在这篇博文中,咱们向您介绍了一个示例应用程序,以解释Apache Flink的广播状态以及它如何用于评估事件流上的动态模式。 咱们还讨论了API并展现了咱们的示例应用程序的源代码。
咱们邀请您查看此功能的文档,并经过咱们的邮件列表提供反馈或建议以进一步改进。
原文连接:https://flink.apache.org/2019/06/26/broadcast-state.html