spark数据挖掘 - 基于 Audioscrobbler 数据集音乐推荐实战

基于 Audioscrobbler 数据集音乐推荐实战

1. 数据集

这个例子将使用 Audioscrobbler 公开的数据集。Audioscrobbler是http://www.last.fm/zh/第一个音乐推荐系统.
http://www.last.fm/zh/ 是第一个网络流媒体音频网站,成立与2002年。
Audioscrobbler 为 “scrobbling” 提供了一个开发的 API,主要记录听众听取了哪些做家的歌曲。 这个网站利用这些信息创建了一个强大的额音乐推荐系统。这个系统达到了数百万用户,由于第三方的App和网站能够提供收听数据给推荐引擎。 在那个时期,研究推荐系统大部分局限在学习相似评级的数据集。也就是说,推荐的人每每使用须要输入像 “某某某 评分3.5分” 这样的工具。 然而,Audioscrobbler 数据集有趣地方在于仅仅记录播放的历史:“某某某 播放了 什么”。一个播放记录带来的信息量远远小于一个评分数据带来的信息量,可是评分数据总量确定没有播放历史记录的数据多,当大量播放历史记录放在一块儿的时候,比评分数据将更有价值。 由这个网站公布的一个2005年的数据集合能够在http://www-etud.iro.umontreal.ca/~bergstrj/audioscrobbler_data.html上面下载。这个数据集合没有解压以前大小是 135MB,解压以后是 500MB,解压以后将会看到主要的数据集是 user_artist_data.txt 文件,里面大约包含 141000 惟一的用户和 1.6 百万惟一的artist艺术家,大约 24.2 百万用户播放记录。 固然每一个 artist 都是用 id 记录的,id 与 名字的对照关系是在 artist_data.txt 中。注意同一个 artist 可能对应不少不一样名字即有不一样的 id。 因此这里还有一个文件 artist_alias.txt 标识每一个 artist 的别名。里面用一个惟一的id 标识全部同一个artist id 列表。html

2. 推荐算法

咱们须要选择一种适合于这种隐士反馈数据的算法。这个数据集所有是用户与做家的歌曲之间的交互行为数据。它没有包含用户和做家自己除了名字以外的其余属性信息。 这个就是典型的协同过滤算法。举个例子:好比决定两个用户具备相同的品味的缘由是他们有相同的年龄,这个不是协同过滤。决定两个用户都喜欢同一首歌曲的缘由是他们之间有不少共同喜欢的歌曲,这才是协同过滤。
这个数据集很大,由于他包含1千万多条用户播放记录。可是从另外一个方面来讲,它又很小数据量不够,由于它很稀疏。平均起来,每一个用户才放过171个艺术家的歌曲,而总共的艺术家有 1.6 百万个。有些用户甚至只是听歌一个做曲家的歌曲,咱们须要一种算法,可以对这种用户也给出合理的推荐。毕竟,每一个人刚开始在系统中开始产生记录的那一刻都只听过一个做家。这个也说明算法对新用户准确度低,这种状况当用户交互行为变多的时候会慢慢变好。 固然,咱们须要咱们的算法有能力扩展,处理大数据,而且很快。
接下来咱们例子里面展现的算法是普遍分类算法模型中的一个叫作隐因素模型。模型尝试经过观察不到的潜在的缘由去解释这些观察到的大量的用户产品交互行为。 更具体的来讲,这个例子将使用一个矩阵分解模型。下面介绍交叉最小方差模型。ios

交叉最小方差

算法名字 交叉最小方差, Alternating Least Squares, ALS
算法描述 Spark上的交替性最小二乘ALS本质是一种协同过滤的算法
算法原理 1. 首先将用户推荐对象交互历史转换为矩阵,行表示用户,列表示推荐对象,矩阵对应 i,j 表示用户 i 在对象 j 上有没有行为 <br /> 2. 协同过滤就是要像填数独同样,填满1获得的矩阵,采用的方法是矩阵分解算法原理图 算法原理图 <br /> 3. 原始矩阵 A 是一个很大的稀疏矩阵,而后利用 ALS 分解成近似两个矩阵 B 和 C 的乘,另外两个矩阵就比较密集,并且 B 矩阵的列能够解释为一个事物的几个方面。<br /> 4. 用户 k 对对象 h 的喜爱程度就能够经过矩阵 B 的 k 行乘 矩阵 C 的 h 列获得
使用场景 当用户和推荐的对象自己属性数据没有,只存在用户和推荐对象历史交互数据的时候,当提炼出用户推荐对象的关系矩阵能够发现是一个大型的稀疏矩阵
算法优缺点 优势: 1. 此算法可伸缩 2. 速度很快 3. 适合大数据 4.新异兴趣发现、不须要领域知识 5. 随着时间推移性能提升 6. 推荐个性化、自动化程度高 7. 能处理复杂的非结构化对象 <br /> 缺点: 1. 稀疏问题 2. 可扩展性问题 3. 新用户问题 4. 质量取决于历史数据集 5. 系统开始时推荐质量差
参考资料 1. 算法原理 Large-scale Parallel Collaborative Filtering for the Netflix Prize <br /> 2. MLlib实现 MLlib - Collaborative Filtering

3. 数据准备

首先将样例数据上传到HDFS,若是想要在本地测试这些功能的话,须要内存数量至少 6g, 固然能够经过减小数据量来达到通用的测试。 下面给出完整的代码,注释已经说明每段代码的含义:git

package clebeg.spark.action

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Spark 数据挖掘实战 案例音乐推荐
 * 注意:ALS 限制每个用户产品对必须有一个 ID,并且这个 ID 必须小于 Integer.MAX_VALUE
 * Created by clebeg.xie on 2015/10/29.
 */
object MusicRecommend {
  val rootDir = "F:\\clebeg\\spark\\datas\\profiledata_06-May-2005\\";
  //本地测试
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkInAction").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val rawUserArtistData = sc.textFile(rootDir + "user_artist_data.txt")
    //检查数据集是否超过最大值,对于计算密集型算法,原始数据集最好多分块
    //println(rawUserArtistData.first())
    //println(rawUserArtistData.map(_.split(' ')(0).toDouble).stats())
    //println(rawUserArtistData.map(_.split(' ')(1).toDouble).stats())
    //艺术家ID和名字对应
    val artistById = artistByIdFunc(sc)
    //艺术家名字重复
    val aliasArtist = artistsAlias(sc)

    aslModelTest(sc, aliasArtist, rawUserArtistData, artistById)
    //查看一下 2093760 这个用户真正听的歌曲
    val existingProducts = rawUserArtistData.map(_.split(' ')).filter {
      case Array(userId, _, _) => userId.toInt == 2093760
    }.map{
      case Array(_, artistId, _) => {
        aliasArtist.getOrElse(artistId.toInt, artistId.toInt)
      }
    }.collect().toSet

    artistById.filter {
      line => line match {
        case Some((id, name)) => existingProducts.contains(id)
        case None => false
      }

    }.collect().foreach(println)

  }

  /**
   * 获取艺术家名字和ID的对应关系
   * 有些艺术家名字和ID没有按 \t 分割,错误处理就是放弃这些数据
   * @param sc
   * @return
   */
  def artistByIdFunc(sc: SparkContext): RDD[Option[(Int, String)]] = {
    val rawArtistData = sc.textFile(rootDir + "artist_data.txt")
    val artistByID = rawArtistData.map {
      line =>
        //span 碰到第一个不知足条件的开始划分, 少许的行转换不成功, 数据质量问题
        val (id, name) = line.span(_ != '\t')
        if (name.isEmpty) {
          None
        } else {
          try {
            //特别注意Some None缺失值处理的方式,Scala 中很是给力的一种方法
            Some((id.toInt, name.trim))
          } catch {
            case e: NumberFormatException =>
              None
          }
        }
    }
    artistByID
  }

  /**
   * 经过文件 artist_alias.txt 获得全部艺术家的别名
   * 文件不大,每一行按照 \t 分割包含一个拼错的名字ID 还有一个正确的名字ID
   * 一些行没有第一个拼错的名字ID,直接跳过
   * @param sc Spark上下文
   * @return
   */
  def artistsAlias(sc: SparkContext) = {
    val rawArtistAlias = sc.textFile(rootDir + "artist_alias.txt")
    val artistAlias = rawArtistAlias.flatMap { line =>
      val tokens = line.split('\t')
      if (tokens(0).isEmpty) {
        None
      } else {
        Some((tokens(0).toInt, tokens(1).toInt))
      }
    }.collectAsMap()
    artistAlias
  }

  def aslModelTest(sc: SparkContext,
                   aliasArtist: scala.collection.Map[Int, Int],
                   rawUserArtistData: RDD[String],
                   artistById: RDD[Option[(Int, String)]] ) = {
    //将对应关系广播出去,由于这个数据量不大,Spark广播变量相似于 hive 的 mapjoin
    val bArtistAlias = sc.broadcast(aliasArtist)
    //转换重复的艺术家的ID为同一个ID,而后将
    val trainData = rawUserArtistData.map{
      line =>
        val Array(userId, artistId, count) = line.split(' ').map(_.toInt)
        val finalArtistID = bArtistAlias.value.getOrElse(artistId, artistId)
        Rating(userId, finalArtistID, count)
    }.cache()
    //模型训练
    val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)

    //模型创建以后,为某个用户给出一个具体的推荐列表
    val recommendations = model.recommendProducts(2093760, 5) //为ID为2093760的用户推荐5个产品
    recommendations.foreach(println)
    val recommendedProductIDs = recommendations.map(_.product).toSet
    //输出推荐的艺术家的名字
    artistById.filter {
      line => line match {
        case Some((id, name)) => recommendedProductIDs.contains(id)
        case None => false
      }
    }.collect().foreach(println)
  }
}

代码中模型训练好以后,预测了用户 2093760 的推荐结果,我测试结果以下,因为里面代码使用了随机生成初始矩阵,每一个人的结果都有可能不同。github

Some((2814,50 Cent))
Some((829,Nas))
Some((1003249,Ludacris))
Some((1001819,2Pac))
Some((1300642,The Game))

代码中也给出了该用户之前听过的艺术家的名字以下:算法

Some((1180,David Gray))
Some((378,Blackalicious))
Some((813,Jurassic 5))
Some((1255340,The Saw Doctors))
Some((942,Xzibit))

模型评价

auc评价方法apache

def areaUnderCurve(
      positiveData: RDD[Rating],
      bAllItemIDs: Broadcast[Array[Int]],
      predictFunction: (RDD[(Int,Int)] => RDD[Rating])) = {
    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive", and map to tuples
    val positiveUserProducts = positiveData.map(r => (r.user, r.product))
    // Make predictions for each of them, including a numeric score, and gather by user
    val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user)

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other items, excluding those that are "positive" for the user.
    val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions {
      // mapPartitions operates on many (user,positive-items) pairs at once
      userIDAndPosItemIDs => {
        // Init an RNG and the item IDs set once for partition
        val random = new Random()
        val allItemIDs = bAllItemIDs.value
        userIDAndPosItemIDs.map { case (userID, posItemIDs) =>
          val posItemIDSet = posItemIDs.toSet
          val negative = new ArrayBuffer[Int]()
          var i = 0
          // Keep about as many negative examples per user as positive.
          // Duplicates are OK
          while (i < allItemIDs.size && negative.size < posItemIDSet.size) {
            val itemID = allItemIDs(random.nextInt(allItemIDs.size))
            if (!posItemIDSet.contains(itemID)) {
              negative += itemID
            }
            i += 1
          }
          // Result is a collection of (user,negative-item) tuples
          negative.map(itemID => (userID, itemID))
        }
      }
    }.flatMap(t => t)
    // flatMap breaks the collections above down into one big set of tuples

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user)

    // Join positive and negative by user
    positivePredictions.join(negativePredictions).values.map {
      case (positiveRatings, negativeRatings) =>
        // AUC may be viewed as the probability that a random positive item scores
        // higher than a random negative one. Here the proportion of all positive-negative
        // pairs that are correctly ranked is computed. The result is equal to the AUC metric.
        var correct = 0L
        var total = 0L
        // For each pairing,
        for (positive <- positiveRatings;
             negative <- negativeRatings) {
          // Count the correctly-ranked pairs
          if (positive.rating > negative.rating) {
            correct += 1
          }
          total += 1
        }
        // Return AUC: fraction of pairs ranked correctly
        correct.toDouble / total
    }.mean() // Return mean AUC over users
  }

参数选择

能够经过调整参数看 auc 的结果来反复选择网络

参考资料

Advanced Analytics with Spark
二分类器评价dom

相关文章
相关标签/搜索