FlinkCEP是在Flink上层实现的复琐事件处理库。 它可让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。 官网文档: https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/libs/cep.html 这里给个demo,对比下不用cep和用cep的区别, 实现目标: 从目标csv中读取模拟登陆的数据,实时检测,若是5秒钟以内连续登陆的次数超过2次,则立刻告警
实现步骤: 一、准备环境和数据源加载到内存 二、进行数据切割,转成须要的格式(样例类) 三、指定时间窗口watermark及事件时间取哪一个字段 四、按每一个用户id进行分组,统计每一个用户id的登陆行为(毕竟不能放一块儿统计吧) 五、实现具体的处理逻辑ProcessFunction 六、输出检测数据
准备的模拟数据 userLogin.csv:html
1234,10.0.1.1,fail,1611373940 1235,10.0.1.2,fail,1611373941 1234,10.0.1.3,fail,1611373942 1234,10.0.1.3,success,1611373943 1234,10.0.1.3,fail,1611373943 1234,10.0.1.3,fail,1611373944 1236,10.0.1.4,fail,1611373945 1234,10.0.1.4,fail,1611373957 1234,10.0.1.5,fail,1611373958 1234,10.0.11.55,fail,1611373959 1236,2.2.2.2,fail,1611373960
/* * * @author mafei * @date 2021/1/24 */ package com.mafei import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector import scala.collection.mutable.ListBuffer /** * 定义一个输入数据的样例类 * * @param userId 用户id * @param ip 客户端的ip * @param loginState 登陆状态,目前只有success/fail,后期能够作扩展,因此定义为string * @param ts 事件的时间戳,单位秒 */ case class userLogin(userId: Long,ip: String,loginState: String,ts: Long) /** * 定义一个输出的样例类 * @param userId 用户id * @param startTs 开始登陆时间 * @param endTs 触发事件的最后一次时间 * @param loginCount 时间段内总共登陆的次数 */ case class userLoginWarning(userId: Long, startTs: Long, endTs:Long, loginCount: Long) object maliceLoginDetect { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //指定事件时间为窗口和watermark的时间 env.setParallelism(1) //从文件中读取数据 val resource = getClass.getResource("/userLogin.csv") val inputStream = env.readTextFile(resource.getPath) // 转换成样例类,并提取时间戳watermark val loginEventStream = inputStream .map(d => { val arr = d.split(",") // 分别对应 userId ip 登陆状态 时间戳 userLogin(arr(0).toLong, arr(1), arr(2), arr(3).toLong) }) .assignAscendingTimestamps(_.ts * 1000L) //把秒转为毫秒 val loginWarningStream = loginEventStream .keyBy(_.userId) .process(new loginMaliceDetect(2)) loginWarningStream.print() env.execute() } } class loginMaliceDetect(warningCount: Long) extends KeyedProcessFunction[Long,userLogin,userLoginWarning]{ //定义状态,保存当前全部的登陆事件为list,方便后边作数据统计 lazy val loginFailListState: ListState[userLogin] = getRuntimeContext.getListState(new ListStateDescriptor[userLogin]("loginFail-list", classOf[userLogin])) //定义定时器的时间戳状态,不然无法删定时器 lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerState", classOf[Long])) override def processElement(i: userLogin, context: KeyedProcessFunction[Long, userLogin, userLoginWarning]#Context, collector: Collector[userLoginWarning]): Unit = { //判断,若是当前事件是登陆失败事件,那再继续操做 if(i.loginState == "fail"){ loginFailListState.add(i) //若是没有注册定时器,那就注册一个定时器,5秒以后触发 if(timerTsState.value()== 0){ val timerTs = i.ts * 1000L + 5000L context.timerService().registerEventTimeTimer(timerTs) timerTsState.update(timerTs) } } else if(i.loginState == "success"){ context.timerService().deleteEventTimeTimer(timerTsState.value()) timerTsState.clear() loginFailListState.clear() } } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, userLogin, userLoginWarning]#OnTimerContext, out: Collector[userLoginWarning]): Unit = { // 判断下若是登陆失败次数超过了设置的阈值,则告警 val loginFailList: ListBuffer[userLogin] = new ListBuffer[userLogin] val iterable = loginFailListState.get().iterator() while (iterable.hasNext){ loginFailList += iterable.next() } if (loginFailList.size > warningCount){ out.collect(userLoginWarning(userId = ctx.getCurrentKey, startTs = loginFailList.head.ts, endTs = loginFailList.last.ts, loginCount = loginFailList.size)) } loginFailList.clear() loginFailListState.clear() timerTsState.clear() } }
上面代码栗子是能够实现基本的登陆异常检测了,可是若是碰到数据乱序等状况, 有3个失败事件在时间范围内,可是有个乱序的数据插在中间,这时候按照逻辑中间就会状况从新计算。。这时候就须要用到flink提供的cep(复琐事件检测)的功能了
在pom.xml中增长cep的依赖java
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.10.1</flink.version> <scala.binary.version>2.12</scala.binary.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
/* * * @author mafei * @date 2021/1/24 */ package com.mafei import org.apache.flink.cep.PatternSelectFunction import org.apache.flink.cep.scala.CEP import org.apache.flink.cep.scala.pattern.Pattern import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import java.util object maliceLoginDetectWithCep { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //指定事件时间为窗口和watermark的时间 env.setParallelism(1) //从文件中读取数据 val resource = getClass.getResource("/userLogin.csv") val inputStream = env.readTextFile(resource.getPath) // 转换成样例类,并提取时间戳watermark val loginEventStream = inputStream .map(d => { val arr = d.split(",") // 分别对应 userId ip 登陆状态 时间戳 userLogin(arr(0).toLong, arr(1), arr(2), arr(3).toLong) }) .assignAscendingTimestamps(_.ts * 1000L) //把秒转为毫秒 // 一、先定义匹配的模式,需求为一个登陆失败事件后,紧接着出现另外一个失败事件 val loginFailPattern = Pattern .begin[userLogin]("firstFail") .where(_.loginState == "fail") .next("secondFail") .where(_.loginState == "fail") .within(Time.seconds(5)) //二、将匹配的规则应用在数据流中,获得一个PatternStream val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern) // 三、匹配中符合模式要求的数据流,须要调用select val loginFailWarningStream = patternStream.select(new LoginFailEventMatch()) loginFailWarningStream.print() env.execute("login fail detect with cep") } } class LoginFailEventMatch() extends PatternSelectFunction[userLogin,userLoginWarning]{ override def select(map: util.Map[String, util.List[userLogin]]): userLoginWarning = { //前边定义的全部pattern,都在Map里头,由于map的value里面只定义了一个事件,因此只会有一条,取第一个就能够,若是定义了多个,须要按实际状况来 val firstFailEvent = map.get("firstFail").get(0) val secondFailEvent = map.get("secondFail").iterator().next() userLoginWarning(firstFailEvent.userId,firstFailEvent.ts,secondFailEvent.ts,2) } }