SparkMllib主题模型案例讲解

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

一  本文涉及到的算法正则表达式

1, LDA主题模型算法

符号定义apache

  • 文档集合D,m篇,topic集合T,k个主题分布式

  • D中每一个文档d看做一个单词序列< w1,w2,...,wn >,wi表示第i个单词,设d有n个单词。(LDA里面称之为word bag,实际上每一个单词的出现位置对LDA算法无影响)ide

  • D中涉及的全部不一样单词组成一个大集合VOCABULARY(简称VOC)测试

LDA符合的分布优化

  • 每篇文章d(长度为)都有各自的主题分布,主题分布式多项分布,该多项分布的参数服从Dirichlet分布,该Dirichlet分布的参数为α。spa

  • 每一个主题都有各自的词分布,词分布为多项分布,该多项分布的参数服从Dirichlet分布,该Dirichlet分布的参数为β;3d

  • 对于谋篇文章中的第n个词,首先从该文章的主题分布中采样一个主题,而后在这个主题对应的词分布中采样一个词。不断重复这个随机生成过程,直到m篇文章所有完成过程。orm

结果是但愿训练出两个结果向量(k个topic,VOC中共包含m个词)

LDA以文档集合D做为输入(会有分词,去掉停用词,取词干等预处理):

  • 对每一个D中的文档d,对应到不一样topic的几率θd < pt1,..., ptk >,其中,pti表示d对应T中第i个topic的几率。计算方法是直观的,pti=nti/n,其中nti表示d中对应第i个topic的词的数目,n是d中全部词的总数。

  • 对每一个T中的topic t,生成不一样单词的几率φt < pw1,..., pwm >,其中,pwi表示t生成VOC中第i个单词的几率。计算方法一样很直观,pwi=Nwi/N,其中Nwi表示对应到topic t的VOC中第i个单词的数目,N表示全部对应到topic t的单词总数。

LDA的核心公式以下:

p(w|d) = p(w|t)*p(t|d)

直观的看这个公式,就是以Topic做为中间层,能够经过当前的θd和φt给出了文档d中出现单词w的几率。其中p(t|d)利用θd计算获得,p(w|t)利用φt计算获得。

2, RegexTokenizer

RegexTokenizer容许基于正则的方式进行文档切分红单词组。默认状况下,使用参数“pattern”( regex, default: "s+")做为分隔符来分割输入文本。或者,用户能够将参数“gaps”设置为false,指示正则表达式“pattern”表示“tokens”,而不是分割间隙,并查找全部匹配事件做为切分后的结果。

 

 

3, StopWordsRemover

stopwords简单来讲是指在一种语言中普遍使用的词。在各类须要处理文本的地方,咱们对这些中止词作出一些特殊处理,以方便咱们更关注在更重要的一些词上。

中止词的词表通常不须要本身制做,有不少可选项能够本身下载选用。

 

Spark中提供了StopWordsRemover类处理中止词,它能够用做Machine learning Pipeline的一部分。

 

StopWordsRemover的功能是直接移除全部停用词(stopword),全部从inputCol输入的量都会被它检查,而后再outputCol中,这些中止词都会去掉了。

4, CountVectorizer

CountVectorizer 和 CountVectorizerModel 旨在帮助将文本文档集合转化为频数向量。当先验词典不可用时,CountVectorizer能够用做Estimator提取词汇表,并生成一个CountVectorizerModel。该模型会基于该字典为文档生成稀疏矩阵,该稀疏矩阵能够传给其它算法,好比LDA,去作一些处理。

在拟合过程当中,CountVectorizer会从整个文档集合中进行词频统计并排序后的前vocabSize个单词。

一个可选参数minDF也会影响拟合过程,方法是指定词汇必须出现的文档的最小数量(或小于1.0)。另外一个可选的二进制切换参数控制输出向量。若是设置为true,则全部非零计数都设置为1.这对于模拟二进制计数而不是整数计数的离散几率模型特别有用。

 

二   数据

20个主题的数据,每篇文章一个文件,每一个主题100个文件。共两千个文件。

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

三  实现步骤

1, 导入数据

val corpus = sc.wholeTextFiles("file:///opt/datas/mini_newsgroups/*").map(_._2).map(_.toLowerCase())

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

2, 数据格式整理

val corpus_body = corpus.map(_.split("\n\n")).map(_.drop(1)).map(_.mkString(" "))

val corpus_df = corpus_body.zipWithIndex.toDF("corpus", "id")

 

import org.apache.spark.ml.feature.RegexTokenizer

val tokenizer = new RegexTokenizer().setPattern("[\W_]+").setMinTokenLength(4).setInputCol("corpus").setOutputCol("tokens")

val tokenized_df = tokenizer.transform(corpus_df)

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

3, 导入停用词

val stopwords = sc.textFile("file:///opt/datas/stop_words.txt").collect()

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

4, 去除停用词

import org.apache.spark.ml.feature.StopWordsRemover

 

// Set params for StopWordsRemover

val remover = new StopWordsRemover().setStopWords(stopwords).setInputCol("tokens").setOutputCol("filtered")

 

// Create new DF with Stopwords removed

val filtered_df = remover.transform(tokenized_df)

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

5, 生成词频向量

import org.apache.spark.ml.feature.CountVectorizer

 

// Set params for CountVectorizer

val vectorizer = new CountVectorizer().setInputCol("filtered").setOutputCol("features").setVocabSize(10000).setMinDF(5).fit(filtered_df)

 

val countVectors = vectorizer.transform(filtered_df).select("id", "features")

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

6, 构建LDA模型

import org.apache.spark.ml.clustering.LDA

 

val numTopics = 20

// Set LDA params

val lda = new LDA().setK(numTopics).setMaxIter(10)

 

7, 训练LDA模型

val model = lda.fit(countVectors )

8, 查看训练结果数据

val topicIndices = model.describeTopics(5)

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

9, 词典的使用

val vocabList = vectorizer.vocabulary

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

10,使用模型

val transformed = model.transform(dataset)

transformed.show(false)

五  可调整测试点

1, 增长stop-words

val add_stopwords = Array("article", "writes", "entry", "date", "udel", "said", "tell", "think", "know", "just", "newsgroup", "line", "like", "does", "going", "make", "thanks")

val new_stopwords = stopwords.union(add_stopwords)

2, 使用EM

用于估计LDA模型的优化器或推理算法,目前Spark支持两种:

online:Online Variational Bayes (默认)

em: Expectation-Maximization

能够经过调用setOptimizer(value: String),传入online或者em来使用。

 

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=