从日志中读取数据作一些统计聚合,最终把想要的数据输出,基本是每一个公司都会有的场景,好比用户分析,商品分析,实时大屏等等,咱们最先的作法是所有读出来,写到redis或者es去,而后web端再去实时查询统计,其实有不少的弊端 要么每次请求都会去从新算一遍耗性能不说还慢的不行,业务方接受不了 或者直接把统计信息存进去,但要实时统计就比较麻烦,好比1小时存一个点,那业务方可能要的是当前10:05-11:05的数据。。。 用Flink能够很方便的实现,这里分别展现从csv和kafka里读大批量日志,flink来作统计每小时的热门商品,以5分钟作一个间隔,实现上每次延迟1s输出一次模拟实时
统计近一小时的热门商品,每5分钟更新一次 热门度用浏览次数(pv)来衡量
在全部用户行为中,过滤出来浏览(pv)行为进行统计 构建一个滑动窗口,窗口长度为1小时,每次滑动5分钟
DataStream -> 过滤出来是浏览行为的数据 -> 根据商品id进行分组生成KeydStream(key by 操做) -> 构建滑动窗口,窗口长度1小时,每次滑动5分钟 -> 进行聚合算总数 -> 生成一个最终输出的DataStream
一、新建项目
打开IDEA新建一个maven项目,UserBehaviorAnalysis,新建一个包 com.mafei.hotitems_analysis
二、父项目中准备依赖和打包相关 pom.xmljava
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.mafei</groupId> <artifactId>UserBehaviorAnalysis</artifactId> <packaging>pom</packaging> <version>1.0</version> <modules> <module>HotItemsAnalysis</module> </modules> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.10.1</flink.version> <scala.binary.version>2.12</scala.binary.version> <kafka.version>2.2.0</kafka.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- 用hutool只是为了造模拟数据用,实际生产中能够不用引入--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.5.6</version> </dependency> </dependencies> <build> <plugins> <!--该插件用于将scala代码编译成class文件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>4.4.0</version> <executions> <execution> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <!--打包用 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <descriptorRefs> <descriptiorRef>jar-with-dependencies</descriptiorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
最终的项目结构:web
实现步骤主要包含几步:redis
一、从文件中读取数据 二、构建flink运行环境 三、按照商品id进行聚合,取出来每一个商品一段时间的数量 四、排序取TopN,而后输出
实现代码: 在 com.mafei.hotitems_analysis下面新建一个scala的object : HotItems.scalasql
package com.mafei.hotitems_analysis import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.api.java.tuple.{Tuple, Tuple1} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import java.sql.Timestamp import scala.collection.mutable.ListBuffer //定义输入数据样例类(跟数据源的csv对应的上) case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long ) // 定义窗口聚合结果样例类 case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long) object HotItems { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //防止乱序 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //定义取事件时间 //从文件中读取数据 val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotItemsAnalysis/src/main/resources/UserBehavior.csv") val dataStream: DataStream[UserBehavior] = inputStream .map(data =>{ //data是读的csv每一行的数据 val arr = data.split(",") //按照,分割成list UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt,arr(3),arr(4).toLong) //把对应字段塞到样例类里面 }) .assignAscendingTimestamps(_.timestamp * 1000L) //定时时间戳为事件时间,*1000转为毫秒 //进行窗口聚合,获得聚合结果 val aggStream: DataStream[ItemViewCount] = dataStream .filter(_.behavior == "pv") //过滤出来pv行为的数据 .keyBy("itemId") .timeWindow(Time.hours(1),Time.minutes(5)) //设置一个滑动窗口,窗口大小是1小时,每次滑动5分钟 .aggregate(new CountAgg(), new ItemViewCountWindowResult()) /** * 须要针对每个商品作分组,定义次数的状态,由于要作topN,因此要作排序,定义延迟触发,多久排序并输出一次 */ val resultStream = aggStream .keyBy("windowEnd") //按照结束时间进行分组,收集当前窗口内的,取必定时间内的数据 .process(new TopNHostItem(10)) resultStream.print() env.execute("获取商品的访问次数") } } /** * 自定义一个预聚合函数AggregateFunction * 这里AggregateFunction 传入3个参数, * 第一个是输入的数据类型,这个在 dataStream 中已经定义的输出的类型了 * 第二个是中间的状态,由于要计算总数,因此每次数量+1,有个值记录下,因此是Long类型的 * 第三个是最终的输出数据, 最终的数量,因此也是Long类型 */ class CountAgg() extends AggregateFunction[UserBehavior, Long, Long]{ override def createAccumulator(): Long = 0L // 定义初始化的数据,确定是0咯 //这里每一个数据来了都会调用一次,因此直接在上一次结果上加一就能够,这个acc就是中间状态 override def add(in: UserBehavior, acc: Long): Long = acc +1 //最终输出的数据 override def getResult(acc: Long): Long = acc //这个在session窗口中才有用,是多个状态的时候作窗口合并的时候,这里只有1个状态,直接2个相加就能够 override def merge(acc: Long, acc1: Long): Long = acc+acc1 } /** * 自定义一个窗口函数windowFunction,用于生成最终咱们要的数据结构 * WindowFunction 须要4个参数,分别是 * @tparam IN The type of the input value. 输入类型,也就是数量 * @tparam OUT The type of the output value. 输出类型,这个是自定义的,这里定义一个输出类,com.mafei.hotitems_analysis.ItemViewCount * @tparam KEY The type of the key. key的类型,由于作聚合了么keyBy,由于keyBy输出的是JavaTuple类型(能够点到keyBy源码里面看下),因此须要定义类型的时候定义成Tuple */ class ItemViewCountWindowResult() extends WindowFunction[Long,ItemViewCount, Tuple,TimeWindow]{ override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = { val itemId = key.asInstanceOf[Tuple1[Long]].f0 //是一个一元组(只有itemId字段),因此直接这样子取 val windowEnd = window.getEnd val count = input.iterator.next() out.collect(ItemViewCount(itemId,windowEnd, count)) } } /** * * @param topN * * KeyedProcessFunction 传入3个参数 * K, I, O * K: 排序的key字段,这里用的是windowEnd 但由于keyBy输出是JavaTuple类型,因此传的是Tuple */ class TopNHostItem(topN: Int) extends KeyedProcessFunction[Tuple,ItemViewCount,String]{ //先定义一个ListState,保存全部商品的count,id,ts等信息,由于要最终从这个结果里面排序截取topN个,因此得提早所有存下来才行 var itemViewCountListState: ListState[ItemViewCount] = _ override def open(parameters: Configuration): Unit = { itemViewCountListState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemViewCountList", classOf[ItemViewCount])) } override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = { // 每来一条数据,直接追加到ListState itemViewCountListState.add(i) //注册一个定时器,windowEnd+1秒以后触发 context.timerService().registerEventTimeTimer(i.windowEnd + 1) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = { //为了方便排序,定义另外一个ListBuffer,保存ListState的全部数据 val allItemListBuffer: ListBuffer[ItemViewCount] = ListBuffer() val iter = itemViewCountListState.get().iterator() while (iter.hasNext){ allItemListBuffer += iter.next() } //清空ListState的数据,已经放到allItemListBuffer 准备计算了,等下次触发就应该是新的了 itemViewCountListState.clear() // 先按照count,从大到小排序,而后再取前N个 val sortItemViewCounts = allItemListBuffer.sortBy(_.count)(Ordering.Long.reverse).take(topN) //格式化输出数据: val result : StringBuilder = new StringBuilder result.append("当前窗口的结束时间:\t").append(new Timestamp(timestamp -1)).append("\n") //遍历结果列表中的每一个ItemViewCount , 输出到一行 for(i <- sortItemViewCounts.indices){ val currentItemViewCount = sortItemViewCounts(i) result.append("Top").append(i+1).append("\t") .append("商品id = ").append(currentItemViewCount.itemId).append("\t") .append("访问量: ").append(currentItemViewCount.count).append("\n") } result.append("---------------------------------------\n\n\n") Thread.sleep(1000) out.collect(result.toString()) } }
UserBehavior.csv 的内容:shell
1,40000,12306,pv,1609512713 5,30000,12306,pv,1609512714 4,50000,12306,pv,1609512715 2,20000,12306,pv,1609512716
运行效果和代码结构:apache
在开发的时候UserBehavior.csv 若是想看效果,最好仍是多准备点数据,不然一行一行的手敲有点麻烦,我写了个java脚本,GenerateData.java 运行完,就能给UserBehavior.csv模拟写入10万条数据作测试了bootstrap
/* @author mafei * @date 2021/1/1 */ package com.mafei.hotitems_analysis; import cn.hutool.core.io.file.FileWriter; import cn.hutool.core.util.RandomUtil; import java.time.LocalDateTime; import java.time.ZoneOffset; public class GenerateData { public static void main(String[] args) { String csvPath = "/opt/java2020_study/UserBehaviorAnalysis/HotItemsAnalysis/src/main/resources/UserBehavior.csv"; Integer[] userId= new Integer[]{1,2,3,4,5,9}; Integer[] itemId= new Integer[]{10000,20000,30000,40000,50000,60000}; int categoryId = 12306; StringBuffer content = new StringBuffer(); Long second = LocalDateTime.now().toEpochSecond(ZoneOffset.of("+8")); for (int i = 0; i < 100000; i++) { content.append(RandomUtil.randomEle(userId)).append(",") .append(RandomUtil.randomEle(itemId)).append(",") .append(categoryId).append(",pv,").append(second + i).append("\n"); } FileWriter writer = new FileWriter(csvPath); writer.write(content.toString()); } }
也是同样,只须要把从csv改为读kafka就能够windows
//一、从文件中读取数据 // val inputStream = env.readTextFile("/opt/java2020_study/UserBehaviorAnalysis/HotItemsAnalysis/src/main/resources/UserBehavior.csv") //二、从kafka中读取数据 val properties = new Properties() properties.setProperty("bootstrap.servers", "127.0.0.1:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("auto.offset.reset", "latest") val inputStream = env.addSource(new FlinkKafkaConsumer[String]("hotItems",new SimpleStringSchema(), properties)) //把kafka中的数据打印出来,看下具体的值 inputStream.print()
关于kafka安装部署能够参考以前的文章,找个centos虚拟机 http://www.javashuo.com/article/p-yisbokxi-hm.html
不少状况下不想改配置文件里监听端口,又想直接连服务器上的端口,推荐使用端口转发,把服务器端口转到127.0.0.1就能够,像xshell,terminus等等这些工具都有这个功能centos
累加规则---窗口内 碰到一条数据就加一(add方法) 实现AggregateFunction接口 Interface AggregateFunction<IN,ACC,OUT> 实现输出结构----itemViewCount(itemid,windowEnd,count) 实现WindowFunction接口
-针对有状态流的底层api 1.-keydProcessFunction会对分区后的每个子流进程处理 2.- 以windowEnd做为key,保证分流后每一条流都在一个时间窗口内 3.-从ListState中读取当前流的状态,存储数据进行输出
分区以后每隔KeyedStream都有本身的生命周期 1. -open,初始化,从这里能够获取当前流状态 2. -processElement,处理流中每一个元素时调用 3. -onTimer: 定时调用,注册定时器Timer并触发以后的回调操做