版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,若有任何技术交流,可随时联系。java
timestamp province city userid adid
复制代码
数据格式
(timestamp province city userid adid)
统计单用户的统计次数
(date, userid,adid,clickCount)
阈值统计统计黑名单
复制代码
输入数据格式
(userid, timestamp province city userid adid)
计算后数据格式并持久化
(date,province,city,adid,clickCount)
复制代码
输入数据格式
(yyyyMMdd_province_city_adid,clickCount)
计算后数据格式并持久化
(date,province, adid,clickCount)
注册成表ROW_NUMBER()实现窗聚合
tmp_daily_ad_click_count_by_prov
复制代码
输入数据格式
(timestamp province city userid adid)
计算后数据格式并持久化
(date,hour,minute,adid,clickCount)
复制代码
构建Spark上下文mysql
val sparkConf = new SparkConf().setAppName("streamingRecommendingSystem").setMaster("local[*]")
// 建立Spark客户端
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, Seconds(5))
// 获取Kafka配置
val broker_list = ConfigurationManager.config.getString("kafka.broker.list")
val topics = ConfigurationManager.config.getString("kafka.topics")
复制代码
kafka消费者配置算法
val kafkaParam = Map(
"bootstrap.servers" -> broker_list,//用于初始化连接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//用于标识这个消费者属于哪一个消费团体
"group.id" -> "commerce-consumer-group",
//若是没有初始化偏移量或者当前的偏移量不存在任何服务器上,可使用这个配置属性
//可使用这个配置,latest自动重置偏移量为最新的偏移量
//earilist:提交过度区,从Offset处读取,若是没有提交过offset,从头读取
//latest:提交过度区,从Offset处读取,没有从最新的数据开始读取
//None:若是没有提交offset,就会报错,提交过offset,就从offset处读取
"auto.offset.reset" -> "latest",
//若是是true,则这个消费者的偏移量会在后台自动提交
"enable.auto.commit" -> (false: java.lang.Boolean)
)
复制代码
设置检查点目录sql
ssc.checkpoint("./streaming_checkpoint")
复制代码
LocationStrategies 分配分区策略数据库
// 建立DStream,返回接收到的输入数据
// LocationStrategies:根据给定的主题和集群地址建立consumer
// LocationStrategies.PreferConsistent:持续的在全部Executor之间匀分配分区 (均匀分配,选中的每个Executor都会分配 partition)
// LocationStrategies.PreferBrokers: 若是executor和kafka brokers 在同一台机器上,选择该executor。
// LocationStrategies.PreferFixed: 若是机器不是均匀的状况下,能够指定特殊的hosts。固然若是不指定,采用 LocationStrategies.PreferConsistent模式
复制代码
ConsumerStrategies 消费策略bootstrap
// ConsumerStrategies:选择如何在Driver和Executor上建立和配置Kafka Consumer
// ConsumerStrategies.Subscribe:订阅一系列主题
val adRealTimeLogDStream=KafkaUtils.createDirectStream[String,String](ssc,
LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topics),kafkaParam))
复制代码
SparkStreaming开始消费安全
var adRealTimeValueDStream = adRealTimeLogDStream.map(consumerRecordRDD => consumerRecordRDD.value())
复制代码
算法过程以下 (timestamp province city userid adid) -> (userid, timestamp province city userid adid)服务器
根据userId进行过滤app
用于Kafka Stream的线程非安全问题,从新分区切断血统
adRealTimeValueDStream = adRealTimeValueDStream.repartition(400)
val filteredAdRealTimeLogDStream = filterByBlacklist(spark,adRealTimeValueDStream)
def filterByBlacklist(spark: SparkSession, adRealTimeValueDStream:DStream[String]):DStream[(Long, String)] = {
// 刚刚接受到原始的用户点击行为日志以后
// 根据mysql中的动态黑名单,进行实时的黑名单过滤(黑名单用户的点击行为,直接过滤掉,不要了)
// 使用transform算子(将dstream中的每一个batch RDD进行处理,转换为任意的其余RDD,功能很强大)
val filteredAdRealTimeLogDStream = adRealTimeValueDStream.transform{ consumerRecordRDD =>
//格式 :timestamp province city userid adid
//某个时间点 某个省份 某个城市 某个用户 某个广告
// 首先,从mysql中查询全部黑名单用户,将其转换为一个rdd
val adBlacklists = AdBlacklistDAO.findAll()
// (userid, timestamp province city userid adid)
val blacklistRDD = spark.sparkContext.makeRDD(adBlacklists.map(item => (item.userid, true)))
//格式 :timestamp province city userid adid
val mappedRDD = consumerRecordRDD.map(consumerRecord => {
val userid = consumerRecord.split(" ")(3).toLong
(userid,consumerRecord)
})
// 将原始日志数据rdd,与黑名单rdd,进行左外链接
// 若是说原始日志的userid,没有在对应的黑名单中,join不到,左外链接
// 用inner join,内链接,会致使数据丢失
val joinedRDD = mappedRDD.leftOuterJoin(blacklistRDD)
val filteredRDD = joinedRDD.filter{ case (userid,(log, black)) =>
// 若是这个值存在,那么说明原始日志中的userid,join到了某个黑名单用户
if(black.isDefined && black.get) false else true
}
filteredRDD.map{ case (userid,(log, black)) => (userid, log)}
}
filteredAdRealTimeLogDStream
}
复制代码
转化为用户粒度进行过滤,抛弃 province city ,格式为:(date, userid,adid,clickCount)函数
过滤次数大于阈值的userId,持久化到磁盘。
generateDynamicBlacklist(filteredAdRealTimeLogDStream)
def generateDynamicBlacklist(filteredAdRealTimeLogDStream: DStream[(Long, String)]) {
// (timestamp province city userid adid)
// 计算出每5个秒内的数据中,天天每一个用户每一个广告的点击量
// 经过对原始实时日志的处理
// 将日志的格式处理成<yyyyMMdd_userid_adid, 1L>格式
val dailyUserAdClickDStream = filteredAdRealTimeLogDStream.map{ case (userid,log) =>
// 从tuple中获取到每一条原始的实时日志
val logSplited = log.split(" ")
// 提取出日期(yyyyMMdd)、userid、adid
val timestamp = logSplited(0)
val date = new Date(timestamp.toLong)
val datekey = DateUtils.formatDateKey(date)
val userid = logSplited(3).toLong
val adid = logSplited(4)
// 拼接key
val key = datekey + "_" + userid + "_" + adid
(key, 1L)
}
// 针对处理后的日志格式,执行reduceByKey算子便可,(每一个batch中)天天每一个用户对每一个广告的点击量
val dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey(_ + _)
// 源源不断的,每一个5s的batch中,当天每一个用户对每支广告的点击次数
// <yyyyMMdd_userid_adid, clickCount>
dailyUserAdClickCountDStream.foreachRDD{ rdd =>
rdd.foreachPartition{ items =>
// 对每一个分区的数据就去获取一次链接对象
// 每次都是从链接池中获取,而不是每次都建立
// 写数据库操做,性能已经提到最高了
val adUserClickCounts = ArrayBuffer[AdUserClickCount]()
for(item <- items){
val keySplited = item._1.split("_")
val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
// yyyy-MM-dd
val userid = keySplited(1).toLong
val adid = keySplited(2).toLong
val clickCount = item._2
//批量插入
adUserClickCounts += AdUserClickCount(date, userid,adid,clickCount)
}
AdUserClickCountDAO.updateBatch(adUserClickCounts.toArray)
}
}
// 如今咱们在mysql里面,已经有了累计的天天各用户对各广告的点击量
// 遍历每一个batch中的全部记录,对每条记录都要去查询一下,这一天这个用户对这个广告的累计点击量是多少
// 从mysql中查询
// 查询出来的结果,若是是100,若是你发现某个用户某天对某个广告的点击量已经大于等于100了
// 那么就断定这个用户就是黑名单用户,就写入mysql的表中,持久化
val blacklistDStream = dailyUserAdClickCountDStream.filter{ case (key, count) =>
val keySplited = key.split("_")
// yyyyMMdd -> yyyy-MM-dd
val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
val userid = keySplited(1).toLong
val adid = keySplited(2).toLong
// 从mysql中查询指定日期指定用户对指定广告的点击量
val clickCount = AdUserClickCountDAO.findClickCountByMultiKey(date, userid, adid)
// 判断,若是点击量大于等于100,ok,那么很差意思,你就是黑名单用户
// 那么就拉入黑名单,返回true
if(clickCount >= 100) {
true
}else{
// 反之,若是点击量小于100的,那么就暂时不要管它了
false
}
}
复制代码
转化为省城市粒度进行过滤,抛弃userId,格式为:(yyyyMMdd_province_city_adid,clickCount)
val adRealTimeStatDStream = calculateRealTimeStat(filteredAdRealTimeLogDStream)
def calculateRealTimeStat(filteredAdRealTimeLogDStream:DStream[(Long, String)]):DStream[(String, Long)] = {
// 计算天天各省各城市各广告的点击量
// 设计出来几个维度:日期、省份、城市、广告
// 2015-12-01,当天,能够看到当天全部的实时数据(动态改变),好比江苏省南京市
// 广告能够进行选择(广告主、广告名称、广告类型来筛选一个出来)
// 拿着date、province、city、adid,去mysql中查询最新的数据
// 等等,基于这几个维度,以及这份动态改变的数据,是能够实现比较灵活的广告点击流量查看的功能的
// date province city userid adid
// date_province_city_adid,做为key;1做为value
// 经过spark,直接统计出来全局的点击次数,在spark集群中保留一份;在mysql中,也保留一份
// 咱们要对原始数据进行map,映射成<date_province_city_adid,1>格式
// 而后呢,对上述格式的数据,执行updateStateByKey算子
// spark streaming特有的一种算子,在spark集群内存中,维护一份key的全局状态
//(userid, timestamp province city userid adid)
val mappedDStream = filteredAdRealTimeLogDStream.map{ case (userid, log) =>
val logSplited = log.split(" ")
val timestamp = logSplited(0)
val date = new Date(timestamp.toLong)
val datekey = DateUtils.formatDateKey(date)
val province = logSplited(1)
val city = logSplited(2)
val adid = logSplited(4).toLong
val key = datekey + "_" + province + "_" + city + "_" + adid
(key, 1L)
}
// 在这个dstream中,就至关于,有每一个batch rdd累加的各个key(各天各省份各城市各广告的点击次数)
// 每次计算出最新的值,就在aggregatedDStream中的每一个batch rdd中反应出来
val aggregatedDStream = mappedDStream.updateStateByKey[Long]{ (values:Seq[Long], old:Option[Long]) =>
// 举例来讲
// 对于每一个key,都会调用一次这个方法
// 好比key是<20151201_Jiangsu_Nanjing_10001,1>,就会来调用一次这个方法7
// 10个
// values,(1,1,1,1,1,1,1,1,1,1)
// 首先根据optional判断,以前这个key,是否有对应的状态
var clickCount = 0L
// 若是说,以前是存在这个状态的,那么就以以前的状态做为起点,进行值的累加
if(old.isDefined) {
clickCount = old.get
}
// values,表明了,batch rdd中,每一个key对应的全部的值
for(value <- values) {
clickCount += value
}
Some(clickCount)
}
// 将计算出来的最新结果,同步一份到mysql中,以便于j2ee系统使用
aggregatedDStream.foreachRDD{ rdd =>
rdd.foreachPartition{ items =>
//批量保存到数据库
val adStats = ArrayBuffer[AdStat]()
for(item <- items){
val keySplited = item._1.split("_")
val date = keySplited(0)
val province = keySplited(1)
val city = keySplited(2)
val adid = keySplited(3).toLong
val clickCount = item._2
adStats += AdStat(date,province,city,adid,clickCount)
}
AdStatDAO.updateBatch(adStats.toArray)
}
}
aggregatedDStream
}
复制代码
转化为省粒度进行过滤,抛弃userId ,cityid,格式为:(yyyyMMdd_province_adid,clickCount)
注册成表,基于ROW_NUMBER()实现窗聚合,按照province分区,实现top3排序,
tmp_daily_ad_click_count_by_prov
calculateProvinceTop3Ad(spark,adRealTimeStatDStream)
def calculateProvinceTop3Ad(spark:SparkSession, adRealTimeStatDStream:DStream[(String, Long)]) {
// 每个batch rdd,都表明了最新的全量的天天各省份各城市各广告的点击量
//(yyyyMMdd_province_city_adid,clickCount)
val rowsDStream = adRealTimeStatDStream.transform{ rdd =>
// <yyyyMMdd_province_city_adid, clickCount>
// <yyyyMMdd_province_adid, clickCount>
// 计算出天天各省份各广告的点击量
val mappedRDD = rdd.map{ case (keyString, count) =>
val keySplited = keyString.split("_")
val date = keySplited(0)
val province = keySplited(1)
val adid = keySplited(3).toLong
val clickCount = count
val key = date + "_" + province + "_" + adid
(key, clickCount)
}
val dailyAdClickCountByProvinceRDD = mappedRDD.reduceByKey( _ + _ )
// 将dailyAdClickCountByProvinceRDD转换为DataFrame
// 注册为一张临时表
// 使用Spark SQL,经过开窗函数,获取到各省份的top3热门广告
val rowsRDD = dailyAdClickCountByProvinceRDD.map{ case (keyString, count) =>
val keySplited = keyString.split("_")
val datekey = keySplited(0)
val province = keySplited(1)
val adid = keySplited(2).toLong
val clickCount = count
val date = DateUtils.formatDate(DateUtils.parseDateKey(datekey))
(date, province, adid, clickCount)
}
import spark.implicits._
val dailyAdClickCountByProvinceDF = rowsRDD.toDF("date","province","ad_id","click_count")
// 将dailyAdClickCountByProvinceDF,注册成一张临时表
dailyAdClickCountByProvinceDF.createOrReplaceTempView("tmp_daily_ad_click_count_by_prov")
// 使用Spark SQL执行SQL语句,配合开窗函数,统计出各身份top3热门的广告
val provinceTop3AdDF = spark.sql(
"SELECT "
+ "date,"
+ "province,"
+ "ad_id,"
+ "click_count "
+ "FROM ( "
+ "SELECT "
+ "date,"
+ "province,"
+ "ad_id,"
+ "click_count,"
+ "ROW_NUMBER() OVER(PARTITION BY province ORDER BY click_count DESC) rank "
+ "FROM tmp_daily_ad_click_count_by_prov "
+ ") t "
+ "WHERE rank<=3"
)
provinceTop3AdDF.rdd
}
// 每次都是刷新出来各个省份最热门的top3广告,将其中的数据批量更新到MySQL中
rowsDStream.foreachRDD{ rdd =>
rdd.foreachPartition{ items =>
// 插入数据库
val adProvinceTop3s = ArrayBuffer[AdProvinceTop3]()
for (item <- items){
val date = item.getString(0)
val province = item.getString(1)
val adid = item.getLong(2)
val clickCount = item.getLong(3)
adProvinceTop3s += AdProvinceTop3(date,province,adid,clickCount)
}
AdProvinceTop3DAO.updateBatch(adProvinceTop3s.toArray)
}
}
复制代码
}
版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,若有任何技术交流,可随时联系。
转化为时间粒度进行过滤,抛弃province,userId ,cityid,格式为: <yyyyMMddHHMM_adid,1L>,基于reduceByKeyAndWindow进行聚合
最终结果展开 (date,hour,minute,adid,clickCount)
calculateAdClickCountByWindow(adRealTimeValueDStream)
def calculateAdClickCountByWindow(adRealTimeValueDStream:DStream[String]) {
// 映射成<yyyyMMddHHMM_adid,1L>格式
//(timestamp province city userid adid)
val pairDStream = adRealTimeValueDStream.map{ case consumerRecord =>
val logSplited = consumerRecord.split(" ")
val timeMinute = DateUtils.formatTimeMinute(new Date(logSplited(0).toLong))
val adid = logSplited(4).toLong
(timeMinute + "_" + adid, 1L)
}
// 计算窗口函数,1小时滑动窗口内的广告点击趋势
val aggrRDD = pairDStream.reduceByKeyAndWindow((a:Long,b:Long) => (a + b),Minutes(60L), Seconds(10L))
// 最近1小时内,各分钟的点击量,并保存到数据库
aggrRDD.foreachRDD{ rdd =>
rdd.foreachPartition{ items =>
//保存到数据库
val adClickTrends = ArrayBuffer[AdClickTrend]()
for (item <- items){
val keySplited = item._1.split("_")
// yyyyMMddHHmm
val dateMinute = keySplited(0)
val adid = keySplited(1).toLong
val clickCount = item._2
val date = DateUtils.formatDate(DateUtils.parseDateKey(dateMinute.substring(0, 8)))
val hour = dateMinute.substring(8, 10)
val minute = dateMinute.substring(10)
adClickTrends += AdClickTrend(date,hour,minute,adid,clickCount)
}
AdClickTrendDAO.updateBatch(adClickTrends.toArray)
}
}
}
复制代码
温故而知新,本文为了综合复习,进行代码总结,内容粗鄙,勿怪
版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,若有任何技术交流,可随时联系。
秦凯新 于深圳
来源:掘金
著做权归做者全部。商业转载请联系做者得到受权,非商业转载请注明出处。