翻译自:Why Apache Spark is a Crossover Hit for Data Scientists,有删减。java
Spark是一个超有潜力的通用数据计算平台,不管是对统计科学家仍是数据工程师。程序员
数据科学是一个广阔的领域。我自认是一个数据科学家,但和另一批数据科学家又有不少的不一样。数据科学家一般分为统计科学家和数据工程师两个阵营,而我正处于第二阵营。
统计科学家使用交互式的统计工具(好比R)来回答数据中的问题,得到全景的认识。与之相比,数据工程师则更像一名程序员,他们在服务器上编写代码,建立和应用机器学习模型,熟悉C++和Java等系统级语言,常常须要和企业级数据中心的某些组件打交道,好比Hadoop。
而有的数据科学家专一于更细的领域,就像精通R但从未据说过Python或者scikit-learn(反之亦然),即使二者都提供了丰富的统计库。算法
若是能够提供一种统一的工具,运行在统一的架构,用统一的语言编程,并能够同时知足统计科学家和数据工程师的需求,那该多好啊。我一开始就精通Java,难道为了研究数据,我就必须去学一种像Python或R的语言?我一直使用传统的数据分析工具,难道为了应对大规模计算,就必须去懂MapReduce?正是统计工具的不完美造就了这种局面:sql
其余的数据科学工具同样没法尽善尽美。基于Java和Hadoop的背景,我开始幻想一个理想的数据科学利器:一个像R和Python的能实现RPEL(读取-估值-打印-循环)的自带统计库函数的命令行解释器,又具有自然的分布式可扩展的属性;拥有像Crunch同样的分布式集合,并且能经过命令行解释器调用。shell
这就是Spark让我兴奋的缘由。大部分人讨论到Spark时,老是注意到将数据驻留内存以提升计算效率的方面(相对MapReduce),但对我来讲这根本不是关键。Spark拥有许多的特征,使之真正成为一个融合统计科学和数据工程的交叉点:apache
Spark和MLib还有待完善:整个项目有很多bug,效率也还有提高的空间,和YARN的整合也存在问题。Spark还没办法提供像R那样丰富的数据分析函数。但Spark已然是世界上最好的数据平台,足以让来自任何背景的数据科学家侧目。编程
Stack Overflow是一个著名的软件技术问答平台,在上面提的每一个问题有可能被打上若干个短文本的标签,好比java
或者sql
,咱们的目标在于创建一套系统,使用ALS推荐算法,为新问题的标签提供预测和建议。从推荐系统的角度,你能够把问题想象成user
,把标签想象成item
。
首先,从Stack Overflow下载官方提供的截至20140120的问答数据stackoverflow.com-Posts.7z
。
这是一个可以直接用于分布式计算的bzip格式文件,但在咱们的场景下,必须先解压并拷贝到HDFS:ruby
bzcat stackoverflow.com-Posts.7z | hdfs dfs -put - /user/srowen/Posts.xml
解压后的文件大约是24.4GB,包含210万个问题,1800万个回答,总共标注了930万个标签,这些标签排重以后大概是34000个。
确认机器安装了Spark以后,输入spark-shell
便可打开Scala的REPL环境。首先,咱们读取一个存储在HDFS的Posts.xml
文件:服务器
scalaval postsXML = sc.textFile("hdfs:///user/srowen/Posts.xml")
这时命令行工具会返回:数据结构
postsXML: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :12
显示文本文件已转化为一个String型的RDD,你能够经过调用RDD的函数,实现任意的查询运算。好比统计文件的行数:
scalapostsXML.count
这条指令生成大量的输出,显示Spark正在利用分布式的环境计数,最终打印出18066983
。
下一步,将XML文件的每一行都存入形如(questionID, tag)
的元组。得益于Scala的函数式编程的风格,RDD和Scala集合同样可使用map等方法:
scalaval postIDTags = postsXML.flatMap { line => // Matches Id="..." ... Tags="..." in line val idTagRegex = "Id=\"(\\d+)\".+Tags=\"([^\"]+)\"".r // // Finds tags like <TAG> value from above val tagRegex = "<([^&]+)>".r // Yields 0 or 1 matches: idTagRegex.findFirstMatchIn(line) match { // No match -- not a line case None => None // Match, and can extract ID and tags from m case Some(m) => { val postID = m.group(1).toInt val tagsString = m.group(2) // Pick out just TAG matching group val tags = tagRegex.findAllMatchIn(tagsString).map(_.group(1)).toList // Keep only question with at least 4 tags, and map to (post,tag) tuples if (tags.size >= 4) tags.map((postID,_)) else None } } // Because of flatMap, individual lists will concatenate // into one collection of tuples }
你会发现这条指令的执行是当即返回的,而不像count
同样须要等待,由于到目前为止,Spark并未启动任何主机间的数据变换。
ALS的MLib实现必须使用数值ID而非字符串做为唯一标识,而问题的标签数据是字符串格式的,因此须要把字符串哈希成一个非负整数,同时保留非负整数到字符串的映射。这里咱们先定义一个哈希函数以便复用。
scaladef nnHash(tag: String) = tag.hashCode & 0x7FFFFF var tagHashes = postIDTags.map(_._2).distinct.map(tag =>(nnHash(tag),tag))
如今把元组转换为ALS计算所需的输入:
scalaimport org.apache.spark.mllib.recommendation._ // Convert to Rating(Int,Int,Double) objects val alsInput = postIDTags.map(t => Rating(t._1, nnHash(t._2), 1.0)) // Train model with 40 features, 10 iterations of ALS val model = ALS.trainImplicit(alsInput, 40, 10)
这一步生成特征矩阵,能够被用来预测问题与标签之间的关联。因为目前MLib还处于不完善的状态,没有提供一个recommend的接口来获取建议的标签,咱们能够简单定义一个:
scaladef recommend(questionID: Int, howMany: Int = 5): Array[(String, Double)] = { // Build list of one question and all items and predict value for all of them val predictions = model.predict(tagHashes.map(t => (questionID,t._1))) // Get top howMany recommendations ordered by prediction value val topN = predictions.top(howMany)(Ordering.by[Rating,Double](_.rating)) // Translate back to tags from IDs topN.map(r => (tagHashes.lookup(r.product)(0), r.rating)) }
经过上述函数,咱们能够得到任意一个问题好比ID为7122697的How to make substring-matching query work fast on a large table?
的至少4个标签:
scalarecommend(7122697).foreach(println)
推荐结果以下所示:
scala(sql,0.17745152481166354) (database,0.13526622226672633) (oracle,0.1079428707621154) (ruby-on-rails,0.06067207312463499) (postgresql,0.050933613169706474)
注意:
- 每次运行获得的结果不尽相同,是由于ALS是从随机解开始迭代的
- 若是你但愿得到实时性更高的结果,能够在recommend
前输入tagHashes = tagHashes.cache
真实的问题标签是postgresql
、query-optimization
、substring
和text-search
。不过,预测结果也有必定的合理性(postgresql
常常和ruby-on-rails
一块儿出现)。
固然,以上的示例还不够优雅和高效,可是,我但愿全部来自R的分析师、鼓捣Python的黑客和熟悉Hadoop的开发者,都能从中找到大家熟悉的部分,从而找到一条适合大家的路径去探索Spark,并从中获益。
来自:建造者说