咱们这边是要使用Spark去并行一个天然语言处理的算法,其中使用到了LDA主题模型。因为使用的是天河二号,Spark版本是1.5.1,pyspark一样,因此获取主题时还不能使用describeTopics(在spark1.6中才开放对python的接口),只能使用topicsMatrix的方法。java
原本凑合用topicsMatrix也行,但咱们发现,这一个用来获取主题模型的函数,竟然比Lda的训练还要慢!不管在咱们本身的集群仍是在天河二号的分区上,都是这一个状况。观察topicsMatrix的源代码,好像也没有什么复杂操做,只是把数据汇总collect而已:python
@Since("1.3.0") override lazy val topicsMatrix: Matrix = { // Collect row-major topics val termTopicCounts: Array[(Int, TopicCounts)] = graph.vertices.filter(_._1 < 0).map { case (termIndex, cnts) => (index2term(termIndex), cnts)}.collect() // Convert to Matrix val brzTopics = BDM.zeros[Double](vocabSize, k) termTopicCounts.foreach { case (term, cnts) => var j = 0 while (j < k) { brzTopics(term, j) = cnts(j) j += 1 } } Matrices.fromBreeze(brzTopics) }
因为并非算法中有一些复杂运算致使较慢,咱们天然不但愿在程序中有这样的状况。发现到在Spark1.5.1中,mllib中LdaModel已经实现了describeTopics,只是未在Python中开放,咱们天然但愿尝试使用describeTopics看看效果。算法
已知LDA.train()返回的是LdaModel的实例,因而乎,参考上篇博客,用如下方式去调用:编程
model = LDA.train(rdd_data, k=num_topics, maxIterations=20) topics = model.call('describeTopics', _py2java(sc, 10))
执行速度特别快,然而返回的结果却不尽如人意,仅返回了一个长度k的列表,每一个元素是一个key为'class',value为'scala.Tuple2'的单元素字典。从结果来看,scala的代码应该是被成功执行了,然而返回结果却出了问题。查看callJavaFunc的内容,能够判断出,是describeTopics的返回结果没有被_java2py函数正常的转换。bash
比对Spark1.5和Spark1.6的代码,LdaModel.describeTopics函数的内容是一致的,那么问题在哪儿呢?再去查看pyspark的LDA.train()调用的PythonMLLibAPI.trainLdaModel,发如今1.6中返回的再也不是LdaModel而是它的子类LdaModelWrapper。查看这个类的方法,发现它重载了describeTopics来方便_java2py进行数据转换:app
private[python] class LDAModelWrapper(model: LDAModel) { def topicsMatrix(): Matrix = model.topicsMatrix def vocabSize(): Int = model.vocabSize def describeTopics(): Array[Byte] = describeTopics(this.model.vocabSize) def describeTopics(maxTermsPerTopic: Int): Array[Byte] = { val topics = model.describeTopics(maxTermsPerTopic).map { case (terms, termWeights) => val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava Array[Any](jTerms, jTermWeights) } SerDe.dumps(JavaConverters.seqAsJavaListConverter(topics).asJava) } def save(sc: SparkContext, path: String): Unit = model.save(sc, path) }
找到这里,解决方法就油然而生了。只要咱们把这一段scala代码在python中调用,并将describeTopics的Java对象传入,不就万事大吉了吗?jvm
也许还有别的方法,不过这里使用的方法也足够简单。将.scala文件打包成jar后,启动spark时加入参数--driver-class-path /path/to/xxx.jar,即可以将你的scala代码放入Spark运行的虚拟机JVM中,从而让python代码在运行中经过反射机制在SparkContext._jvm里动态获取到你的类与方法:ide
func = sc._jvm.com.example.YourObject.func
那么,如今的问题就是如何把scala代码打包成jar了。scala虽然也是基于JVM运行的语言,与java很是类似,可是其编译选项中并无提供将其打包成jar的参数。这里咱们用sbt打包它,sbt的下载与安装请自行查阅其余教程,这里就不提供了,官方网站。函数
首先编写好你的scala代码,确认没有bug,并在文件开头用package关键字将其封装至包中。接着,请手动创建你的项目目录,并建立以下结构:网站
在build.sbt中,请至少进行如下设置
//项目名 name := "Project" //项目版本 version := "0.1" //scala版本 scalaVersion := "2.10.5" //jdk版本 javacOptions ++= Seq("-source", "1.7", "-target", "1.7") //主函数 mainClass in Compile := Some("YourClass.func")
在plugins.sbt中,请加上这一句话,告诉sbt须要这个第三方插件,这是用来打包的
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
这些都准备完成后,在terminal里进入你的项目根目录下,输入
sbt package
等待打包完成,会有相应提示。
更多的打包选项,以及sbt的更多用法,感兴趣能够自行查阅。
回到咱们这里的问题,咱们但愿能在python中对describeTopics的返回值进行转换,那么我么只须要打包那一个重载的describeTopics就行了,这样能够避免打包Spark的第三方包。更改一下函数的返回值,并注释掉调用Spark的SerDe进行序列化的语句,最终的代码以下:
package com.sysu.sparkhelper import java.util.List import scala.collection.JavaConverters object LdaHelper { def convert(topics: Array[(Array[Int], Array[Double])]): List[Array[Any]] = { val result = topics.map { case (terms, termWeights) => val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava Array[Any](jTerms, jTermWeights) } return JavaConverters.seqAsJavaListConverter(result).asJava // SerDe.dumps(JavaConverters.seqAsJavaListConverter(result).asJava) } }
用sbt打包完成后,使用--driver-class-path添加jar包,在python中相应代码为:
lda_java_model = model._java_model func = getattr(model._java_model, 'describeTopics') result = func(_py2java(sc, 10)) topics = _java2py(sc, sc._jvm.com.sysu.sparkhelper.LdaHelper.convert(result))
这算是阅读源码的一次应用,能够说仍是解决了遇到的问题,同时也加深了对Spark的了解。原本作并行化就是但愿效率更高,pyspark却在调用scala代码,同时进行了不少数据转换。想要更好的使用Spark的话,使用scala去编程应该才是最好的。