TF-IDF is a commonly used algorithm for extracting text features, and Spark MLlib provides an implementation of it. Below is an example of invoking the TF-IDF algorithm in Spark MLlib:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

object TfIdfExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("TfIdfExample"))
    // Load documents (one tokenized document per line).
    val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)
    val hashingTF = new HashingTF()
    // Compute the term frequencies (TF)
    val tf: RDD[Vector] = hashingTF.transform(documents)
    tf.cache()
    // Fit an IDFModel on the TF vectors
    val idf = new IDF().fit(tf)
    // Compute the TF-IDF vectors
    val tfidf: RDD[Vector] = idf.transform(tf)
  }
}
The input data must contain one document per line, already tokenized. Spark MLlib does not ship a tokenizer, but the documentation suggests tools such as those from the Stanford NLP Group and scalanlp/chalk.
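For quick experiments with English text, a minimal stand-in for a real tokenizer is a regex-based split. The sketch below assumes the SparkContext sc and imports from the example above; "docs.txt" is just a placeholder path.

// A minimal tokenization sketch (not an MLlib API): lowercase the line and
// split on non-word characters to get one tokenized document per line.
val documents: RDD[Seq[String]] =
  sc.textFile("docs.txt")
    .map(_.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq)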
1. Reading the TF source code
In the calling code above, locate the TF computation:
val hashingTF = new HashingTF()
// Compute TF
val tf: RDD[Vector] = hashingTF.transform(documents)
TF is obtained through the transform method of the HashingTF class. Following that method:
/**
 * Transforms the input document to term frequency vectors.
 */
@Since("1.1.0")
def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector] = {
  dataset.map(this.transform)
}
Spark MLlib is built on RDDs, so you need to be familiar with RDDs before reading the source. Next, look at the transform method called in dataset.map(this.transform):
/**
 * Transforms the input document into a sparse term frequency vector.
 */
@Since("1.1.0")
def transform(document: Iterable[_]): Vector = {
  // Map holding the term frequencies
  val termFrequencies = mutable.HashMap.empty[Int, Double]
  // Iterate over every term in the document
  document.foreach { term =>
    // Get the vector index corresponding to the term
    val i = indexOf(term)
    // The index i stands for the term; accumulate its count in termFrequencies
    termFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) + 1.0)
  }
  // Map the term counts into a very high-dimensional sparse vector.
  // numFeatures is a member of HashingTF; it can be passed to the constructor
  // and defaults to 2^20 when omitted.
  Vectors.sparse(numFeatures, termFrequencies.toSeq)
}
The transform method runs once per line (i.e. per document): it counts the frequency of each term in the document and stores the counts in a very high-dimensional sparse vector. The position of each term in that vector is computed by:
@Since("1.1.0") def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)
term.## is equivalent to hashCode(): it produces the term's hash value, which is then taken modulo numFeatures, yielding an Int.
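As a quick illustration of how a term becomes an index, here is a sketch of the same logic in plain Scala (Utils is Spark-internal, so the helper below is a local reimplementation, not the MLlib API):

// Sketch of the index computation: a non-negative hash modulo numFeatures.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

val numFeatures = 1 << 20                                  // HashingTF's default: 2^20
val sparkIndex  = nonNegativeMod("spark".##, numFeatures)  // bucket for the term "spark"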
At this point the TF computation is complete. The final result is a vector holding each term's position and the corresponding term frequency, i.e. SparseVector(size, indices, values).
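To see the resulting sparse vectors concretely, here is a small local run (a sketch; sc is assumed to be a live SparkContext, and the tiny numFeatures is only to keep the output readable):

// Two-document toy corpus; numFeatures = 16 keeps the vectors easy to inspect.
val docs = sc.parallelize(Seq(
  Seq("spark", "mllib", "spark"),
  Seq("tf", "idf")
))
val tfVectors = new HashingTF(16).transform(docs)
tfVectors.collect().foreach(println)
// Each line prints a sparse vector of size 16; the bucket that "spark"
// hashes to holds 2.0 in the first document.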
2. Reading the IDF source code
// Fit an IDFModel; the input tf vectors are of type SparseVector(size, indices, values)
val idf = new IDF().fit(tf)
// Compute the TF-IDF values
val tfidf: RDD[Vector] = idf.transform(tf)
The IDF implementation consists of two main steps.
Step 1: val idf = new IDF().fit(tf)
/**
 * Computes the inverse document frequency.
 * @param dataset an RDD of term frequency vectors
 */
@Since("1.1.0")
def fit(dataset: RDD[Vector]): IDFModel = {
  // Returns the IDF vector, of type DenseVector(values)
  val idf = dataset.treeAggregate(new IDF.DocumentFrequencyAggregator(
        minDocFreq = minDocFreq))(          // minDocFreq is the minimum document frequency; defaults to 0
    seqOp = (df, v) => df.add(v),           // accumulate within a partition
    combOp = (df1, df2) => df1.merge(df2)   // merge results across partitions
  ).idf()
  new IDFModel(idf)
}
The signature of the treeAggregate method used above is:

def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U
treeAggregate performs its computation with mapPartitions and requires two operators, one to accumulate results and one to combine them (see the sketch after these two descriptions):

seqOp: an operator used to accumulate results within a partition

combOp: an associative operator used to combine results from different partitions
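To make the seqOp/combOp split concrete, here is a small, unrelated treeAggregate sketch (assuming a live SparkContext sc) that computes a sum and a count over an RDD of Ints:

// seqOp folds elements into the per-partition accumulator;
// combOp merges the accumulators from different partitions.
val nums = sc.parallelize(1 to 100)
val (sum, count) = nums.treeAggregate((0L, 0L))(
  seqOp  = { case ((s, c), x)          => (s + x, c + 1) },
  combOp = { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }
)
// sum = 5050, count = 100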
The treeAggregate call returns a fully populated IDF.DocumentFrequencyAggregator; its idf method is then invoked to produce the IDF vector, which is finally wrapped via new IDFModel(idf) and returned as an IDFModel.
Below are the methods of the DocumentFrequencyAggregator class: add (the seqOp) and merge (the combOp).
private object IDF {

  /** Document frequency aggregator. */
  class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {

    /** Total number of documents seen so far */
    private var m = 0L
    /** Document frequency vector: for each term, the number of documents it appears in */
    private var df: BDV[Long] = _

    def this() = this(0)  // auxiliary constructor; minDocFreq defaults to 0 when not supplied

    /** Adds a new document. This is the per-partition accumulation; the input is a TF vector. */
    def add(doc: Vector): this.type = {
      if (isEmpty) {
        df = BDV.zeros(doc.size)
      }
      doc match {
        // The TF vectors are SparseVectors, so this case is taken
        case SparseVector(size, indices, values) =>
          val nnz = indices.size
          var k = 0
          while (k < nnz) {
            if (values(k) > 0) {
              df(indices(k)) += 1L  // the term occurs in this document, so its df is incremented
            }
            k += 1
          }
        case DenseVector(values) =>
          val n = values.size
          var j = 0
          while (j < n) {
            if (values(j) > 0.0) {
              df(j) += 1L
            }
            j += 1
          }
        case other =>
          throw new UnsupportedOperationException(
            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
      }
      m += 1L
      this
    }

    /** Merges another aggregator. This is the cross-partition combine step. */
    def merge(other: DocumentFrequencyAggregator): this.type = {
      if (!other.isEmpty) {
        m += other.m          // merge the total document counts
        if (df == null) {
          df = other.df.copy
        } else {
          df += other.df      // merge the df vectors
        }
      }
      this
    }

    private def isEmpty: Boolean = m == 0L

    /** Returns the current IDF vector. */
    def idf(): Vector = {
      if (isEmpty) {
        throw new IllegalStateException("Haven't seen any document yet.")
      }
      val n = df.length
      val inv = new Array[Double](n)
      var j = 0
      while (j < n) {
        /*
         * If the term is not present in the minimum
         * number of documents, set IDF to 0. This
         * will cause multiplication in IDFModel to
         * set TF-IDF to 0.
         *
         * Since arrays are initialized to 0 by default,
         * we just omit changing those entries.
         */
        if (df(j) >= minDocFreq) {  // only compute the idf when df reaches minDocFreq; otherwise it stays 0
          inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
        }
        j += 1
      }
      Vectors.dense(inv)  // return the IDF vector as a dense vector
    }
  }
}
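The key line is inv(j) = math.log((m + 1.0) / (df(j) + 1.0)), the smoothed IDF formula idf(t) = log((m + 1) / (df(t) + 1)). A tiny worked example in plain Scala (not MLlib code, values chosen arbitrarily):

// m = total number of documents; df = how many documents each term bucket appears in.
val m  = 4L
val df = Array(4L, 2L, 1L, 0L)
val idf = df.map(d => math.log((m + 1.0) / (d + 1.0)))
// idf ≈ Array(0.0, 0.51, 0.92, 1.61): a term appearing in every document gets
// weight ~0, while rarer terms get progressively larger weights.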
Step 2: with the IDF vector computed above, what remains is the tf * idf product, done by the transform method of the IDFModel class: val tfidf: RDD[Vector] = idf.transform(tf)
private object IDFModel {

  /**
   * Transforms a term frequency (TF) vector to a TF-IDF vector with an IDF vector
   *
   * @param idf an IDF vector
   * @param v a term frequency vector
   * @return a TF-IDF vector
   */
  def transform(idf: Vector, v: Vector): Vector = {
    val n = v.size
    v match {
      // The TF vectors are SparseVectors, so this case is taken
      case SparseVector(size, indices, values) =>
        val nnz = indices.size
        val newValues = new Array[Double](nnz)
        var k = 0
        while (k < nnz) {
          newValues(k) = values(k) * idf(indices(k))  // compute tf * idf
          k += 1
        }
        Vectors.sparse(n, indices, newValues)  // the resulting TF-IDF vector
      case DenseVector(values) =>
        val newValues = new Array[Double](n)
        var j = 0
        while (j < n) {
          newValues(j) = values(j) * idf(j)
          j += 1
        }
        Vectors.dense(newValues)
      case other =>
        throw new UnsupportedOperationException(
          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
    }
  }
}
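For the sparse case above, the multiplication only touches the non-zero positions. A tiny worked example in plain Scala (values chosen arbitrarily):

// TF counts at the non-zero indices of one document, and the IDF weights
// at those same indices.
val tfValues  = Array(2.0, 1.0)
val idfValues = Array(0.51, 1.61)
val tfidfVals = tfValues.zip(idfValues).map { case (tf, w) => tf * w }
// tfidfVals ≈ Array(1.02, 1.61)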
That is the whole TF-IDF computation. It relies on Spark MLlib's dense vectors (DenseVector) and sparse vectors (SparseVector), as well as RDD aggregation operations.
The three main classes involved are HashingTF, IDF, and IDFModel.
Also note that in the TF-IDF vectors produced by Spark MLlib, the stored positions are the term's hash taken modulo the vector dimension, not the term itself. If later work such as classification or text recommendation needs the original words, an extra adjustment is required, for example the reverse lookup sketched below.
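One possible adjustment (a sketch, not something MLlib provides out of the box): build a reverse map from hashed index back to terms using HashingTF.indexOf. Here documents and hashingTF are the values from the example at the top.

// Collect the vocabulary and group terms by the bucket they hash into.
// Because of hash collisions, one index may map to several terms.
val vocab: Seq[String] = documents.flatMap(identity).distinct().collect().toSeq
val indexToTerms: Map[Int, Seq[String]] =
  vocab.groupBy(term => hashingTF.indexOf(term))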