前一篇:Spark数据挖掘-基于 LSA 隐层语义分析理解APP描述信息(1)apache
Spark 经过调用 RowMatrix 的 computeSVD 方法会获得三个重要的矩阵 U、S、V , 并且:原始矩阵 近似等于 U * S * V
它们含义分别以下:微信
经过这个文档,首先想到的是文档中最重要的概念是什么?概念每每对应话题,这样基本就能肯定文档的主题了,而后每一个主题经过V矩阵能够获得重要的词,这样就能够给文档添加标签了,可是其实能够走的更远,本文将重点研究如何使用这两个矩阵,这里的用途很容易推广到LDA模型,LDA 模型获得 phi(词与topic关系矩阵) 和 theta(文档与topic的关系矩阵) 两个矩阵以后也能够干这些事。接下来主要尝试回答下面三个问题:机器学习
其实从最原始的词文档矩阵能够获得上面这些问题粗浅的答案:好比词与词的重要程度能够计算词文档矩阵中对应列之间的余弦类似性。余弦类似度计算的是高维文档空间中两个点之间向量的夹角,越相同方向的点认为类似度越高。余弦类似度计算的就是两个向量的点积除以两个向量长度的乘积。通用的计算行之间的余弦距离就获得了文档与文档的类似度。而词与文档的重要程度就对应该矩阵中这两个对象交叉的位置。
然而这些重要性得分都是很粗浅的,由于这些都是来自于对文档中词简单计数统计,根本没有考虑词之间的语义关系等。分布式
LSA 提供更深刻的理解语料库的得分矩阵。基于这些矩阵能够获得更加有深度的结论。例如:一些文件只出现“新闻”,可是不出现“资讯”,而另一下文章恰好相反,可是它们可能会经过“阅读”这个词联系在一块儿。
LSA 这种表示的方式从效率的角度来看也更有优点。LSA 将重要的信息压缩到低维空间来代替原始的词文档矩阵。这样使得不少计算更加快。由于计算原始词文档矩阵中词与其余词的重要程度须要的时间复杂度正比于单词的数量乘以文档的数量。LSA 能够经过将概念空间表示映射到词空间达到通用的效果,时间复杂度正比于单词的数量乘以概念的个数 K。数据之间的相关性经过这种低秩近似从新编码从而使得不须要访问整个语料库。学习
LSA 是如何理解词与词之间的距离的呢?其实经过将 SVD 奇异值分解获得的三个矩阵相乘就会获得原始矩阵的一个近似,那么这个近似矩阵列之间的余弦距离就是原始的词距离的一个近似,只不过如今有下面的三点优化:大数据
这样就会使得词之间的距离更加合理。幸运的是不须要从新将三个因子矩阵相乘再去计算词之间的类似性,线性代数已经证实:相乘以后的矩阵列与列之间的余弦就等于 St(V):(t 表示转置)这个矩阵对应列之间的余弦。
考虑给定一个词计算最相关的特定词这个任务:因为S是对角矩阵,那么S的转置就等于S,那么S*t(V)的列就变为了VS的行。经过对VS的每一行长度归一化,而后将VS乘以给定词对应的列转变的列向量就获得了这个词与每一个词之间的余弦。具体代码以下:优化
import breeze.linalg.{DenseVector => BDenseVector} import breeze.linalg.{DenseMatrix => BDenseMatrix} def topTermsForTerm( normalizedVS: BDenseMatrix[Double], termId: Int): Seq[(Double, Int)] = { //获得termId对应的行 val rowVec = new BDenseVector[Double]( row(normalizedVS, termId).toArray) //将VS归一化的矩阵乘上面对应的行,获得该term与每一个单词的余弦距离 val termScores = (normalizedVS * rowVec).toArray.zipWithIndex termScores.sortBy(-_._1).take(10) } //计算 VS val VS = multiplyByDiagonalMatrix(svd.V, svd.s) //将 VS 的行归一化 val normalizedVS = rowsNormalized(VS) def printRelevantTerms(term: String) { val id = idTerms(term) printIdWeights(topTermsForTerm(normalizedVS, id, termIds) }
利用上面的代码查询了和“银行”相关的词,结果以下:编码
银行:1.0000000000000007 农商:0.5731472845623417 浦发:0.5582996267582955 灵犀:0.5546113928156614 乌海:0.5181220508630512 邮政:0.49403542009285284 花旗:0.4767076670441433 渣打:0.4646580481689233 通畅:0.46282711600593196 缝隙:0.4500830196782121
语料库不足是致使效果通常的最大问题。spa
文档与文档的相关性与词与词之间的相关性思路彻底同样,只不过此次用的矩阵是U,U是分布式存储的,因此代码有点不一样:.net
import org.apache.spark.mllib.linalg.Matrices def topDocsForDoc(normalizedUS: RowMatrix, docId: Long) : Seq[(Double, Long)] = { val docRowArr = row(normalizedUS, docId) val docRowVec = Matrices.dense(docRowArr.length, 1, docRowArr) val docScores = normalizedUS.multiply(docRowVec) val allDocWeights = docScores.rows.map(_.toArray(0)). zipWithUniqueId() allDocWeights.filter(!_._1.isNaN).top(10) } val US = multiplyByDiagonalMatrix(svd.U, svd.s) val normalizedUS = rowsNormalized(US) def printRelevantDocs(doc: String) { val id = idDocs(doc) printIdWeights(topDocsForDoc(normalizedUS, id, docIds) }
一样的道理,词文档之间的相关性也是经过 USV 这个矩阵中的每一个位置的元素去近似的,好比词 t 与文档 d 的关系就是: U(d) * S * V(t),根据线性代数的基本理论,能够很容易获得词 t 与全部文档的关系为:U * S * V(t) 或者文档 d 与全部词的关系为:U(d) * S * V。这样就很容易知道与某个文档最相关的前几个词,以及与某个词最相关的文档。具体的应用代码以下:
def topDocsForTerm(US: RowMatrix, V: Matrix, termId: Int) : Seq[(Double, Long)] = { //获得词对应的行 val rowArr = row(V, termId).toArray //将改行转为列向量 val rowVec = Matrices.dense(termRowArr.length, 1, termRowArr) //计算 US 乘以列向量 val docScores = US.multiply(termRowVec) //获得全部文档与词之间的关系的得分 val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId() //选择最重要的10篇文档 allDocWeights.top(10) } //打印结果 def printRelevantDocs(term: String) { val id = idTerms(term) printIdWeights(topDocsForTerm(normalizedUS, svd.V, id, docIds) }
查询一个词,至关于上面的词与文档的关系,其实就是将 V 转置以后乘以一个长度为词向量长并且只有一个元素为 1 的列向量,值为 1 的位置对应的就是该词的位置,而多个词,那么就经过乘以一个长度为词向量长并且对应查询词位置都是该词的idf权重其余位置为0的列向量便可。具体代码以下:
import breeze.linalg.{SparseVector => BSparseVector} //获得查询词对应位置为idf权重,其余位置为0的向量 def termsToQueryVector( terms: Seq[String], idTerms: Map[String, Int], idfs: Map[String, Double]): BSparseVector[Double] = { //先获得查询词在整个词向量的下标索引位置 val indices = terms.map(idTerms(_)).toArray //将对应位置的idf权重找出来 val values = terms.map(idfs(_)).toArray //转为向量 长度为词向量长度,不少位置的值为零,查询词位置值为 idf 权重 new BSparseVector[Double](indices, values, idTerms.size) } //获得 US*t(t(V)*上面方法获得的向量) def topDocsForTermQuery( US: RowMatrix, V: Matrix, query: BSparseVector[Double]): Seq[(Double, Long)] = { val breezeV = new BDenseMatrix[Double](V.numRows, V.numCols, V.toArray) //计算 t(V)*上面方法获得的向量 val termRowArr = (breezeV.t * query).toArray //获得 t(t(V)*上面方法获得的向量) val termRowVec = Matrices.dense(termRowArr.length, 1, termRowArr) //计算 US*t(t(V)*上面方法获得的向量) val docScores = US.multiply(termRowVec) val allDocWeights = docScores.rows.map(_.toArray(0)). zipWithUniqueId() allDocWeights.top(10) } def printRelevantDocs(terms: Seq[String]) { val queryVec = termsToQueryVector(terms, idTerms, idfs) printIdWeights(topDocsForTermQuery(US, svd.V, queryVec), docIds) }
上面代码中用到一些辅助的方法,由于比较简单就不详细分析,这里简单作一个汇总:
def row(normalizedVS: DenseMatrix[Double], termId: Int) = { (0 until normalizedVS.cols).map(i => normalizedVS(termId, i)) } def multiplyByDiagonalMatrix(mat: Matrix, s: Vector) = { val sArr = s.toArray new BDenseMatrix[Double](mat.numRows, mat.numCols, mat.toArray) .mapPairs{case ((r, c), v) => v * sArr(c)} } def rowsNormalized(bm: BDenseMatrix[Double]) = { val newMat = new BDenseMatrix[Double](bm.rows, bm.cols) for (r <- 0 until bm.rows) { val len = math.sqrt((0 until bm.cols).map{c => math.pow(bm(r, c), 2)}.sum) (0 until bm.cols).foreach{c => newMat.update(r, c, bm(r, c)/len)} } newMat }
欢迎关注本人微信公众号,会定时发送关于大数据、机器学习、Java、Linux 等技术的学习文章,并且是一个系列一个系列的发布,无任何广告,纯属我的兴趣。