Spark综合使用及用户行为案例广告点击量实时统计分析实战-Spark商业应用实战

版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,若有任何技术交流,可随时联系。java

1 广告点击数据模型

1.1 数据格式

timestamp 	  province 	  city        userid         adid
复制代码

1.2 生成动态黑名单

数据格式
    (timestamp province city userid adid)
    
    统计单用户的统计次数
    (date, userid,adid,clickCount)
    
    阈值统计统计黑名单
复制代码

1.3 计算广告点击流量实时统计结果

输入数据格式
    (userid, timestamp province city userid adid)
    
    计算后数据格式并持久化
    (date,province,city,adid,clickCount)
复制代码

1.4 实时统计天天每一个省份top3热门广告

输入数据格式
   (yyyyMMdd_province_city_adid,clickCount)
    
    计算后数据格式并持久化
    (date,province, adid,clickCount)
    
    注册成表ROW_NUMBER()实现窗聚合
    tmp_daily_ad_click_count_by_prov
复制代码

1.5 实时统计天天每一个广告在最近1小时的滑动窗口内的点击趋势(每分钟的点击量)

输入数据格式
    (timestamp province city userid adid)
    
    计算后数据格式并持久化
    (date,hour,minute,adid,clickCount)
复制代码

2 具体技术实现

2.1 SparkStreaming 与Kafka对接

  • 构建Spark上下文mysql

    val sparkConf = new SparkConf().setAppName("streamingRecommendingSystem").setMaster("local[*]")
    
      // 建立Spark客户端
      val spark = SparkSession.builder().config(sparkConf).getOrCreate()
      val sc = spark.sparkContext
      val ssc = new StreamingContext(sc, Seconds(5))
      
      // 获取Kafka配置
      val broker_list = ConfigurationManager.config.getString("kafka.broker.list")
      val topics = ConfigurationManager.config.getString("kafka.topics")
    复制代码
  • kafka消费者配置算法

    val kafkaParam = Map(
       "bootstrap.servers" -> broker_list,//用于初始化连接到集群的地址
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       //用于标识这个消费者属于哪一个消费团体
       "group.id" -> "commerce-consumer-group",
       //若是没有初始化偏移量或者当前的偏移量不存在任何服务器上,可使用这个配置属性
       //可使用这个配置,latest自动重置偏移量为最新的偏移量
       //earilist:提交过度区,从Offset处读取,若是没有提交过offset,从头读取
       //latest:提交过度区,从Offset处读取,没有从最新的数据开始读取
       //None:若是没有提交offset,就会报错,提交过offset,就从offset处读取
       "auto.offset.reset" -> "latest",
       //若是是true,则这个消费者的偏移量会在后台自动提交
       "enable.auto.commit" -> (false: java.lang.Boolean)
     )
    复制代码
  • 设置检查点目录sql

    ssc.checkpoint("./streaming_checkpoint")
    复制代码
  • LocationStrategies 分配分区策略数据库

    // 建立DStream,返回接收到的输入数据
      // LocationStrategies:根据给定的主题和集群地址建立consumer
      // LocationStrategies.PreferConsistent:持续的在全部Executor之间匀分配分区 (均匀分配,选中的每个Executor都会分配 partition)
      // LocationStrategies.PreferBrokers: 若是executor和kafka brokers 在同一台机器上,选择该executor。
      // LocationStrategies.PreferFixed: 若是机器不是均匀的状况下,能够指定特殊的hosts。固然若是不指定,采用 LocationStrategies.PreferConsistent模式
    复制代码
  • ConsumerStrategies 消费策略bootstrap

    // ConsumerStrategies:选择如何在Driver和Executor上建立和配置Kafka Consumer
      // ConsumerStrategies.Subscribe:订阅一系列主题
      val adRealTimeLogDStream=KafkaUtils.createDirectStream[String,String](ssc,
              LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topics),kafkaParam))
    复制代码
  • SparkStreaming开始消费安全

    var adRealTimeValueDStream = adRealTimeLogDStream.map(consumerRecordRDD => consumerRecordRDD.value())
    复制代码

2.2 SparkStreaming 开始根据黑名单进行过滤

  • 算法过程以下 (timestamp province city userid adid) -> (userid, timestamp province city userid adid)服务器

  • 根据userId进行过滤app

    用于Kafka Stream的线程非安全问题,从新分区切断血统
      adRealTimeValueDStream = adRealTimeValueDStream.repartition(400)
     
      val filteredAdRealTimeLogDStream = filterByBlacklist(spark,adRealTimeValueDStream)
      
      def filterByBlacklist(spark: SparkSession, adRealTimeValueDStream:DStream[String]):DStream[(Long, String)] = {
         
          // 刚刚接受到原始的用户点击行为日志以后
          // 根据mysql中的动态黑名单,进行实时的黑名单过滤(黑名单用户的点击行为,直接过滤掉,不要了)
          // 使用transform算子(将dstream中的每一个batch RDD进行处理,转换为任意的其余RDD,功能很强大)
      
          val filteredAdRealTimeLogDStream = adRealTimeValueDStream.transform{ consumerRecordRDD =>
      
            //格式 :timestamp province city userid adid
            //某个时间点 某个省份 某个城市 某个用户 某个广告
      
            // 首先,从mysql中查询全部黑名单用户,将其转换为一个rdd
            val adBlacklists = AdBlacklistDAO.findAll()
            // (userid, timestamp province city userid adid)
            val blacklistRDD = spark.sparkContext.makeRDD(adBlacklists.map(item => (item.userid, true)))
      
      
            //格式 :timestamp province city userid adid
            val mappedRDD = consumerRecordRDD.map(consumerRecord => {
              val userid = consumerRecord.split(" ")(3).toLong
              (userid,consumerRecord)
            })
      
            // 将原始日志数据rdd,与黑名单rdd,进行左外链接
            // 若是说原始日志的userid,没有在对应的黑名单中,join不到,左外链接
            // 用inner join,内链接,会致使数据丢失
            val joinedRDD = mappedRDD.leftOuterJoin(blacklistRDD)
      
            val filteredRDD = joinedRDD.filter{ case (userid,(log, black)) =>
              // 若是这个值存在,那么说明原始日志中的userid,join到了某个黑名单用户
              if(black.isDefined && black.get) false else true
            }
      
            filteredRDD.map{ case (userid,(log, black)) => (userid, log)}
          }
      
          filteredAdRealTimeLogDStream
    }
    复制代码

2.3 SparkStreaming 生成动态黑名单

  • 转化为用户粒度进行过滤,抛弃 province city ,格式为:(date, userid,adid,clickCount)函数

  • 过滤次数大于阈值的userId,持久化到磁盘。

    generateDynamicBlacklist(filteredAdRealTimeLogDStream)
      
      def generateDynamicBlacklist(filteredAdRealTimeLogDStream: DStream[(Long, String)]) {
          
          // (timestamp province city userid adid)
          // 计算出每5个秒内的数据中,天天每一个用户每一个广告的点击量
          // 经过对原始实时日志的处理
          // 将日志的格式处理成<yyyyMMdd_userid_adid, 1L>格式
          val dailyUserAdClickDStream = filteredAdRealTimeLogDStream.map{ case (userid,log) =>
      
            // 从tuple中获取到每一条原始的实时日志
            val logSplited = log.split(" ")
      
            // 提取出日期(yyyyMMdd)、userid、adid
            val timestamp = logSplited(0)
            val date = new Date(timestamp.toLong)
            val datekey = DateUtils.formatDateKey(date)
      
            val userid = logSplited(3).toLong
            val adid = logSplited(4)
      
            // 拼接key
            val key = datekey + "_" + userid + "_" + adid
            (key, 1L)
          }
      
          // 针对处理后的日志格式,执行reduceByKey算子便可,(每一个batch中)天天每一个用户对每一个广告的点击量
          val dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey(_ + _)
      
          // 源源不断的,每一个5s的batch中,当天每一个用户对每支广告的点击次数
          // <yyyyMMdd_userid_adid, clickCount>
          dailyUserAdClickCountDStream.foreachRDD{ rdd =>
            rdd.foreachPartition{ items =>
              // 对每一个分区的数据就去获取一次链接对象
              // 每次都是从链接池中获取,而不是每次都建立
              // 写数据库操做,性能已经提到最高了
      
              val adUserClickCounts = ArrayBuffer[AdUserClickCount]()
              for(item <- items){
                val keySplited = item._1.split("_")
                val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
                // yyyy-MM-dd
                val userid = keySplited(1).toLong
                val adid = keySplited(2).toLong
                val clickCount = item._2
      
                //批量插入
                adUserClickCounts += AdUserClickCount(date, userid,adid,clickCount)
              }
              AdUserClickCountDAO.updateBatch(adUserClickCounts.toArray)
            }
          }
      
          // 如今咱们在mysql里面,已经有了累计的天天各用户对各广告的点击量
          // 遍历每一个batch中的全部记录,对每条记录都要去查询一下,这一天这个用户对这个广告的累计点击量是多少
          // 从mysql中查询
          // 查询出来的结果,若是是100,若是你发现某个用户某天对某个广告的点击量已经大于等于100了
          // 那么就断定这个用户就是黑名单用户,就写入mysql的表中,持久化
          val blacklistDStream = dailyUserAdClickCountDStream.filter{ case (key, count) =>
            val keySplited = key.split("_")
      
            // yyyyMMdd -> yyyy-MM-dd
            val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
            val userid = keySplited(1).toLong
            val adid = keySplited(2).toLong
      
            // 从mysql中查询指定日期指定用户对指定广告的点击量
            val clickCount = AdUserClickCountDAO.findClickCountByMultiKey(date, userid, adid)
      
            // 判断,若是点击量大于等于100,ok,那么很差意思,你就是黑名单用户
            // 那么就拉入黑名单,返回true
            if(clickCount >= 100) {
              true
            }else{
              // 反之,若是点击量小于100的,那么就暂时不要管它了
              false
            }
      } 
    复制代码

2.4 计算广告点击流量实时统计结果(yyyyMMdd_province_city_adid,clickCount)

  • 转化为省城市粒度进行过滤,抛弃userId,格式为:(yyyyMMdd_province_city_adid,clickCount)

    val adRealTimeStatDStream = calculateRealTimeStat(filteredAdRealTimeLogDStream)
      
      def calculateRealTimeStat(filteredAdRealTimeLogDStream:DStream[(Long, String)]):DStream[(String, Long)] = {
      
          // 计算天天各省各城市各广告的点击量
          // 设计出来几个维度:日期、省份、城市、广告
          // 2015-12-01,当天,能够看到当天全部的实时数据(动态改变),好比江苏省南京市
          // 广告能够进行选择(广告主、广告名称、广告类型来筛选一个出来)
          // 拿着date、province、city、adid,去mysql中查询最新的数据
          // 等等,基于这几个维度,以及这份动态改变的数据,是能够实现比较灵活的广告点击流量查看的功能的
      
          // date province city userid adid
          // date_province_city_adid,做为key;1做为value
          // 经过spark,直接统计出来全局的点击次数,在spark集群中保留一份;在mysql中,也保留一份
          // 咱们要对原始数据进行map,映射成<date_province_city_adid,1>格式
          // 而后呢,对上述格式的数据,执行updateStateByKey算子
          // spark streaming特有的一种算子,在spark集群内存中,维护一份key的全局状态
          //(userid, timestamp province city userid adid)
          val mappedDStream = filteredAdRealTimeLogDStream.map{ case (userid, log) =>
            val logSplited = log.split(" ")
      
            val timestamp = logSplited(0)
            val date = new Date(timestamp.toLong)
            val datekey = DateUtils.formatDateKey(date)
      
            val province = logSplited(1)
            val city = logSplited(2)
            val adid = logSplited(4).toLong
      
            val key = datekey + "_" + province + "_" + city + "_" + adid
      
            (key, 1L)
          }
      
          // 在这个dstream中,就至关于,有每一个batch rdd累加的各个key(各天各省份各城市各广告的点击次数)
          // 每次计算出最新的值,就在aggregatedDStream中的每一个batch rdd中反应出来
          val aggregatedDStream = mappedDStream.updateStateByKey[Long]{ (values:Seq[Long], old:Option[Long]) =>
            // 举例来讲
            // 对于每一个key,都会调用一次这个方法
            // 好比key是<20151201_Jiangsu_Nanjing_10001,1>,就会来调用一次这个方法7
            // 10个
      
            // values,(1,1,1,1,1,1,1,1,1,1)
      
            // 首先根据optional判断,以前这个key,是否有对应的状态
            var clickCount = 0L
      
            // 若是说,以前是存在这个状态的,那么就以以前的状态做为起点,进行值的累加
            if(old.isDefined) {
              clickCount = old.get
            }
      
            // values,表明了,batch rdd中,每一个key对应的全部的值
            for(value <- values) {
              clickCount += value
            }
      
            Some(clickCount)
          }
      
          // 将计算出来的最新结果,同步一份到mysql中,以便于j2ee系统使用
          aggregatedDStream.foreachRDD{ rdd =>
      
            rdd.foreachPartition{ items =>
      
              //批量保存到数据库
              val adStats = ArrayBuffer[AdStat]()
      
              for(item <- items){
                val keySplited = item._1.split("_")
                val date = keySplited(0)
                val province = keySplited(1)
                val city = keySplited(2)
                val adid = keySplited(3).toLong
      
                val clickCount = item._2
                adStats += AdStat(date,province,city,adid,clickCount)
              }
              AdStatDAO.updateBatch(adStats.toArray)
      
            }
      
          }
      
          aggregatedDStream
    }
    复制代码

2.5 实时统计天天每一个省份top3热门广告

  • 转化为省粒度进行过滤,抛弃userId ,cityid,格式为:(yyyyMMdd_province_adid,clickCount)

  • 注册成表,基于ROW_NUMBER()实现窗聚合,按照province分区,实现top3排序,

    tmp_daily_ad_click_count_by_prov
    
     calculateProvinceTop3Ad(spark,adRealTimeStatDStream)
     
     def calculateProvinceTop3Ad(spark:SparkSession, adRealTimeStatDStream:DStream[(String, Long)]) {
    
     // 每个batch rdd,都表明了最新的全量的天天各省份各城市各广告的点击量
     //(yyyyMMdd_province_city_adid,clickCount)
     val rowsDStream = adRealTimeStatDStream.transform{ rdd =>
    
       // <yyyyMMdd_province_city_adid, clickCount>
       // <yyyyMMdd_province_adid, clickCount>
    
       // 计算出天天各省份各广告的点击量
       val mappedRDD = rdd.map{ case (keyString, count) =>
    
         val keySplited = keyString.split("_")
         val date = keySplited(0)
         val province = keySplited(1)
         val adid = keySplited(3).toLong
         val clickCount = count
    
         val key = date + "_" + province + "_" + adid
         (key, clickCount)
       }
    
       val dailyAdClickCountByProvinceRDD = mappedRDD.reduceByKey( _ + _ )
    
       // 将dailyAdClickCountByProvinceRDD转换为DataFrame
       // 注册为一张临时表
       // 使用Spark SQL,经过开窗函数,获取到各省份的top3热门广告
       val rowsRDD = dailyAdClickCountByProvinceRDD.map{ case (keyString, count) =>
    
         val keySplited = keyString.split("_")
         val datekey = keySplited(0)
         val province = keySplited(1)
         val adid = keySplited(2).toLong
         val clickCount = count
    
         val date = DateUtils.formatDate(DateUtils.parseDateKey(datekey))
    
         (date, province, adid, clickCount)
    
       }
    
       import spark.implicits._
       val dailyAdClickCountByProvinceDF = rowsRDD.toDF("date","province","ad_id","click_count")
    
       // 将dailyAdClickCountByProvinceDF,注册成一张临时表
       dailyAdClickCountByProvinceDF.createOrReplaceTempView("tmp_daily_ad_click_count_by_prov")
    
       // 使用Spark SQL执行SQL语句,配合开窗函数,统计出各身份top3热门的广告
       val provinceTop3AdDF = spark.sql(
         "SELECT "
             + "date,"
             + "province,"
             + "ad_id,"
             + "click_count "
           + "FROM ( "
             + "SELECT "
               + "date,"
               + "province,"
               + "ad_id,"
               + "click_count,"
               + "ROW_NUMBER() OVER(PARTITION BY province ORDER BY click_count DESC) rank "
             + "FROM tmp_daily_ad_click_count_by_prov "
             + ") t "
           + "WHERE rank<=3"
       )
    
       provinceTop3AdDF.rdd
     }
    
     // 每次都是刷新出来各个省份最热门的top3广告,将其中的数据批量更新到MySQL中
     rowsDStream.foreachRDD{ rdd =>
       rdd.foreachPartition{ items =>
    
         // 插入数据库
         val adProvinceTop3s = ArrayBuffer[AdProvinceTop3]()
    
         for (item <- items){
           val date = item.getString(0)
           val province = item.getString(1)
           val adid = item.getLong(2)
           val clickCount = item.getLong(3)
           adProvinceTop3s += AdProvinceTop3(date,province,adid,clickCount)
         }
         AdProvinceTop3DAO.updateBatch(adProvinceTop3s.toArray)
    
       }
     }
    复制代码

    }

2.6 实时统计天天每一个广告在最近1小时的滑动窗口内的点击趋势(每分钟的点击量)

  • 版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,若有任何技术交流,可随时联系。

  • 转化为时间粒度进行过滤,抛弃province,userId ,cityid,格式为: <yyyyMMddHHMM_adid,1L>,基于reduceByKeyAndWindow进行聚合

  • 最终结果展开 (date,hour,minute,adid,clickCount)

    calculateAdClickCountByWindow(adRealTimeValueDStream)
      def calculateAdClickCountByWindow(adRealTimeValueDStream:DStream[String]) {
    
      // 映射成<yyyyMMddHHMM_adid,1L>格式
      //(timestamp province city userid adid)
      val pairDStream = adRealTimeValueDStream.map{ case consumerRecord  =>
        val logSplited = consumerRecord.split(" ")
        val timeMinute = DateUtils.formatTimeMinute(new Date(logSplited(0).toLong))
        val adid = logSplited(4).toLong
    
        (timeMinute + "_" + adid, 1L)
      }
    
      // 计算窗口函数,1小时滑动窗口内的广告点击趋势
      val aggrRDD = pairDStream.reduceByKeyAndWindow((a:Long,b:Long) => (a + b),Minutes(60L), Seconds(10L))
    
      // 最近1小时内,各分钟的点击量,并保存到数据库
      aggrRDD.foreachRDD{ rdd =>
        rdd.foreachPartition{ items =>
          //保存到数据库
          val adClickTrends = ArrayBuffer[AdClickTrend]()
          for (item <- items){
            val keySplited = item._1.split("_")
            // yyyyMMddHHmm
            val dateMinute = keySplited(0)
            val adid = keySplited(1).toLong
            val clickCount = item._2
    
            val date = DateUtils.formatDate(DateUtils.parseDateKey(dateMinute.substring(0, 8)))
            val hour = dateMinute.substring(8, 10)
            val minute = dateMinute.substring(10)
    
            adClickTrends += AdClickTrend(date,hour,minute,adid,clickCount)
          }
          AdClickTrendDAO.updateBatch(adClickTrends.toArray)
        }
      }
    }
    复制代码

3 总结

温故而知新,本文为了综合复习,进行代码总结,内容粗鄙,勿怪

版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,若有任何技术交流,可随时联系。

秦凯新 于深圳

来源:掘金

著做权归做者全部。商业转载请联系做者得到受权,非商业转载请注明出处。

相关文章
相关标签/搜索