Fink| CEP

 

什么是复琐事件CEP?

一个或多个由简单事件构成的事件流经过必定的规则匹配,而后输出用户想获得的数据,知足规则的复琐事件。java

特征:

  • 目标:从有序的简单事件流中发现一些高阶特征apache

  • 输入:一个或多个由简单事件构成的事件流架构

  • 处理:识别简单事件之间的内在联系,多个符合必定规则的简单事件构成复琐事件ide

  • 输出:知足规则的复琐事件spa

           

CEP架构

            

 

CEP用于分析低延迟、频繁产生的不一样来源的事件流。CEP能够帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的得到通知并阻止一些行为。scala

CEP支持在流上进行模式匹配,根据模式的条件不一样,分为连续的条件或不连续的条件;模式的条件容许有时间的限制,当在条件范围内没有达到知足的条件时,会致使模式匹配超时。设计

CEP的工做流图:3d

             

看起来很简单,可是它有不少不一样的功能:code

  1. 输入的流数据,尽快产生结果对象

  2. 2个event流上,基于时间进行聚合类的计算

  3. 提供实时/准实时的警告和通知

  4. 多样的数据源中产生关联并分析模式

  5. 高吞吐、低延迟的处理

市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的library支持。可是Flink提供了专门的CEP library

Flink CEP

Flink为CEP提供了专门的Flink CEP library,它包含以下组件:

  1. Event Stream

  2. pattern定义

  3. pattern检测

  4. 生成Alert

  

 

首先,开发人员要在DataStream流上定义出模式条件,以后Flink CEP引擎进行模式检测,必要时生成告警。

为了使用Flink CEP,咱们须要导入依赖:

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-cep_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>

Event Streams

咱们首先须要为Stream Event设计java pojo,可是注意,因为要对event对象进行对比,因此咱们须要重写hashCode()方法和equals()方法。下面进行监控温度事件流。

建立抽象类MonitoringEvent,重写hashCode()和equals()方法;再建立POJO:TemperatureEvent,一样重写hashCode()和equals()方法:

MonitoringEvent:

 TemperatureEvent:

 建立env,建立source:

Pattern API

每一个Pattern都应该包含几个步骤,或者叫作state。从一个state到另外一个state,一般咱们须要定义一些条件,例以下列的代码:

DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.begin("start").where(evt -> evt.getId() == 42)
    // next表示"middle"事件紧跟着"start"事件发生
    .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
    // followedBy表示"end"事件不必定紧跟着"middle"事件发生
    .followedBy("end").where(evt -> evt.getName().equals("end"));

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.select(pattern -> {
    return createAlertFrom(pattern);
});

例子:

Flink-CEP:
登陆事件检测:同一用户连续两次登陆失败,报警; LoginEvent Stream
--> key by -->userid:LoginEvent Stream --> pattern matching -->select(定义报警输出) -->LoginWarning Stream -->print()
温度事件检测:同一机箱连续两次温度超标,报警; TemperatureEvent Stream
--> pattern matching(filter>27℃) --> select(定义报警输出) --> Alert Stream -->print()

 

异常检测:机箱温度检测

需求:同一个机箱连续两次温度超标,报警

拓展需求:锅炉房温度检测;信用卡反欺诈:连续大额消费;反做弊:同一个用户短期内连续登录失败 

  • flink cep

  • pattern定义

  • pattern匹配

  • select选出匹配到的事件,报警

public class CEPExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<TemperatureEvent> inputEventStream = env.fromElements( //不是DataStreamSource类型
                new TemperatureEvent("xyz", 22.0),
                new TemperatureEvent("xyz", 20.1), new TemperatureEvent("xyz", 21.1),
                new TemperatureEvent("xyz", 22.2), new TemperatureEvent("xyz", 22.1),
                new TemperatureEvent("xyz", 22.3), new TemperatureEvent("xyz", 22.1),
                new TemperatureEvent("xyz", 22.4), new TemperatureEvent("xyz", 22.7),
                new TemperatureEvent("xyz", 27.0));
        // 定义Pattern,检查10秒钟内温度是否高于26度
        Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent>begin("start") //加泛型
                .subtype(TemperatureEvent.class)
                .where(new SimpleCondition<TemperatureEvent>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public boolean filter(TemperatureEvent value) throws Exception {
                        if (value.getTemperature() >= 26) {
                            return true;
                        }
                        return false;
                    }
                }).within(Time.seconds(10));

        //匹配pattern并select事件,符合条件的发生警告,即输出
        //Alert类属性message,表示在知足必定的pattern条件后,须要告警的内容:
        DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern) //DataStream类型的
                .select(new PatternSelectFunction<TemperatureEvent, Alert>() {
                    @Override
                    public Alert select(Map<String, List<TemperatureEvent>> pattern) throws Exception {
                        return new Alert("Temperature Rise Detected: " + pattern.get("start").get(0).getTemperature() +
                                " on machine name: " + pattern.get("start").get(0).getMachineName());
                    }
                });
        patternStream.print();
        env.execute();
    }

}

 

 登陆事件异常检测:同一个用户连续两次登录失败,报警

  • flink cep

  • pattern定义

  • pattern匹配

  • select输出报警事件

//需求: 若是同一个userid在三秒以内连续两次登录失败,报警。
public class FlinkLoginFail {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 这里mock了事件流,这个事件流通常从Kafka过来
        DataStream<LoginEvent> loginEventStream = env.fromCollection(Arrays.asList(  //自定义一个pojo类:userId、ip、type
                new LoginEvent("1", "192.168.0.1", "fail"),
                new LoginEvent("1", "192.168.0.2", "fail"),
                new LoginEvent("1", "192.168.0.3", "fail"),
                new LoginEvent("2", "192.168.10.10", "success")
        ));//不用DataStreamSource,使用DataStream

        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("start")
                //泛型类或泛型接口上的泛型形参是不能用于静态成员的,那么当静态方法须要用到泛型时,只能用泛型方法。
                .where(new IterativeCondition<LoginEvent>() { // 模式开始事件的匹配条件为事件类型为fail, 为迭代条件
                    @Override
                    public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
                        return loginEvent.getType().equals("fail");
                    }
                }).next("next")
                .where(new IterativeCondition<LoginEvent>() { // 事件的匹配条件为事件类型为fail
                    @Override
                    public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
                        return loginEvent.getType().equals("fail");
                    }
                }).within(Time.seconds(3));// 要求紧邻的两个事件发生的时间间隔不能超过3秒钟

        // 以userid分组,造成keyedStream,而后进行模式匹配   ::方法引用
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        DataStream<LoginWarning> loginFailDataStream = patternStream.select((Map<String, List<LoginEvent>> pattern) -> {
            List<LoginEvent> first = pattern.get("start");
            List<LoginEvent> second = pattern.get("next");
            return new LoginWarning(first.get(0).getUserId(), first.get(0).getIp(), first.get(0).getType());
        });//不用SingleOutputStreamOperator类型的,使用

        loginFailDataStream.print();
        env.execute();

    }

}
相关文章
相关标签/搜索