文本分类
spark
算法
spark2.0开始引入dataframe做为RDD的上层封装,以屏蔽RDD层次的复杂操做,本文使用spark milib中ml机器学习库进行新闻文本多分类预测,包含数据预预处理,分词,标签和特征向量化转换、多分类模型训练(包含朴素贝叶斯、逻辑回归、决策树和随机森林),分类预测和模型评估等完整的机器学习demo。本文分词方法选用HanLP分词工具包(文档丰富、算法公开、代码开源,而且经测试分词效果比较好)。apache
本文使用的数据为4类新闻,每条数据包含标签,标题,时间和新闻内容,以"\u00EF"符号做为分割符,数据格式以下:数组
首页|文化新闻ï第十一届全国优秀舞蹈节目展演将在武汉举办ï2016-07-05 19:25:00ï新华社北京7月5日电(记者周玮)由文化部、湖北省人民政府主办的...
首页|财经中心|财经频道ï新通教育收购杭州蓝海旅行社100%股权 发力出境游学市场ï2016-07-04 21:49:00ï杭州7月4日电(胡丰盛)7月4日,新通教育...
首页|军事新闻ï环太军演中国参演潜水分队开展潜水事故应急医学处置演练ï2016-07-04 19:40:00ï夏威夷7月4日电 (李纯 于超)当地时间3日,参加...
首页|体育新闻ï斯坦科维奇杯首战将为王治郅举办退役仪式ï2016-07-04 10:39:00ï周二晚上八点,中国男篮将在北京以里约奥运阵容出战在国内...
复制代码
文本清洗 -> 标签索引化 -> 内容文本分词 -> 去除停用词 -> 分词取前5000个词做为特征 -> 特征向量化 -> 保存预处理模型 -> 调用预处理模型 -> 输出预处理数据(indexedLabel,features)bash
首先将文本读取成Dataframe格式,将标签列数据索引化,{文化,经济,军事和体育}向量化后为{0,1,2,3}数据结构
/** * 数据清洗 可根据具体数据结构和业务场景的不一样进行重写. 注意: 输出必需要有标签字段"label" * @param filePath 数据路径 * @param spark SparkSession * @return 清洗后的数据, 包含字段: "label", "title", "time", "content" */
def clean(filePath: String, spark: SparkSession): DataFrame = {
import spark.implicits._
val textDF = spark.sparkContext.textFile(filePath).flatMap { line =>
val fields = line.split("\u00EF") //分隔符:ï,分红标签,标题,时间,内容
//首页|文化新闻ï第十一届全国优秀舞蹈节目展演将在武汉举办ï2016-07-05 19:25:00ï新华社北京7月5日电(记者周玮)由文化部...
//首页|财经中心|财经频道ï上半年浙江口岸原油进口量创同期历史新高ï2016-07-04 21:54:00ï杭州7月4日...
if (fields.length > 3) {
val categoryLine = fields(0)
val categories = categoryLine.split("\\|")
val category = categories.last
//分红4个标签名和其余,最后去除标签为其余的数据
var label = "其余"
if (category.contains("文化")) label = "文化"
else if (category.contains("财经")) label = "财经"
else if (category.contains("军事")) label = "军事"
else if (category.contains("体育")) label = "体育"
else {}
//输出标签,标题,时间,内容
val title = fields(1)
val time = fields(2)
val content = fields(3)
if (!label.equals("其余")) Some(label, title, time, content) else None
} else None
}.toDF("label", "title", "time", "content")
//输出标签,标题,时间,内容DF
textDF
}
/** * 处理label转换为索引形式 * @param data 输入label字段的数据 * @return 标签索引模型, 模型增长字段: "indexedLabel" */
def indexrize(data: DataFrame): StringIndexerModel = {
val labelIndexer = new StringIndexer()
.setInputCol("label")
.setOutputCol("indexedLabel")
.fit(data)
labelIndexer
}
复制代码
predictDF.select("label","indexedLabel").show(10, truncate = false)
复制代码
处理内容字段,首先要进行分词,而后去除停用词以及转换为特征向量,方便分类模型进行训练和预测。本文模仿spark的ml包下的StopWordsRemover类建立了Segmenter类,用于对数据进行分词,其内部调用了HanLP分词工具。闭包
因为spark自带的StopWordsRemover等使用的闭包仅限于ml包,自定义的类没法调用,故只是采用了与StopWordsRemover相似的使用形式,内部结构并不相同,而且因为以上缘由,Segmenter类没有继承Transformer类,故没法进行pipeline管道操做,故在分类模型超参数调优过程当中,没有加入分词模型的参数调优。app
/** * 分词过程,包括"分词", "去除停用词" * @param data 输入须要分词的字段的数据"content" * @param params 分词参数 * @return 分词处理后的DataFrame,增长字段: "tokens", "removed" */
def segment(data: DataFrame, params: PreprocessParam): DataFrame = {
val spark = data.sparkSession
//设置分词模型
val segmenter = new Segmenter()
.setSegmentType(params.segmentType) //分词方式
.isDelEn(params.delEn) //是否去除英语单词
.isDelNum(params.delNum) //是否去除数字
.addNature(params.addNature) //是否添加词性
.setMinTermLen(params.minTermLen) //最小词长度
.setMinTermNum(params.minTermNum) //行最小词数
.setInputCol("content") //输入内容字段
.setOutputCol("tokens") //输出分词后的字段
//进行分词
val segDF = segmenter.transform(data)
复制代码
分词以后,须要对一些经常使用的无心义词如:“的”、“咱们”、“是”等(统称为“停用词”)进行去除。这些词没有多大的意义,但这些词不去掉会强烈的干扰咱们对特征的抽取效果。(好比:在体育分类中,“的”出现500次,“足球”共出现300次,但显然足球更能表示体育分类,而“的”反而影响体育分类的结果。dom
去除停用词的操做咱们直接调用ml包中的StopWordsRemover类:eclipse
//读取停用词数据
val stopWordArray = spark.sparkContext.textFile(params.stopwordFilePath).collect()
//设置停用词模型
val remover = new StopWordsRemover()
.setStopWords(stopWordArray)
.setInputCol(segmenter.getOutputCol) //读取"tokens"字段
.setOutputCol("removed") //输出删除停用词后的字段"removed"
//删除停用词
val removedDF = remover.transform(segDF)
removedDF
}
复制代码
因为目前经常使用的分类、聚类等算法都是基于向量空间模型VSM(即将对象向量化为一个N维向量,映射成N维超空间中的一个点),VSM将数据转换为向量形式,便于对大规模数据进行矩阵操做等,也能够经过计算超空间中两个点之间的距离(通常是余弦距离)来计算两个向量之间的类似度。所以,咱们须要将通过处理的语料转换为向量形式,这个过程叫作向量化。机器学习
这里咱们也调用spark提供的向量化类CountVectorizer类进行向量化操做:
/** * 特征向量化处理,包括词汇表过滤 * @param data 输入向量化的字段"removed" * @param params 配置参数 * @return 向量模型 */
def vectorize(data: DataFrame, params: PreprocessParam): CountVectorizerModel = {
//设置向量模型
val vectorizer = new CountVectorizer()
.setVocabSize(params.vocabSize)
.setInputCol("removed")
.setOutputCol("features")
val parentVecModel = vectorizer.fit(data)
//过滤停用词中没有的数字features
val numPattern = "[0-9]+".r
val vocabulary = parentVecModel.vocabulary.flatMap {
term => if (term.length == 1 || term.matches(numPattern.regex)) None else Some(term)
}
val vecModel = new CountVectorizerModel(Identifiable.randomUID("cntVec"), vocabulary)
.setInputCol("removed")
.setOutputCol("features")
vecModel
}
复制代码
将字段"content"先进行分词和去除停用词获得"removed",再将全部词做为特征,进行特征向量化获得"features"字段:
为了方便每一个模型单独训练和预测,将预处理也做为数据处理的模型进行训练,保存和调用,方法以下:
/** * 训练预处理模型 * @param filePath 数据路径 * @param spark SparkSession * @return (预处理后的数据,索引模型,向量模型) * 数据包括字段: "label", "indexedLabel", "title", "time", "content", "tokens", "removed", "features" */
def train(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {
val params = new PreprocessParam //预处理参数
val cleanDF = this.clean(filePath, spark) //读取DF,清洗数据
val indexModel = this.indexrize(cleanDF) //调用索引模型
val indexDF = indexModel.transform(cleanDF) //标签索引化
val segDF = this.segment(indexDF, params) //将内容字段分词
val vecModel = this.vectorize(segDF, params) //调用向量模型
val trainDF = vecModel.transform(segDF) //内容分词特征向量化
this.saveModel(indexModel, vecModel, params) //保存模型
(trainDF, indexModel, vecModel)
}
/** * 拟合预处理模型 * @param filePath 数据路径 * @param spark SparkSession * @return (预处理后的数据,索引模型,向量模型) */
def predict(filePath: String, spark: SparkSession): (DataFrame, StringIndexerModel, CountVectorizerModel) = {
val params = new PreprocessParam //预处理参数
val cleanDF = this.clean(filePath, spark) //读取DF,清洗数据
val (indexModel, vecModel) = this.loadModel(params) //加载索引和向量模型
val indexDF = indexModel.transform(cleanDF) //标签索引化
val segDF = this.segment(indexDF, params) //内容字段分词
val predictDF = vecModel.transform(segDF) //内容分词特征向量化
(predictDF, indexModel, vecModel)
}
复制代码
调用预处理模型,数据处理后的结果取出5条:
+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
|label| title| time| content|indexedLabel| tokens| removed| features|
+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
|财经 |西南乳业巨头新但愿北..|2016-06-27 10:46:00|京华时报讯(记者胡笑...| 1.0|[京华, 时报讯, 记者, 胡笑红...|[京华, 时报讯, 记者, 胡笑红...|(5000,[3,4,14,22,...|
|文化 |全国篆刻名家大做在大..|2016-06-02 21:53:00|内江6月2日电王爵陈 ...| 3.0|[内江, 6月, 日电, 王爵, ...|[内江, 6月, 日电, 王爵, ...|(5000,[0,8,10,13,...|
|文化 |世界海洋日进入8天倒...|2016-05-31 15:38:00|北京5月31日电,记者...| 3.0|[北京, 5月, 日电, 记者, ...|[北京, 5月, 日电, 记者, ...|(5000,[0,3,10,13,...|
|军事 |英媒评中国征兵放宽体..|2016-06-02 08:30:00|参考消息网英媒称,随..| 0.0|[参考消息, 随着, 中国军队, ...|[参考消息, 中国军队, 放宽, ...|(5000,[0,5,12,14,...|
|财经 |2016年二十国集团峰会..|2016-06-25 18:52:00|新华社厦门6月25日记...| 1.0|[新华社, 厦门, 6月, 日电,...|[新华社, 厦门, 6月, 日电,...|(5000,[3,8,10,12,...|
+-----+--------------------+-------------------+--------------------+------------+--------------------+--------------------+--------------------+
only showing top 5 rows
复制代码
本文选用了经常使用的4中多分类模型对文本数据进行训练,利用了管道Pipeline + 网格搜索Gridsearch + 交叉验证CrossValidator 进行参数调优,直接将参数调优放在了训练模型里,将获得的最优模型保存。
朴素贝叶斯算法是基于贝叶斯定理与特征条件独立假设的分类方法。
条件几率
P(A|B)表示事件B已经发生的前提下,事件A发生的几率,叫作事件B发生下事件A的条件几率。其基本求解公式为:
经常使用的模型主要有3个,多项式、伯努利和高斯模型:
平滑系数
超参数平滑系数α,做用是防止后验几率为0,当α = 1时,称做Laplace平滑,当0 < α < 1时,称做Lidstone平滑,α = 0时不作平滑。本文主要对平滑系数进行调参。
/** * NB模型训练处理过程 * @param data 训练数据集 * @return nbBestModel */
def train(data: DataFrame): NaiveBayesModel = {
val params = new ClassParam
//NB分类模型管道训练调参
data.persist()
data.show(5)
//NB模型
val nbModel = new NaiveBayes()
.setModelType(params.nbModelType) //多项式模型或者伯努利模型
.setSmoothing(params.smoothing) //平滑系数
.setLabelCol("indexedLabel")
.setFeaturesCol("features")
//创建管道,模型只有一个 stages = 0
val pipeline = new Pipeline()
.setStages(Array(nbModel))
//创建网格搜索
val paramGrid = new ParamGridBuilder()
//.addGrid(nbModel.modelType, Array("multinomial", "bernoulli"))
//伯努利模型须要特征为01的数据
.addGrid(nbModel.smoothing, Array(0.01, 0.1, 0.2, 0.5))
.build()
//创建evaluator,必需要保证验证的标签列是向量化后的标签
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("indexedLabel")
//创建一个交叉验证的评估器,设置评估器的参数
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
//运行交叉验证评估器,获得最佳参数集的模型
val cvModel = cv.fit(data)
//获取最优逻辑回归模型
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val bestNBModel = bestModel.stages(0).asInstanceOf[NaiveBayesModel]
println("类的数量(标签可使用的值): " + bestNBModel.numClasses)
println("模型所接受的特征的数量: " + bestNBModel.numFeatures)
println("最优的modelType的值为: "+ bestNBModel.explainParam(bestNBModel.modelType))
println("最优的smoothing的值为: "+ bestNBModel.explainParam(bestNBModel.smoothing))
//更新最优朴素贝叶斯模型,并训练数据
val nbBestModel = new NaiveBayes()
.setModelType(bestNBModel.getModelType) //多项式模型或者伯努利模型
.setSmoothing(bestNBModel.getSmoothing) //平滑系数
.setLabelCol("indexedLabel")
.setFeaturesCol("features")
.fit(data)
this.saveModel(nbBestModel, params)
data.unpersist()
nbBestModel
}
复制代码
后续的三个算法原理网上都有不少,训练的代码也相似,本文只给出模型调参的部分代码。
//LR模型
val lrModel = new LogisticRegression()
.setMaxIter(bestLRModel.getMaxIter) //模型最大迭代次数
.setRegParam(bestLRModel.getRegParam) //正则化参数
.setElasticNetParam(params.elasticNetParam) //L1范式比例, L1/(L1 + L2)
.setTol(params.converTol) //模型收敛阈值
.setLabelCol("indexedLabel") //设置索引化标签字段
.setFeaturesCol("features") //设置向量化文本特征字段
//创建网格搜索
val paramGrid = new ParamGridBuilder()
.addGrid(lrModel.maxIter, Array(5, 10))
.addGrid(lrModel.regParam, Array(0.1, 0.2))
.build()
复制代码
//决策树模型
val dtModel = new DecisionTreeClassifier()
.setMinInfoGain(params.minInfoGain) //最小信息增益阈值
.setMaxDepth(params.maxDepth) //决策树最大深度
.setImpurity(params.impurity) //节点不纯度和信息增益方法gini, entropy
.setLabelCol("indexedLabel") //设置索引化标签字段
.setFeaturesCol("features") //设置向量化文本特征字段
//创建网格搜索
val paramGrid = new ParamGridBuilder()
.addGrid(dtModel.minInfoGain, Array(0.0, 0.1))
.addGrid(dtModel.maxDepth, Array(10, 20))
.addGrid(dtModel.impurity, Array("gini", "entropy"))
.build()
复制代码
随机森林模型经常须要调试以提升算法效果的两个参数:numTrees,maxDepth
实际上要想得到一个适当的阈值是至关困难的。高阈值可能致使过度简化的树,而低阈值可能简化不够。
预剪枝方法 minInfoGain、minInstancesPerNode 其实是经过不断修改中止条件来获得合理的结果,这并非一个好办法,事实上 咱们经常甚至不知道要寻找什么样的结果。这样就须要对树进行后剪枝了(后剪枝不须要用户指定参数,是更为理想化的剪枝方法)
//随机森林模型(不加fit)
val rfModel = new RandomForestClassifier()
.setMaxDepth(params.maxDepth) //决策树最大深度
.setNumTrees(params.numTrees) //设置决策树个数
.setMinInfoGain(params.minInfoGain) //最小信息增益阈值
.setImpurity(params.impurity) //信息增益的指标,选择熵或者gini不纯度
//.setMaxBins(params.maxBins) //最大分桶个数,用于连续特征离散化时决定每一个节点如何分裂
.setLabelCol("indexedLabel") //设置索引化标签字段
.setFeaturesCol("features") //设置向量化文本特征字段
//创建网格搜索
val paramGrid = new ParamGridBuilder()
.addGrid(rfModel.maxDepth, Array(5, 10, 20))
.addGrid(rfModel.numTrees, Array(5, 10, 20))
.addGrid(rfModel.minInfoGain, Array(0.0, 0.1, 0.5))
.build()
复制代码
机器学期通常都须要一个量化指标来衡量其效果:这个模型的准确率、召回率和F1值(这3个指标是评判模型预测能力经常使用的一组指标),spark提供了用于多分类模型评估的类MulticlassClassificationEvaluator,并将3个指标同时输出
object Evaluations extends Serializable {
/** * 多分类结果评估 * @param data 分类结果 * @return (准确率, 召回率, F1) */
def multiClassEvaluate(data: RDD[(Double, Double)]): (Double, Double, Double) = {
val metrics = new MulticlassMetrics(data)
val weightedPrecision = metrics.weightedPrecision
val weightedRecall = metrics.weightedRecall
val f1 = metrics.weightedFMeasure
(weightedPrecision, weightedRecall, f1)
}
}
复制代码
以逻辑回归为例,预测结果以下图,"probability"中4个值表示4个类别的预测几率:
/** * Description: 多分类模型预测结果评估对比 * Created by wy in 2019/4/16 10:07 */
object MultiClassEvalution {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder
.master("local")
.appName("Multi_Class_Evaluation_Demo")
.getOrCreate()
val filePath = "data/dataTest/predict"
//预处理(清洗、分词、向量化)
val preprocessor = new Preprocessor
val (predictDF, indexModel, _) = preprocessor.predict(filePath, spark)
predictDF.select("content","removed", "features").show(1, truncate = false)
//朴素贝叶斯模型预测
val nbClassifier = new NBClassifier
val nbPredictions = nbClassifier.predict(predictDF, indexModel)
//逻辑回归模型预测
val lrClassifier = new LRClassifier //import Classification.LogisticRegression.LRClassifier
val lrPredictions = lrClassifier.predict(predictDF, indexModel)
//决策树模型预测
val dtClassifier = new DTClassifier
val dtPredictions = dtClassifier.predict(predictDF, indexModel)
//随机森林模型预测
val rfClassifier = new RFClassifier
val rfPredictions = rfClassifier.predict(predictDF, indexModel)
//多个模型评估
val predictions = Seq(nbPredictions, lrPredictions, dtPredictions, rfPredictions)
val classNames = Seq("朴素贝叶斯模型", "逻辑回归模型", "决策树模型", "随机森林模型")
for (i <- 0 to 3) {
val prediction = predictions(i)
val className = classNames(i)
val resultRDD = prediction.select("prediction", "indexedLabel").rdd.map {
case Row(prediction: Double, label: Double) => (prediction, label)
}
val (precision, recall, f1) = Evaluations.multiClassEvaluate(resultRDD)
println(s"\n========= $className 评估结果 ==========")
println(s"加权准确率:$precision")
println(s"加权召回率:$recall")
println(s"F1值:$f1")
}
}
}
复制代码