第4章 离线推荐服务建设4.1 离线推荐服务4.2 离线统计服务4.2.1 离线统计服务主体框架4.2.2 历史热门商品统计4.2.3 最近热门商品统计4.2.4 商品平均得分统计4.2.5 将 DF 数据写入 MongoDB 数据库对应的表中的方法4.3 基于隐语义模型的协同过滤推荐(类似推荐)4.3.1 用户商品推荐列表4.3.2 商品类似度矩阵4.3.3 模型评估和参数选取第5章 实时推荐服务建设5.1 实时推荐服务5.2 实时推荐模型和代码框架5.2.1 实时推荐模型算法设计5.2.2 实时推荐模块框架5.3 实时推荐算法的实现5.3.1 获取用户的 K 次最近评分5.3.2 获取当前商品最类似的 K 个商品5.3.3 商品推荐优先级计算5.3.4 将结果保存到 mongoDB5.3.5 更新实时推荐结果5.4 实时系统联调5.4.1 启动实时系统的基本组件5.4.2 启动 zookeeper 集群(使用群起脚本)5.4.3 启动 kafka 集群(使用群起脚本)5.4.4 构建 Kafka Streaming 程序5.4.5 配置并启动 flume5.4.6 启动业务系统后台第6章 冷启动问题处理第7章 其它形式的离线推荐服务(类似推荐)7.1 基于内容的协同过滤推荐(类似推荐)7.2 基于物品的协同过滤推荐(类似推荐)第8章 程序部署与运行html
离线推荐服务是综合用户全部的历史数据,利用设定的离线统计算法
和离线推荐算法
周期性的进行结果统计与保存,计算的结果在必定时间周期内是固定不变的,变动的频率取决于算法调度的频率。
离线推荐服务主要计算一些能够预先进行统计和计算的指标,为实时计算和前端业务相应提供数据支撑。
离线推荐服务主要分为统计推荐
、基于隐语义模型的协同过滤推荐
以及基于内容的类似推荐
和基于 Item-CF 的类似推荐
。咱们这一章主要介绍前两部分,基于内容的推荐
和 基于 Item-CF 的推荐
在总体结构和实现上是相似的,咱们将在第 7 章详细介绍。前端
在 recommender 下新建子项目 StatisticsRecommender,pom.xml 文件中只需引入 spark、scala 和 mongodb 的相关依赖:java
<dependencies>
<!-- Spark 的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驱动 -->
<!-- 用于代码方式链接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于 Spark 和 MongoDB 的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
在 resources 文件夹下引入 log4j.properties,而后在 src/main/scala 下新建 scala 单例 object 对象 com.atguigu.statistics.StatisticsRecommender。
一样,咱们应该先建好样例类,在 main() 方法中定义配置、建立 SparkSession 并加载数据,最后关闭 spark。代码以下:redis
src/main/scala/com.atguigu.statistics/StatisticsRecommender.scala算法
// 定义样例类
// 注意:spark mllib 中有 Rating 类,为了便于区别,咱们从新命名为 ProductRating
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
object StatisticsRecommender {
// 定义 MongoDB 中存储的表名
val MONGODB_RATING_COLLECTION = "Rating"
// 定义 MongoDB 中统计表的名称
val RATE_MORE_PRODUCTS = "RateMoreProducts"
val RATE_MORE_RECENTLY_PRODUCTS = "RateMoreRecentlyProducts"
val AVERAGE_PRODUCTS_SCORE = "AverageProductsScore"
def main(args: Array[String]): Unit = {
// 定义用到的配置参数
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一个 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")
// 建立一个 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一个 sparkContext
val sc = spark.sparkContext
// 声明一个隐式的配置对象,方便重复调用(当屡次调用对 MongoDB 的存储或读写操做时)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隐式转换:在对 DataFrame 和 Dataset 进行操做许多操做都须要这个包进行支持
import spark.implicits._
// 将 MongoDB 中的数据加载进来,并转换为 DataFrame
val ratingDF = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.toDF()
// 建立一张名为 ratings 的临时表
ratingDF.createOrReplaceTempView("ratings")
// TODO: 用 sparK sql 去作不一样的统计推荐结果
// 一、历史热门商品统计(按照商品的评分次数统计)数据结构是:productId, count
// 二、最近热门商品统计,即统计以月为单位每一个商品的评分个数(须要将时间戳转换成 yyyyMM 格式后,按照商品的评分次数统计)数据结构是:productId, count, yearmonth
// 三、商品平均得分统计(即优质商品统计)数据结构是:productId,avg
// 关闭 Spark
spark.stop()
}
}
根据全部历史评分数据,计算历史评分次数最多的商品。
实现思路:经过 Spark SQL 读取评分数据集,统计全部评分中评分个数最多的商品,而后按照从大到小排序,将最终结果写入 MongoDB 的 RateMoreProducts 数据集中。sql
// 一、历史热门商品统计(按照商品的评分次数统计)数据结构是:productId, count
val rateMoreProductsDF = spark.sql("select productId, count(productId) as count from ratings group by productId order by count desc")
storeDFInMongoDB(rateMoreProductsDF, RATE_MORE_PRODUCTS)
根据评分,按月为单位计算最近时间的月份里面评分数个数最多的商品集合。
实现思路:经过 Spark SQL 读取评分数据集,经过 UDF 函数将评分的数据时间修改成月,而后统计每个月商品的评分数。统计完成以后将数据写入到 MongoDB 的 RateMoreRecentlyProducts 数据集中。mongodb
// 二、最近热门商品统计,即统计以月为单位每一个商品的评分个数(须要将时间戳转换成 yyyyMM 格式后,按照商品的评分次数统计)数据结构是:productId, count, yearmonth
// 建立一个日期格式化工具
val simpleDateFormat = new SimpleDateFormat("yyyyMM")
// 注册 UDF,将 时间戳 timestamp 转化为年月格式 yyyyMM,注意:时间戳 timestamp 的单位是 秒,而日期格式化工具中 Date 须要的是 毫秒,且 format() 的结果是 字符串,须要转化为 Int 类型
spark.udf.register("changeDate", (x: Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
// 把原始的 ratings 数据转换成想要的数据结构:productId, score, yearmonth,而后建立对应的临时表
val ratingOfYearMonthDF = spark.sql("select productId, score, changeDate(timestamp) as yearmonth from ratings")
// 将新的数据集注册成为一张临时表
ratingOfYearMonthDF.createOrReplaceTempView("ratingOfMonth")
val rateMoreRecentlyProductsDF = spark.sql("select productId, count(productId) as count, yearmonth from " +
"ratingOfMonth group by yearmonth, productId order by yearmonth desc, count desc")
storeDFInMongoDB(rateMoreRecentlyProductsDF, RATE_MORE_RECENTLY_PRODUCTS)
根据历史数据中全部用户对商品的评分,周期性的计算每一个商品的平均得分。
实现思路:经过 Spark SQL 读取保存在 MongDB 中的 Rating 数据集,经过执行如下 SQL 语句实现对于商品的平均分统计。统计完成以后将生成的新的 DataFrame 写出到 MongoDB 的 AverageProductsScore 集合中。数据库
// 三、商品平均得分统计(即优质商品统计)数据结构是:productId,avg
val averageProductsScoreDF = spark.sql("select productId, avg(score) as avg from ratings group by productId order by avg desc")
storeDFInMongoDB(averageProductsScoreDF, AVERAGE_PRODUCTS_SCORE)
/**
* 将 DF 数据写入 MongoDB 数据库对应的表中的方法
*
* @param df
* @param collection_name
*/
def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig) = {
df.write
.option("uri", mongoConfig.uri)
.option("collection", collection_name)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
}
项目采用 ALS(交替最小二乘法) 做为协同过滤算法,根据 MongoDB 中的用户评分表 计算离线的用户商品推荐列表以及商品类似度矩阵。apache
经过 ALS 训练出来的 Model 来计算全部当前用户商品的推荐列表,主要思路以下:
一、userId 和 productId 作笛卡尔积,产生 (userId, productId) 的元组。
二、经过模型预测 (userId, productId) 对应的评分。
三、将预测结果经过预测分值进行排序。
四、返回分值最大的 K 个商品,做为当前用户的推荐列表。
最后生成的数据结构以下:将数据保存到 MongoDB 的 UserRecs 表中。
bootstrap
<dependencies>
<!-- java 线性代数的库 -->
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
<!-- Spark 的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驱动 -->
<!-- 用于代码方式链接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于 Spark 和 MongoDB 的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
一样通过前期的构建样例类、声明配置、建立 SparkSession 等步骤,能够加载数据开始计算模型了。
核心代码以下:
src/main/scala/com.atguigu.offline/OfflineRecommender.scala
// 定义样例类
// 注意:spark mllib 中有 Rating 类,为了便于区别,咱们从新命名为 ProductRating
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
// 标准推荐对象,productId, score
case class Recommendation(productId: Int, score: Double)
// 用户推荐列表
case class UserRecs(userId: Int, recs: Seq[Recommendation])
// 商品类似度列表(商品类似度矩阵/商品推荐列表)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object OfflineRecommender {
// 定义 MongoDB 中存储的表名
val MONGODB_RATING_COLLECTION = "Rating"
// 定义 MongoDB 中推荐表的名称
val USER_RECS = "UserRecs"
val PRODUCT_RECS = "ProductRecs"
val USER_MAX_RECOMMENDATION = 20
def main(args: Array[String]): Unit = {
// 定义用到的配置参数
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一个 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
// 建立一个 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一个 sparkContext
val sc = spark.sparkContext
// 声明一个隐式的配置对象,方便重复调用(当屡次调用对 MongoDB 的存储或读写操做时)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隐式转换:在对 DataFrame 和 Dataset 进行操做许多操做都须要这个包进行支持
import spark.implicits._
// 将 MongoDB 中的数据加载进来,并转换为 RDD,以后进行 map 遍历转换为 三元组形式的 RDD,并缓存
val ratingRDD = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(productRating => (productRating.userId, productRating.productId, productRating.score))
.cache()
// 提取出用户和商品的数据集,并去重
val userRDD = ratingRDD.map(_._1).distinct()
val productRDD = ratingRDD.map(_._2).distinct()
// TODO: 核心计算过程 -- 基于 LFM 模型的协同过滤推荐(类似推荐)
// 一、训练隐语义模型
// 建立训练数据集
val trainDataRDD = ratingRDD.map(x => Rating(x._1, x._2, x._3)) // Rating(user, product, rating)
// rank 是模型中隐语义因子(特征)的个数, iterations 是迭代的次数, lambda 是 ALS 的正则化参数
val (rank, iterations, lambda) = (5, 10, 0.001)
val model = ALS.train(trainDataRDD, rank, iterations, lambda)
// 二、获取预测评分矩阵,获得用户的商品推荐列表(用户推荐矩阵)
// 用 userRDD 和 productRDD 作一个笛卡尔积,获得一个空的 userProductsRDD: RDD[(userId, productId)]
val userProductsRDD = userRDD.cartesian(productRDD)
// 执行模型预测,获取预测评分矩阵,predictRatingRDD: RDD[Rating(userId, productId, rating)]
val predictRatingRDD = model.predict(userProductsRDD)
// 从预测评分矩阵中提取获得用户推荐列表
// (先过滤 filter,而后 map 转换为 KV 结构,再 groupByKey,再 map 封装样例类1,sortWith 后 take 再 map 封装样例类2)
val userRecsDF = predictRatingRDD.filter(_.rating > 0)
.map(
rating =>
(rating.user, (rating.product, rating.rating))
)
.groupByKey()
.map {
case (userId, recs) =>
// UserRecs(userId, recs.toList.sortBy(_._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))
UserRecs(userId, recs.toList.sortWith(_._2 > _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))
} // userRecsRDD: RDD[(userId, Seq[(productId, score)])]
.toDF()
// 将 DF 数据写入 MongoDB 数据库对应的表中
storeDFInMongoDB(userRecsDF, USER_RECS)
// 三、利用商品的特征矩阵,计算商品的类似度列表(商品类似度矩阵)
spark.stop()
}
经过 ALS 计算商品类似度矩阵,该矩阵用于查询当前商品的类似商品并为实时推荐系统服务。
核心代码以下:
// 三、利用商品的特征矩阵,计算商品的类似度列表(商品类似度矩阵)
// 经过训练出的 model 的 productFeatures 方法,获得 商品的特征矩阵
// 数据格式 RDD[(scala.Int, scala.Array[scala.Double])]
val productFeaturesRDD = model.productFeatures.map {
case (productId, featuresArray) =>
(productId, new DoubleMatrix(featuresArray))
}
// 将 商品的特征矩阵 和 商品的特征矩阵 作一个笛卡尔积,获得一个空的 productFeaturesCartRDD
val productFeaturesCartRDD = productFeaturesRDD.cartesian(productFeaturesRDD)
// 获取 商品类似度列表(商品类似度矩阵/商品推荐列表)
val productSimDF = productFeaturesCartRDD
.filter { // 过滤掉本身与本身作笛卡尔积的数据
case (a, b) =>
a._1 != b._1
}
.map { // 计算余弦类似度
case (a, b) =>
val simScore = this.consinSim(a._2, b._2)
// 返回一个二元组 productSimRDD: RDD[(productId, (productId, consinSim))]
(a._1, (b._1, simScore))
}
.filter(_._2._2 > 0.6)
.groupByKey()
.map {
case (productId, recs) =>
ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
} // productSimGroupRDD: RDD[(productId, Seq[(productId, consinSim)])]
.toDF()
// 将 DF 数据写入 MongoDB 数据库对应的表中
storeDFInMongoDB(productSimDF, PRODUCT_RECS)
其中,consinSim 是求两个向量余弦类似度的函数,代码实现以下:
/**
* 计算两个商品之间的余弦类似度(使用的是向量点积公式)
*
* @param product1
* @param product2
* @return
*/
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
// dot 表示点积,norm2 表示模长,模长就是 L2范式
product1.dot(product2) / (product1.norm2() * product2.norm2()) // l1范数:向量元素绝对值之和;l2范数:即向量的模长(向量的长度)
}
在上述模型训练的过程当中,咱们直接给定了隐语义模型的 rank,iterations,lambda 三个参数。对于咱们的模型,这并不必定是最优的参数选取,因此咱们须要对模型进行评估。一般的作法是计算均方根偏差(RMSE),考察预测评分与实际评分之间的偏差。
def main(args: Array[String]): Unit = {
// 定义用到的配置参数
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一个 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ALSTrainer")
// 建立一个 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一个 sparkContext
val sc = spark.sparkContext
// 声明一个隐式的配置对象,方便重复调用(当屡次调用对 MongoDB 的存储或读写操做时)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隐式转换:在对 DataFrame 和 Dataset 进行操做许多操做都须要这个包进行支持
import spark.implicits._
// 将 MongoDB 中的数据加载进来,并转换为 RDD,以后进行 map 遍历转换为 RDD(样例类是 spark mllib 中的 Rating),并缓存
val ratingRDD = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(productRating => Rating(productRating.userId, productRating.productId, productRating.score))
.cache()
// ratingRDD: RDD[Rating(user, product, rating)]
// 将一个 RDD 随机切分红两个 RDD,用以划分训练集和测试集
val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
val trainingDataRDD = splits(0)
val testinggDataRDD = splits(1)
// 输出最优参数
adjustALSParams(trainingDataRDD, testinggDataRDD)
// 关闭 Spark
spark.close()
}
其中 adjustALSParams 方法是模型评估的核心,输入一组训练数据和测试数据,输出计算获得最小 RMSE 的那组参数。代码实现以下:
/**
* 输出最优参数的方法:输入一组训练数据和测试数据,输出计算获得最小 RMSE 的那组参数
*
* @param trainingDataRDD
* @param testinggData
*/
def adjustALSParams(trainingDataRDD: RDD[Rating], testinggData: RDD[Rating]) = {
// 这里指定迭代次数为 10,rank 和 lambda 在几个值中选取调整
val result = for (rank <- Array(50, 100, 150, 200); lambda <- Array(1, 0.1, 0.01, 0.001))
yield { // yield 表示把 for 循环的每一次中间结果保存下来
val model = ALS.train(trainingDataRDD, rank, 10, lambda)
val rmse = getRMSE(model, testinggData)
(rank, lambda, rmse)
}
// 按照 rmse 排序
// println(result.sortBy(_._3).head)
println(result.minBy(_._3))
}
计算 RMSE 的函数 getRMSE 代码实现以下:
/**
* 计算 RMSE
*
* @param model
* @param testinggDataRDD
*/
def getRMSE(model: MatrixFactorizationModel, testinggDataRDD: RDD[Rating]) = {
// 将 三元组数据 转化为 二元组数据
// testinggDataRDD: RDD[Rating(userId, productId, rating)]
val userProductsRDD = testinggDataRDD.map(rating => (rating.user, rating.product))
// 执行模型预测,获取预测评分矩阵
// predictRatingRDD: RDD[Rating(userId, productId, rating)]
val predictRatingRDD = model.predict(userProductsRDD)
// 测试数据的真实评分
val realRDD = testinggDataRDD.map(rating => ((rating.user, rating.product), rating.rating))
// 测试数据的预测评分
val predictRDD = predictRatingRDD.map(rating => ((rating.user, rating.product), rating.rating))
// 计算 RMSE(测试数据的真实评分 与 测试数据的预测评分 作内链接操做)
sqrt(
realRDD.join(predictRDD).map {
case ((userId, productId), (real, predict)) =>
// 真实值和预测值之间的差
val err = real - predict
err * err
}.mean()
)
}
运行代码,咱们就能够获得目前数据的最优模型参数。
实时计算与离线计算应用于推荐系统上最大的不一样在于实时计算推荐结果应该反映最近一段时间用户近期的偏好
,而离线计算推荐结果则是根据用户从第一次评分起的全部评分记录来计算用户整体的偏好
。
用户对物品的偏好随着时间的推移老是会改变的。好比一个用户 u 在某时刻对商品 p 给予了极高的评分,那么在近期一段时候,u 极有可能很喜欢与商品 p 相似的其余商品;而若是用户 u 在某时刻对商品 q 给予了极低的评分,那么在近期一段时候,u 极有可能不喜欢与商品 q 相似的其余商品。因此对于实时推荐,当用户对一个商品进行了评价后,用户会但愿推荐结果基于最近这几回评分进行必定的更新,使得推荐结果匹配用户近期的偏好,知足用户近期的口味。
若是实时推荐继续采用离线推荐中的 ALS 算法,因为 ALS 算法运行时间巨大(好几分钟甚至好十几分钟)
,不具备实时获得新的推荐结果的能力;而且因为算法自己的使用的是用户评分表,用户本次评分后只更新了总评分表中的一项,使得算法运行后的推荐结果与用户本次评分以前的推荐结果基本没有多少差异,从而给用户一种推荐结果一直没变化的感受,很影响用户体验。
另外,在实时推荐中因为时间性能上要知足实时或者准实时的要求,因此算法的计算量不能太大,避免复杂、过多的计算形成用户体验的降低。鉴于此,推荐精度每每不会很高。实时推荐系统更关心推荐结果的动态变化能力,只要更新推荐结果的理由合理便可
,至于推荐的精度要求则能够适当放宽
。
因此对于实时推荐算法,主要有两点需求:
(1)用户本次评分后、或最近几个评分后系统能够明显的更新推荐结果。
(2)计算量不大,知足响应时间上的实时或者准实时要求。
咱们在 recommender 下新建子项目 OnlineRecommender,引入 spark、scala、mongo、redis 和 kafka 的依赖:
<dependencies>
<!-- Spark 的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驱动 -->
<!-- 用于代码方式链接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于 Spark 和 MongoDB 的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
<!-- Redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
代码中首先定义样例类和一个链接助手对象(用于创建 redis 和 mongo 链接),并在 OnlineRecommender 中定义一些常量:
src/main/scala/com.atguigu.online/OnlineRecommender.scala
package com.atguigu.online
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer
// 定义样例类
// 链接助手对象(用于创建 redis 和 mongo 的链接)并序列化
object ConnHelper extends Serializable {
// 懒变量:使用的时候才初始化
lazy val jedis = new Jedis("hadoop102")
// 用于 MongoDB 中的一些复杂操做(读写以外的操做)
lazy val mongoClient = MongoClient(MongoClientURI("mongodb://hadoop102:27017/ECrecommender"))
}
// MongoDB 链接配置对象
case class MongoConfig(uri: String, db: String)
// 标准推荐对象,productId, score
case class Recommendation(productId: Int, score: Double)
// 用户推荐列表
case class UserRecs(userId: Int, recs: Seq[Recommendation])
// 商品类似度列表(商品类似度矩阵/商品推荐列表)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object OnlineRecommender {
// 定义常量和表名
val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
val MONGODB_PRODUCT_RECS_COLLECTION = "ProductRecs"
val MONGODB_RATING_COLLECTION = "Rating"
val MAX_USER_RATINGS_NUM = 20
val MAX_SIM_PRODUCTS_NUM = 20
def main(args: Array[String]): Unit = {
}
}
实时推荐主体代码以下:
object OnlineRecommender {
// 定义常量和表名
val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
val MONGODB_PRODUCT_RECS_COLLECTION = "ProductRecs"
val MONGODB_RATING_COLLECTION = "Rating"
val MAX_USER_RATINGS_NUM = 20
val MAX_SIM_PRODUCTS_NUM = 20
def main(args: Array[String]): Unit = {
// 定义用到的配置参数
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender",
"kafka.topic" -> "ECrecommender"
)
// 建立一个 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OnlineRecommender")
// 建立一个 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一个 sparkContext
val sc = spark.sparkContext
// 建立一个 StreamContext
val ssc = new StreamingContext(sc, Seconds(2)) // 通常 500 毫秒以上
// 声明一个隐式的配置对象,方便重复调用(当屡次调用对 MongoDB 的存储或读写操做时)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隐式转换:在对 DataFrame 和 Dataset 进行操做许多操做都须要这个包进行支持
import spark.implicits._
// 加载数据:加载 MongoDB 中 ProductRecs 表的数据(商品类似度列表/商品类似度矩阵/商品推荐列表)
val simProductsMatrixMap = spark.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_RECS_COLLECTION)
.format("com.mongodb.spark.sql")
.load() // DF
.as[ProductRecs] // DS
.rdd // RDD
.map { recs =>
(recs.productId, recs.recs.map(item => (item.productId, item.score)).toMap)
}.collectAsMap() // Map[(productId, Map[(productId, score)])] 转换成 Map 结构,这么作的目的是:为了后续查询商品类似度方便
// 将 商品类似度 Map 广播出去
val simProductsMatrixMapBroadCast = sc.broadcast(simProductsMatrixMap)
// 建立到 Kafka 的链接
val kafkaPara = Map(
"bootstrap.servers" -> "hadoop102:9092", // 使用的是 Kafka 的高级 API
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "ECrecommender",
"auto.offset.reset" -> "latest"
)
// 建立 kafka InputDStream
val kafkaInputDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaPara)
)
// UID|PID|SCORE|TIMESTAMP
// userId|productId|score|timestamp
// 产生评分流
// ratingDStream: DStream[RDD, RDD, RDD, ...] -> RDD[(userId, productId, score, timestamp)]
val ratingDStream = kafkaInputDStream.map {
case msg =>
val attr = msg.value().split("\\|")
(attr(0).trim.toInt, attr(1).trim.toInt, attr(2).trim.toDouble, attr(3).toInt)
}
// TODO: 对评分流的处理流程
ratingDStream.foreachRDD {
rdds =>
rdds.foreach {
case (userId, productId, score, timestamp) =>
println("rating data coming! >>>>>>>>>>>>>>>>>>>> ")
// TODO: 核心实时推荐算法流程
// 一、从 redis 中获取 当前用户最近的 K 次商品评分,保存成一个数组 Array[(productId, score)]
val userRecentlyRatings = getUserRecentlyRatings(MAX_USER_RATINGS_NUM, userId, ConnHelper.jedis)
// 二、从 MongoDB 的 商品类似度列表 中获取 当前商品 p 的 K 个最类似的商品列表,做为候选商品列表,保存成一个数组 Array[(productId)]
val candidateProducts = getTopSimProducts(MAX_SIM_PRODUCTS_NUM, productId, userId, simProductsMatrixMapBroadCast.value)
// 三、计算每个 候选商品 q 的 推荐优先级得分,获得 当前用户的实时推荐列表,保存成一个数组 Array[(productId, score)]
// 为何不保存成 Recommendation 的列表呢?答:由于最后保存的过程中不用 DataFram 的 write() 方法了,而是将每个元素包装成 MongoDBObject 对象,而后插入列表中去
val streamRecs = computeProductsScore(candidateProducts, userRecentlyRatings, simProductsMatrixMapBroadCast.value)
// 四、将 当前用户的实时推荐列表数据 保存到 MongoDB
storeDataInMongDB(userId, streamRecs)
}
}
// 启动 Streaming 程序
ssc.start()
println(">>>>>>>>>>>>>>>>>>>> streaming started!")
ssc.awaitTermination()
}
实时推荐算法的前提:
一、在 Redis 集群中存储了每个用户最近对商品的 K 次评分。实时算法能够快速获取。
二、离线推荐算法已经将商品类似度矩阵提早计算到了 MongoDB 中。
三、Kafka 已经获取到了用户实时的评分数据。
算法过程以下:
实时推荐算法输入为一个评分流 <userId, productId, score, timestamp>
,而执行的核心内容包括:获取 userId 最近 K 次商品评分、获取 productId 最类似 K 个商品、计算候选商品的推荐优先级、更新对 userId 的实时推荐结果。
业务服务器在接收用户评分的时候,默认会将该评分状况以 userId, productId, score, timestamp
的格式插入到 Redis 中该用户对应的队列当中,在实时算法中,只须要经过 Redis 客户端获取相对应的队列内容便可。
// 由于 redis 操做返回的是 java 类,为了使用 map 操做须要引入转换类
import scala.collection.JavaConversions._
/**
* 一、从 redis 中获取 当前用户最近的 K 次商品评分,保存成一个数组 Array[(productId, score)]
*
* @param MAX_USER_RATINGS_NUM
* @param userId
* @param jedis
*/
def getUserRecentlyRatings(MAX_USER_RATINGS_NUM: Int, userId: Int, jedis: Jedis) = {
// redis 中的列表类型(list)能够存储一个有序的字符串列表
// 从 redis 中 用户的评分队列 里获取评分数据,list 中的 键 userId:4867 值 457976:5.0
jedis.lrange("userId:" + userId.toString, 0, MAX_USER_RATINGS_NUM)
.map { item =>
val attr = item.split("\\:")
(attr(0).trim.toInt, attr(1).trim.toDouble)
}
.toArray
}
在离线算法中,已经预先将商品的类似度矩阵进行了计算,因此每一个商品 productId 的最类似的 K 个商品很容易获取:从 MongoDB 中读取 ProductRecs 数据,从 productId 在 candidateProducts 对应的子哈希表中获取类似度前 K 大的那些商品。输出是数据类型为 Array[Int] 的数组,表示与 productId 最类似的商品集合,并命名为 candidateProducts 以做为候选商品集合。
/**
* 二、从 MongoDB 的 商品类似度列表 中获取 当前商品 p 的 K 个最类似的商品列表,做为候选商品列表,保存成一个数组 Array[(productId)]
*
* @param MAX_SIM_PRODUCTS_NUM
* @param productId
* @param userId
* @param simProductsMatrixMap
*/
def getTopSimProducts(MAX_SIM_PRODUCTS_NUM: Int, productId: Int, userId: Int, simProductsMatrixMap: collection.Map[Int, Map[Int, Double]])(implicit mongoConfig: MongoConfig) = {
// 一、从广播变量 商品类似度矩阵 中拿到当前商品的类似度商品列表
// simProductsMatrixMap: Map[(productId, Map[(productId, score)])]
// allSimProducts: Array[(productId, score)]
val allSimProducts = simProductsMatrixMap(productId).toArray
// 二、定义经过 MongoDB 客户端拿到的表操做对象
val ratingCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
// 获取用户已经评分过的商品(经过 MongoDBObject 对象)
val ratingExist = ratingCollection.find(MongoDBObject("userId" -> userId)).toArray.map(item => item.get("productId").toString.toInt)
// 三、过滤掉用户已经评分过的商品,排序输出
allSimProducts.filter(x => !ratingExist.contains(x._1))
.sortWith(_._2 > _._2)
.take(MAX_SIM_PRODUCTS_NUM)
.map(x => x._1)
}
对于候选商品集合 candidateProducts 和 userId 的最近 K 个评分 userRecentlyRatings,算法代码内容以下:
/**
* 三、计算每个 候选商品 q 的 推荐优先级得分,获得 当前用户的实时推荐列表,保存成一个数组 Array[(productId, score)]
*
* @param candidateProducts
* @param userRecentlyRatings
* @param simProductsMatrixMap
*/
def computeProductsScore(candidateProducts: Array[Int], userRecentlyRatings: Array[(Int, Double)], simProductsMatrixMap: collection.Map[Int, Map[Int, Double]]) = {
// 一、定义一个长度可变的数组 scala ArrayBuffer,用于保存每个候选商品的基础得分
val scores = ArrayBuffer[(Int, Double)]()
// 二、定义两个可变的 scala HashMap,用于保存每个候选商品的加强因子和减弱因子
val increMap = HashMap[Int, Int]()
val decreMap = HashMap[Int, Int]()
// 三、对 每个候选商品 和 每个已经评分的商品 计算推荐优先级得分
for (candidateProduct <- candidateProducts; userRecentlyRating <- userRecentlyRatings) {
// 获取当前 候选商品 和当前 最近评分商品 的类似度的得分
val simScore = getProductSimScore(candidateProduct, userRecentlyRating._1, simProductsMatrixMap)
if (simScore > 0.6) {
// 计算 候选商品 的基础得分
scores += ((candidateProduct, simScore * userRecentlyRating._2))
// 计算 加强因子 和 减弱因子
if (userRecentlyRating._2 > 3) {
increMap(candidateProduct) = increMap.getOrDefault(candidateProduct, 0) + 1
} else {
decreMap(candidateProduct) = decreMap.getOrDefault(candidateProduct, 0) + 1
}
}
}
// 四、根据备选商品的 productId 作 groupBy,而后根据公式最后求出候选商品的 推荐优先级得分 并排序
scores.groupBy(_._1).map {
case (productId, scoreList) =>
(productId, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(productId, 1)) - log(decreMap.getOrDefault(productId, 1)))
}.toArray.sortWith(_._2 > _._2)
}
其中,getTopSimProducts 是取候选商品和已评分商品的类似度,代码以下:
/**
* 获取当前 候选商品 和当前 最近评分商品 的类似度的得分,获得一个 Double
*
* @param productId1
* @param productId2
* @param simProductsMatrixMap
*/
def getProductSimScore(productId1: Int, productId2: Int, simProductsMatrixMap: collection.Map[Int, Map[Int, Double]]) = {
simProductsMatrixMap.get(productId1) match {
case Some(map) =>
map.get(productId2) match {
case Some(score) => score
case None => 0.0
}
case None => 0.0
}
}
而 log 是对数运算,这里实现为取 10 的对数(经常使用对数):
/**
* 求一个数以10为底数的对数(使用换底公式)
*
* @param m
* @return
*/
def log(m: Int): Double = {
val N = 10
math.log(m) / math.log(N) // 底数为 e => ln m / ln N = log m N = lg m
}
storeDataInMongDB 函数实现告终果的保存:
/**
* 四、将 当前用户的实时推荐列表数据 保存到 MongoDB
*
* @param userId
* @param streamRecs
*/
def storeDataInMongDB(userId: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit = {
// 到 StreamRecs 的链接
val streaRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
streaRecsCollection.findAndRemove(MongoDBObject("userId" -> userId))
streaRecsCollection.insert(MongoDBObject("userId" -> userId, "recs" -> streamRecs.map(x => MongoDBObject("productId" -> x._1, "score" -> x._2))))
}
当计算出候选商品的推荐优先级的数组 updatedRecommends<productId, E>
后,这个数组将被发送到 Web 后台服务器,与后台服务器上 userId 的上次实时推荐结果 recentRecommends<productId, E>
进行合并、替换并选出优先级 E 前 K 大的商品做为本次新的实时推荐。具体而言:
a、合并:将 updatedRecommends 与 recentRecommends 并集合成为一个新的 <productId, E>
数组;
b、替换(去重):当 updatedRecommends 与 recentRecommends 有重复的商品 productId 时,recentRecommends 中 productId 的推荐优先级因为是上次实时推荐的结果,因而将做废,被替换成表明了更新后的 updatedRecommends 的 productId 的推荐优先级;
c、选取 TopK:在合并、替换后的 <productId, E>
数组上,根据每一个 product 的推荐优先级,选择出前 K 大的商品,做为本次实时推荐的最终结果。
咱们的系统实时推荐的数据流向是:业务系统 -> 埋点日志 -> flume 日志采集 -> kafka streaming 数据清洗和预处理 -> spark streaming 流式计算。在咱们完成实时推荐服务的代码后,应该与其它工具进行联调测试,确保系统正常运行。
启动实时推荐系统 OnlineRecommender 以及 mongodb、redis。
参考连接:https://www.cnblogs.com/chenmingjun/p/10914837.html
启动 hadoop 集群命令:
[atguigu@hadoop102 hadoop-2.7.2]$ sbin/start-dfs.sh
[atguigu@hadoop103 hadoop-2.7.2]$ sbin/start-yarn.sh
[atguigu@hadoop102 ~]$ zkstart.sh
[atguigu@hadoop102 ~]$ kafka-start.sh
在 recommender 下新建 module,KafkaStreaming,主要用来作日志数据的预处理,过滤出须要的内容。pom.xml 文件须要引入依赖:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
</dependencies>
<build>
<finalName>kafkastream</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.atguigu.kafkastream.Application</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
在 src/main/java 下新建 java 类 com.atguigu.kafkastreaming.Application
package com.atguigu.kafkastream;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import java.util.Properties;
public class Application {
public static void main(String[] args) {
String brokers = "hadoop102:9092";
String zookeepers = "hadoop102:2181";
// 定义输入和输出的 topic
String from = "log";
String to = "ECrecommender";
// 定义 kafka streaming 的配置
Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
StreamsConfig config = new StreamsConfig(settings);
// 拓扑建构器
TopologyBuilder builder = new TopologyBuilder();
// 定义流处理的拓扑结构
builder.addSource("SOURCE", from)
.addProcessor("PROCESSOR", () -> new LogProcessor(), "SOURCE")
.addSink("SINK", to, "PROCESSOR");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
System.out.println("kafka stream started!");
}
}
这个程序会将 topic 为 “log” 的信息流获取来作处理,并以 “ECrecommender” 为新的 topic 转发出去。
流处理程序 LogProcess.java
package com.atguigu.kafkastream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
public class LogProcessor implements Processor<byte[], byte[]> {
private ProcessorContext context;
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
}
@Override
public void process(byte[] dummy, byte[] line) { // dummy 表示 哑变量,没什么用
// 把收集到的日志信息用 String 表示
String input = new String(line);
// 根据前缀 PRODUCT_RATING_PREFIX: 从日志信息中提取评分数据
if (input.contains("PRODUCT_RATING_PREFIX:")) {
System.out.println("product rating data coming! >>>>>>>>>>>>>>>>>>>> " + input);
input = input.split("PRODUCT_RATING_PREFIX:")[1].trim();
context.forward("logProcessor".getBytes(), input.getBytes());
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
}
完成代码后,启动 Application。
在 flume 的 /opt/module/flume/job/ECrecommender 目录下(该目录任意)新建 flume-log-kafka.conf,对 flume 链接 kafka 作配置:
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink
# For each one of the sources, the type is defined.
agent.sources.exectail.type = exec
# 下面这个路径是须要收集日志的绝对路径,改成本身的日志目录(系统部署后的 tomcat 的日志目录)
agent.sources.exectail.command = tail –f /opt/module/tomcat/logs/catalina.out
agent.sources.exectail.interceptors = i1
agent.sources.exectail.interceptors.i1.type = regex_filter
# 定义日志过滤前缀的正则
agent.sources.exectail.interceptors.i1.regex = .+PRODUCT_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel
# Each sink's type must be defined.
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20
# Specify the channel the sink should use.
agent.sinks.kafkasink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel.
agent.channels.memoryChannel.capacity = 10000
配置好后,启动 flume:
[atguigu@hadoop102 flume]$ bin/flume-ng agent \
--conf conf/ --name a1 --conf-file job/ECrecommender/flume-log-kafka.conf \
-Dflume.root.logger=INFO,console
将业务代码加入系统中。注意在 src/main/resources/ 下的 log4j.properties 中,log4j.appender.file.File 的值应该替换为本身的日志目录,与 flume 中的配置应该相同(当 flume 与 业务代码在同一台机器上时这么作,不然 flume 指向的应该是系统部署后的 tomcat 的日志目录
)。
启动业务系统后台,访问 localhost:8088/index.html;点击某个商品进行评分,查看实时推荐列表是否会发生变化。
整个推荐系统更多的是依赖于用于的偏好信息进行商品的推荐,那么就会存在一个问题,对于新注册的用户是没有任何偏好信息记录的,那这个时候推荐就会出现问题,致使没有任何推荐的项目出现。
处理这个问题通常是经过当用户首次登录时,为用户提供交互式的窗口来获取用户对于物品的偏好,让用户勾选预设的兴趣标签
。
当获取用户的偏好以后,就能够直接给出相应类型商品的推荐。
原始数据中的 tag 文件,是用户给商品打上的标签,这部份内容想要直接转成评分并不容易,不过咱们能够将标签内容进行提取,获得商品的内容特征向量,进而能够经过求取商品内容类似度矩阵
。这部分能够与实时推荐系统直接对接,计算出与用户当前评分商品的类似商品,实现基于内容的实时推荐
。为了不热门标签对特征提取的影响,咱们还能够经过 TF-IDF 算法对标签的权重进行调整,从而尽量地接近用户偏好。
咱们在 recommender 下新建子项目 ContentRecommender,引入 spark、scala、mongo 和 jblas 的依赖:
<dependencies>
<!-- java 线性代数的库 -->
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
</dependency>
<!-- Spark 的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驱动 -->
<!-- 用于代码方式链接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于 Spark 和 MongoDB 的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
基于以上思想,加入 TF-IDF 算法将商品的标签内容进行提取,获得商品的内容特征向量 的核心代码以下:
src/main/scala/com.atguigu.content/ContentRecommender.scala
case class Product(productId: Int, name: String, imageUrl: String, categories: String, tags: String)
case class MongoConfig(uri: String, db: String)
// 标准推荐对象,productId, score
case class Recommendation(productId: Int, score: Double)
// 商品类似度列表(商品类似度矩阵/商品推荐列表)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object ContentRecommender {
// 定义 mongodb 中存储的表名
val MONGODB_PRODUCT_COLLECTION = "Product"
val CONTENT_PRODUCT_RECS = "ContentBasedProductRecs"
def main(args: Array[String]): Unit = {
// 定义用到的配置参数
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一个 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ContentRecommender")
// 建立一个 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一个 sparkContext
val sc = spark.sparkContext
// 声明一个隐式的配置对象,方便重复调用(当屡次调用对 MongoDB 的存储或读写操做时)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隐式转换:在对 DataFrame 和 Dataset 进行操做许多操做都须要这个包进行支持
import spark.implicits._
// 将 MongoDB 中的数据加载进来
val productTagsDF = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_PRODUCT_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[Product]
.map(product => (product.productId, product.name, product.tags.map(x => if (x == '|') ' ' else x))) // 由于 TF-IDF 默认使用的分词器的分隔符是空格
.toDF("productId", "name", "tags")
.cache()
// TODO: 用 TF-IDF 算法将商品的标签内容进行提取,获得商品的内容特征向量
// 一、实例化一个分词器,用来作分词,默认按照空格进行分词(注意:org.apache.spark.ml._ 下的 API 都是针对 DF 来操做的)
val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")
// 用分词器作转换后,获得增长一个新列 words 的 DF
val wordsDataDF = tokenizer.transform(productTagsDF)
// 二、定义一个 HashingTF 工具,用于计算频次 TF(映射特征的过程使用的就是 Hash 算法,特征的数量就是 Hash 的分桶数量,若分桶的数量太小,会出现 Hash 碰撞,默认分桶很大,后面作笛卡尔积性能不好)
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rowFeatures").setNumFeatures(1000)
// 用 HashingTF 作转换
val featurizedDataDF = hashingTF.transform(wordsDataDF)
// 三、定义一个 IDF 工具,计算 TF-IDF
val idf = new IDF().setInputCol("rowFeatures").setOutputCol("features")
// 训练一个 idf 模型,将词频数据传入,获得 idf 模型(逆文档频率)
val idfModel = idf.fit(featurizedDataDF)
// 经过 idf 模型转换后,获得增长一个新列 features 的 DF,即用 TF-IDF 算法获得新的特征矩阵
val rescaleDataDF = idfModel.transform(featurizedDataDF)
// 测试
// rescaleDataDF.show(truncate = false)
// 对数据进行转换,获得所须要的 RDD
// 从获得的 rescaledDataDF 中提取特征向量
val productFeaturesRDD = rescaleDataDF
.map {
row => // DF 转换为 二元组
(row.getAs[Int]("productId"), row.getAs[SparseVector]("features").toArray)
}
.rdd
.map {
case (productId, featuresArray) =>
(productId, new DoubleMatrix(featuresArray))
}
// 将 商品的特征矩阵 和 商品的特征矩阵 作一个笛卡尔积,获得一个稀疏的 productFeaturesCartRDD
val productFeaturesCartRDD = productFeaturesRDD.cartesian(productFeaturesRDD)
// 测试
// productFeaturesCartRDD.foreach(println(_))
// 获取 商品类似度列表(商品类似度矩阵/商品推荐列表)
val productSimDF = productFeaturesCartRDD
.filter { // 过滤掉本身与本身作笛卡尔积的数据
case (a, b) =>
a._1 != b._1
}
.map { // 计算余弦类似度
case (a, b) =>
val simScore = this.consinSim(a._2, b._2)
// 返回一个二元组 productSimRDD: RDD[(productId, (productId, consinSim))]
(a._1, (b._1, simScore))
}
.filter(_._2._2 > 0.6)
.groupByKey()
.map {
case (productId, recs) =>
ProductRecs(productId, recs.toList.sortWith(_._2 > _._2).map(x => Recommendation(x._1, x._2)))
} // productSimGroupRDD: RDD[(productId, Seq[(productId, consinSim)])]
.toDF()
// 将 DF 数据写入 MongoDB 数据库对应的表中
storeDFInMongoDB(productSimDF, CONTENT_PRODUCT_RECS)
spark.stop()
}
/**
* 计算两个商品之间的余弦类似度(使用的是向量点积公式)
*
* @param product1
* @param product2
* @return
*/
def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double = {
// dot 表示点积,norm2 表示模长,模长就是 L2范式
product1.dot(product2) / (product1.norm2() * product2.norm2()) // l1范数:向量元素绝对值之和;l2范数:即向量的模长(向量的长度)
}
/**
* 将 DF 数据写入 MongoDB 数据库对应的表中的方法
*
* @param df
* @param collection_name
*/
def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig) = {
df.write
.option("uri", mongoConfig.uri)
.option("collection", collection_name)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
}
}
而后经过商品特征向量进而求出商品类似度矩阵,就能够在商品详情页给出类似推荐了;一般在电商网站中,用户浏览商品或者购买完成以后,都会显示相似的推荐列表。
获得的类似度矩阵也能够为实时推荐提供基础,获得用户推荐列表。能够看出,基于内容模型 和 基于隐语义模型
,目的都是为了提取出物品的特征向量
,从而能够计算出物品的类似度矩阵
。而咱们的实时推荐系统算法正是基于类似度来定义的。
基于物品的协同过滤(Item-CF),只需收集用户的常规行为数据(好比点击、收藏、购买等)就能够获得商品间的类似度,在实际项目中应用很广。
咱们在 recommender 下新建子项目 ItemCFRecommender,引入 spark、scala、mongo 和 jblas 的依赖:
<dependencies>
<!-- Spark 的依赖引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
</dependency>
<!-- 引入 Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!-- 加入 MongoDB 的驱动 -->
<!-- 用于代码方式链接 MongoDB -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>casbah-core_2.11</artifactId>
<version>${casbah.version}</version>
</dependency>
<!-- 用于 Spark 和 MongoDB 的对接 -->
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>${mongodb-spark.version}</version>
</dependency>
</dependencies>
核心代码实现以下:
src/main/scala/com.atguigu.itemcf/ItemCFRecommender.scala
// 定义样例类
// 注意:spark mllib 中有 Rating 类,为了便于区别,咱们从新命名为 ProductRating
case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
case class MongoConfig(uri: String, db: String)
// 标准推荐对象,productId, score
case class Recommendation(productId: Int, score: Double)
// 商品类似度列表(商品类似度矩阵/商品推荐列表)
case class ProductRecs(productId: Int, recs: Seq[Recommendation])
object ItemCFRecommender {
// 定义 MongoDB 中存储的表名
val MONGODB_RATING_COLLECTION = "Rating"
// 定义 MongoDB 中推荐表的名称
val ITEM_CF_PRODUCT_RECS = "ItemCFProductRecs"
val MAX_RECOMMENDATION = 10
def main(args: Array[String]): Unit = {
// 定义用到的配置参数
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://hadoop102:27017/ECrecommender",
"mongo.db" -> "ECrecommender"
)
// 建立一个 SparkConf 配置
val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ItemCFRecommender")
// 建立一个 SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// 建立一个 sparkContext
val sc = spark.sparkContext
// 声明一个隐式的配置对象,方便重复调用(当屡次调用对 MongoDB 的存储或读写操做时)
implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
// 加入隐式转换:在对 DataFrame 和 Dataset 进行操做许多操做都须要这个包进行支持
import spark.implicits._
// 将 MongoDB 中的数据加载进来,获得 DF (userId, productId, count)
val ratingDF = spark
.read
.option("uri", mongoConfig.uri)
.option("collection", MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.map(productRating => (productRating.userId, productRating.productId, productRating.score))
.toDF("userId", "productId", "score")
.cache()
// TODO: 核心算法:基于物品的协同过滤推荐(类似推荐)--计算物品的同现类似度,获得商品的类似度列表
// 一、统计每一个商品的评分个数,使用 ratingDF 按照 productId 作 groupBy,获得 (productId, count)
val productRatingCountDF = ratingDF.groupBy("productId").count()
// 二、在原有的 rating 表中添加一列 count,获得新的 评分表,将 ratingDF 和 productRatingCountDF 作内链接 join 便可,获得 (productId, userId, score, count)
val ratingWithCountDF = ratingDF.join(productRatingCountDF, "productId")
// 三、将 新的评分表 按照 userId 作两两 join,统计两个商品被同一个用户评分过的次数,获得 (userId, productId1, score1, count1, productId2, score2, count2)
val joinedDF = ratingWithCountDF.join(ratingWithCountDF, "userId")
.toDF("userId", "productId1", "score1", "count1", "productId2", "score2", "count2") // 设置 DF 的列名称
.select("userId", "productId1", "count1", "productId2", "count2") // 设置 DF 显示的列
// 建立一个名为 joined 的临时表,用于写 sql 查询
joinedDF.createOrReplaceTempView("joined")
// 四、按照 productId1, productId2 作 groupBy,统计 userId 的数量,获得对两个商品同时评分的人数
val sql =
"""
|select productId1
|, productId2
|, count(userId) as cooCount
|, first(count1) as count1
|, first(count2) as count2
|from joined
|group by productId1, productId2
""".stripMargin
val cooccurrenceDF = spark.sql(sql).cache() // (productId1, productId2, cooCount, count1, count2)
val simDF = cooccurrenceDF
.map {
row =>
val coocSim = cooccurrenceSim(row.getAs[Long]("cooCount"), row.getAs[Long]("count1"), row.getAs[Long]("count2") )
(row.getInt(0), (row.getInt(1), coocSim))
}
.rdd
.groupByKey()
.map {
case (productId, recs) =>
ProductRecs(productId, recs.toList.filter(x => x._1 != productId).sortWith(_._2 > _._2).take(MAX_RECOMMENDATION).map(x => Recommendation(x._1, x._2)))
} // productSimGroupRDD: RDD[(productId, Seq[(productId, consinSim)])]
.toDF()
// 测试
// simDF.show()
// 将 DF 数据写入 MongoDB 数据库对应的表中
storeDFInMongoDB(simDF, ITEM_CF_PRODUCT_RECS)
spark.stop()
}
}
其中,计算同现类似度的函数代码实现以下:
/**
* 计算同现类似度
*
* @param cooCount
* @param count1
* @param count2
*/
def cooccurrenceSim(cooCount: Long, count1: Long, count2: Long) = {
cooCount / math.sqrt(count1 * count2)
}
其中,将 DF 数据写入 MongoDB 数据库对应的表中的函数代码实现以下:
/**
* 将 DF 数据写入 MongoDB 数据库对应的表中的方法
*
* @param df
* @param collection_name
*/
def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig) = {
df.write
.option("uri", mongoConfig.uri)
.option("collection", collection_name)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
}