网站通常都须要根据广告点击量来制定对应的订价策略和调整市场推广的方式,通常也会收集用户的一些偏好和其余信息,这里实现一个统计不一样省份/或者市用户对不一样广告的点击状况,有助于市场部对于广告的更精准投放,而且要防止有人恶意点击,不停的点同一个广告(固然同一个ip一直点不一样的广告也是同样)java
准备的日志文件ClickLog.csv:sql
543462,1715,beijing,beijing,1512652431 543461,1713,shanghai,shanghai,1512652433 543464,1715,shanxi,xian,1512652435 543464,1715,shanxi,weinan,1512652441 543464,1715,shanxi,weinan,1512652442 543464,1715,shanxi,weinan,1512652443 543464,1715,shanxi,weinan,1512652444 543464,1715,shanxi,weinan,1512652445 543464,1715,shanxi,weinan,1512652446 543464,1715,shanxi,weinan,1512652447 543464,1715,shanxi,weinan,1512652451 543464,1715,shanxi,weinan,1512652452 543464,1715,shanxi,weinan,1512652453 543464,1715,shanxi,weinan,1512652454 543464,1715,shanxi,weinan,1512652455 543464,1715,shanxi,weinan,1512652456 543464,1715,shanxi,weinan,1512652457 543464,1715,shanxi,hanzhong,1512652461 543464,1715,shanxi,yanan,1512652561
代码:apache
/* * * @author mafei * @date 2021/1/10 */ package com.mafei.market_analysis import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import java.sql.Timestamp /** * 定义输入的样例类 * 543464,1715,shanxi,weinan,1512652459 */ case class AdClickLog(userId: Long,adId: Long,province: String, city: String,timestamp:Long) /** * 定义输出的样例类 * 统计每一个省对每一个广告的点击量 */ case class AdClickCountByProvince(windowEnd: String,province: String, count: Long) object AdClickAnalysis { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //指定事件时间为窗口和watermark的时间 env.setParallelism(1) //从文件中读取数据 val resource = getClass.getResource("/ClickLog.csv") val inputStream = env.readTextFile(resource.getPath) // 转换成样例类,并提取时间戳watermark val adLogStream = inputStream .map(d=>{ val arr = d.split(",") AdClickLog(arr(0).toLong,arr(1).toLong,arr(2),arr(3),arr(4).toLong) }) .assignAscendingTimestamps(_.timestamp * 1000L) // 定义窗口,聚合统计 val adCountResultStream = adLogStream .keyBy(_.province) .timeWindow(Time.days(1),Time.seconds(50)) .aggregate(new AdCountAgg(), new AdCountWindowResult()) adCountResultStream.print() env.execute("统计广告点击状况") } } class AdCountAgg() extends AggregateFunction[AdClickLog, Long,Long]{ override def createAccumulator(): Long = 0L override def add(in: AdClickLog, acc: Long): Long = acc+1 override def getResult(acc: Long): Long = acc override def merge(acc: Long, acc1: Long): Long = acc + acc1 } class AdCountWindowResult() extends WindowFunction[Long,AdClickCountByProvince,String,TimeWindow]{ override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = { out.collect(AdClickCountByProvince(windowEnd = new Timestamp(window.getEnd).toString, province = key, count = input.head)) } }
上面代码中,同一个用户的重复点击是会叠加计算的,在实际生产场景中,同一个用户可能会重复点开某一个广告,可是若是用户在一段时间内很是频繁的点击广告,这明显不是个正常行为了,在刷点击量,因此能够作个限制,好比同一个广告,同一我的天天最多点100次,超过了就把这个用户加到黑名单里头并告警,后边的点击行为就再也不统计了
那来个改进的版本:windows
/* * * @author mafei * @date 2021/1/10 */ package com.mafei.market_analysis import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import java.sql.Timestamp /** * 定义输入的样例类 * 543464,1715,shanxi,weinan,1512652459 */ case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long) /** * 定义输出的样例类 * 统计每一个省对每一个广告的点击量 */ case class AdClickCountByProvince(windowEnd: String, province: String, count: Long) /** * 黑名单预警输出的样例类 */ case class UserBlackListWarning(userId: String, adId: String, msg: String) object AdClickAnalysis { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //指定事件时间为窗口和watermark的时间 env.setParallelism(1) //从文件中读取数据 val resource = getClass.getResource("/ClickLog.csv") val inputStream = env.readTextFile(resource.getPath) // 转换成样例类,并提取时间戳watermark val adLogStream = inputStream .map(d => { val arr = d.split(",") AdClickLog(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4).toLong) }) .assignAscendingTimestamps(_.timestamp * 1000L) // 插入一步操做,把有刷单行为的用户信息输出到黑名单(侧输出流中)并作过滤 val userBlackListFilterStream: DataStream[AdClickLog] = adLogStream .keyBy(data => { (data.userId, data.adId) }) .process(new FilterUserBlackListResult(10L)) // 定义窗口,聚合统计 val adCountResultStream = userBlackListFilterStream .keyBy(_.province) .timeWindow(Time.days(1), Time.seconds(50)) .aggregate(new AdCountAgg(), new AdCountWindowResult()) adCountResultStream.print() //打印测输出流 userBlackListFilterStream.getSideOutput(new OutputTag[UserBlackListWarning]("warning")).print("测输出流") env.execute("统计广告点击状况") } } class AdCountAgg() extends AggregateFunction[AdClickLog, Long, Long] { override def createAccumulator(): Long = 0L override def add(in: AdClickLog, acc: Long): Long = acc + 1 override def getResult(acc: Long): Long = acc override def merge(acc: Long, acc1: Long): Long = acc + acc1 } class AdCountWindowResult() extends WindowFunction[Long, AdClickCountByProvince, String, TimeWindow] { override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdClickCountByProvince]): Unit = { out.collect(AdClickCountByProvince(windowEnd = new Timestamp(window.getEnd).toString, province = key, count = input.head)) } } /** * key是上面定义的二元组 * 输入和输出不变,只是作过滤 */ class FilterUserBlackListResult(macCount: Long) extends KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog] { /** * 定义状态,保存每个用户对每一个广告的点击量 */ lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count", classOf[Long])) /** * 定义天天0点定时清空状态的时间戳 */ lazy val resetTimeTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("resetTs", classOf[Long])) /** * 定义用户有没有进入黑名单 */ lazy val isBlackList: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isBlackList", classOf[Boolean])) override def processElement(i: AdClickLog, context: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#Context, collector: Collector[AdClickLog]): Unit = { val curCount = countState.value() //初始状态 if(curCount == 0){ /** * 获取明天0点的时间戳,用来注册定时器,明天0点把状态所有置空 * * * 获取明天的天数: context.timerService().currentProcessingTime()/(1000*60*60*24)+1 * * (24*60*60*1000) 是转换成明天0点的时间戳 * - 8*60*60*1000 是从伦敦时间转为东8区 * */ val ts = (context.timerService().currentProcessingTime()/(1000*60*60*24)+1) * (24*60*60*1000) - 8*60*60*1000 context.timerService().registerProcessingTimeTimer(ts) resetTimeTsState.update(ts) //定义重置的时间点 } //判断次数是否是超过了定义的阈值,若是超过了那就输出到侧输出流 if(curCount > macCount){ // println("超出阈值了,curCount:"+curCount + " isBlackList:"+isBlackList.value()) //判断下,是否是在黑名单里头,没有的话才输出到侧输出流,不然就会重复输出 if(!isBlackList.value()){ isBlackList.update(true) context.output(new OutputTag[UserBlackListWarning]("warning"),UserBlackListWarning(i.userId.toString,i.adId.toString,curCount+"超过了出现的次数"+macCount)) } return } //正常状况,每次都计数加1,而后把数据原样输出,毕竟这里只是为了裹一层 countState.update(curCount +1) collector.collect(i) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = { if(timestamp == resetTimeTsState.value()){ isBlackList.clear() countState.clear() } } }