在数据挖掘和机器学习算法中,有不少分类算法。其中朴素贝叶斯(Naive Bayes classiffer,NBC)是最简单但倒是最有效的算法之一。java
本章节实现一个基于监督学习方法和几率分析器。朴素贝叶斯是一个线性分类器。要理解这个概念,须要了解条件几率的概念。算法
处理数值数据时,最好使用聚类技术,例如K均值、K-近邻算法。不过杜宇名字、符号、电子邮件和文本类型的分类,则最好使用几率方法,如NBC。apache
在某些状况下,咱们也可使用NBC对数值类型进行分类。api
NBC是一个基于强(朴素)独立假设应用贝叶斯定理实现的几率分类器。基本说来,NBC根据输入的一些属性(特征)将输入分配到K个类别$C_{1},C_{2},...,C_{k}$中的某个类。NBC有不少应用,如垃圾邮件过滤和文档分类。数据结构
例如,使用NBC的垃圾邮件过滤器把各个电子邮件分配到两个簇之一:app
垃圾邮件(spammail) 、非垃圾邮件(not spam mail)
前面提到,朴素贝叶斯是一个基于监督学习的算法,这类算法都有一个特点。就是须要两个阶段完成应用:机器学习
第二阶段,使用分类器对新数据进行分类,这时候咱们能够运用公式: $$ C^{predict} = \arg \max P(C=c)\prod_{j=1}^{m}P(X_{j} = u_{j}|C=c) $$ 其中P(C)已经在咱们的第一阶段求解得出,即每一类占数据集的总几率(当数据集足够大的时候,他已经能够表明全部现象——理想状态)。oop
1,晴,热,高,弱,不 2,晴,热,高,强,不 3,阴,热,高,弱,是 4,雨,温暖,高,弱,是 5,雨,冷,正常,弱,是 6,雨,冷,正常,强,不 7,阴,冷,正常,强,是 8,晴,温暖,高,弱,不 9,晴,冷,正常,弱,是 10,雨,温暖,正常,弱,是 11,晴,温暖,正常,强,是 12,阴,温暖,高,强,是 13,阴,热,正常,弱,是 14,雨,温暖,高,强,不
经过该数据集咱们能够生成一个分类器。post
package com.sunrun.movieshow.autils.nbc; import edu.umd.cloud9.io.pair.PairOfStrings; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /* 阶段1:使用训练数据创建一个分类器:一组几率表 * 输入数据(id,天气,温度,湿度,风力,是否适合打球) 1,晴,热,高,弱,不 2,晴,热,高,强,是 ... */ public class BuildNBCClassifier implements Serializable { /** * 1. 获取Spark 上下文对象 * @return */ public static JavaSparkContext getSparkContext(String appName){ SparkConf sparkConf = new SparkConf() .setAppName(appName) //.setSparkHome(sparkHome) .setMaster("local[*]") // 串行化器 .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .set("spark.testing.memory", "2147480000"); return new JavaSparkContext(sparkConf); } // 2. 将Map(Tuple2,Double) -> List<Tuple2<PairOfString,DoubleWritable>> // PairOfStrings是一个实现了Writable接口的类,这样就能够支持Hadoop写入了 public static List<Tuple2<PairOfStrings, DoubleWritable>> toWritableList(Map<Tuple2<String,String>,Double> PT){ List<Tuple2<PairOfStrings, DoubleWritable>> list = new ArrayList<>(); for (Map.Entry<Tuple2<String, String>, Double> entry : PT.entrySet()) { list.add(new Tuple2<>(new PairOfStrings(entry.getKey()._1,entry.getKey()._2),new DoubleWritable(entry.getValue()))); } return list; } public static void main(String[] args) { JavaSparkContext sc = getSparkContext("buildBNC"); JavaRDD<String> training = sc.textFile("data/nbc/ball.txt"); String dfsUrl = "hdfs://10.21.1.24:9000/ball/"; // == 1.得到数据集大小,用于几率计算 long trainingDataSize = training.count(); // == 2.转换数据集: line -> ((feature,classification),1) JavaPairRDD<Tuple2<String, String>, Integer> pairs = training.flatMapToPair(line -> { List<Tuple2<Tuple2<String, String>, Integer>> result = new ArrayList<>(); String[] tokens = line.split(","); // 0->id, 1-(n-1) -> A(feature), n -> T(classification) String nowClassification = tokens[tokens.length - 1]; for (int i = 1; i < tokens.length - 1; i++) { // like ((晴,是),1) result.add(new Tuple2<>(new Tuple2<>(tokens[i], nowClassification), 1)); } // 最后还要统计每一个类别的总出现次数,所以,须要单独的对最终class进行统计((class,是),1) result.add(new Tuple2<>(new Tuple2<>("class", nowClassification), 1)); return result.iterator(); }); //pairs.saveAsTextFile(dfsUrl + "pair"); /** * ((晴,不),1) * ((热,不),1) * ((高,不),1) * ((弱,不),1) * ((class,不),1) * ... */ // == 3.计算每种特征出现的次数 JavaPairRDD<Tuple2<String, String>, Integer> count = pairs.reduceByKey((c1, c2) -> { return c1 + c2; }); //count.saveAsTextFile(dfsUrl + "count"); /** * [root@h24 ~]# hadoop fs -cat /ball/count/p* * ((强,是),3) * ((雨,是),3) * ((弱,是),6) * ((class,不),5) * ((晴,不),3) * ((class,是),9) * ((冷,是),3) * ... */ // == 4.将归约数据转换为map Map<Tuple2<String, String>, Integer> countAsMap = count.collectAsMap(); // == 5.创建分类器数据结构:几率表PT、分类表CT HashMap<Tuple2<String, String>, Double> PT = new HashMap<>(); ArrayList<String> CT = new ArrayList<>(); for (Map.Entry<Tuple2<String, String>, Integer> entry : countAsMap.entrySet()) { // (feature,classification) Tuple2<String, String> key = entry.getKey(); String feature = key._1; String classification = key._2; // K: new Tuple2<>(feature, classification) V: compute probably Tuple2<String, String> K = new Tuple2<>(feature, classification); // class type:target feature classification P(C) if(feature.equals("class")){ CT.add(classification); // target feature times / total,总类型的几率为类别出现次数/总记录数 PT.put(K, (double)entry.getValue() / trainingDataSize); }else{ // 获取某个分类出现的总次数。(Yes? No?) P(Ai|C=Ci) = 属性A值在类别C下出现的次数/类别C的出现次数 Tuple2<String, String> K2 = new Tuple2<>("class", classification); Integer times = countAsMap.get(K2); // 该类别没出现过,则几率设为0.0(其实不可能为0) if(times == 0){ PT.put(K,0.0); }else{ PT.put(K, (double)entry.getValue() / times); } } } // System.out.println(PT); // (class,是)=0.6428571428571429, (冷,是)=0.3333333333333333, (晴,不)=0.6, (雨,是)=0.3333333333333333, (高,不)=0.8, // System.out.println(CT); // [不, 是] // == 6. 保存分类器的数据结构 // ==== 6.1.转化为持久存储数据类型 List<Tuple2<PairOfStrings, DoubleWritable>> ptList = toWritableList(PT); JavaPairRDD<PairOfStrings, DoubleWritable> ptRDD = sc.parallelizePairs(ptList); // ==== 6.2.存储到Hadoop ptRDD.saveAsNewAPIHadoopFile(dfsUrl + "nbc/pt", // 存储路径 PairOfStrings.class,// K DoubleWritable.class, // V SequenceFileOutputFormat.class// 输出格式类 ); // == 7.保存分类列表 JavaRDD<String> ctRDD = sc.parallelize(CT); ctRDD.saveAsTextFile(dfsUrl + "nbc/ct"); /** * [root@h24 ~]# hadoop fs -cat /ball/nbc/ct/* * 不 * 是 * * [root@h24 ~]# hadoop fs -cat /ball/nbc/pt/* * SEQ$edu.umd.cloud9.io.pair.PairOfStrings#org.apache.hadoop.io.DoubleWritableռ * ... */ System.out.println("complete training..."); } }
package com.sunrun.movieshow.autils.nbc; import com.sunrun.movieshow.autils.common.SparkHelper; import edu.umd.cloud9.io.pair.PairOfStrings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import scala.Tuple2; import java.util.ArrayList; import java.util.List; import java.util.Map; // 朴素贝叶斯分类器 // 阶段1:训练阶段:使用训练数据创建一个朴素贝叶斯分类器 BuildNBClassifier // 阶段2:测试阶段:使用新创建的NBC对新数据进行分类 NBCTester public class NBCTester { public static void main(String[] args) { JavaSparkContext sc = SparkHelper.getSparkContext("NBCTester"); // 实例存储文件的根目录 String dfsUrl = "hdfs://10.21.1.24:9000/ball/"; // == 1.导入要分类的数据集 JavaRDD<String> testRdd = sc.textFile("data/nbc/test.txt"); /** * 1,晴,热,高,弱 * 2,晴,热,高,强 * 3,阴,热,高,弱 * 4,雨,温暖,高,弱 */ // == 2.加载分类器模型 JavaPairRDD<PairOfStrings, DoubleWritable> modelRDD = sc.newAPIHadoopFile(dfsUrl + "nbc/pt", SequenceFileInputFormat.class, PairOfStrings.class, DoubleWritable.class, new Configuration() ); // System.out.println(ptRDD.collect()); /** *((高, 不),0.8), ((高, 不),0.8), ((高, 不),0.8),... */ // == 3.使用map复制返回的对象:((高, 不),0.8) JavaPairRDD<Tuple2<String, String>, Double> ptRDD = modelRDD.mapToPair(t -> { // pairStrings left and right (feature-v,classification) Tuple2<String, String> K = new Tuple2<>(t._1.getLeftElement(), t._1.getRightElement()); // V - the probably Double V = new Double(t._2.get()); return new Tuple2<>(K, V); }); // == 4.广播分类器 Broadcast<Map<Tuple2<String, String>, Double>> broadcastPT = sc.broadcast(ptRDD.collectAsMap()); // == 5.广播全部分类类别 JavaRDD<String> ctRDD = sc.textFile(dfsUrl + "nbc/ct"); final Broadcast<List<String>> broadcastCT = sc.broadcast(ctRDD.collect()); // == 6.对新数据进行分类: argMax II(P(C=c) * P(Ai|c)) JavaPairRDD<String, String> testResult = testRdd.mapToPair(line -> { // broadcast value Map<Tuple2<String, String>, Double> pt = broadcastPT.getValue(); List<String> ct = broadcastCT.getValue(); // 解析新数据的每个特征值 String[] featureValues = line.split(","); // 选择类别 String selectedClasses = ""; // 当前的最大几率 double maxPosterior = 0.0; // 计算: for (String Ci : ct) { // P(Ci) Double posterior = pt.get(new Tuple2<>("class", Ci)); for (int i = 0; i < featureValues.length; i++) { // P(Ai|Ci) Double probably = pt.get(new Tuple2<>(featureValues[1], Ci)); // 这里的逻辑有待探讨,能够理解为,当前类别下,没有这种特征值出现,那么当 // 一条数据的特征值为此值时,II(P(C=c) * P(Ai|c)) = 0,也就是该类别不可能被选择。 if (probably == null) { posterior = 0.0; break; } else { // P(Ci) * P(Ai|Ci) posterior *= probably.doubleValue(); } } System.out.println(line + "," + Ci + posterior); if (selectedClasses == null) { // 计算第1个分类的值 selectedClasses = Ci; maxPosterior = posterior; } else { if (posterior > maxPosterior) { selectedClasses = Ci; maxPosterior = posterior; } } } return new Tuple2<>(line, selectedClasses); }); testResult.saveAsTextFile(dfsUrl + "test01"); /*** * [root@h24 ~]# hadoop fs -cat /ball/ball/test02/p* * (1,晴,热,高,弱,不) * (2,晴,热,高,强,不) * (3,阴,热,高,弱,是) * (4,雨,温暖,高,弱,不) */ } }
MLib提供了各类各样的算法提供给咱们使用,这里也能够直接偷懒使用MLib提供的算法进行分类器的构建。学习
贝叶斯算法比起KNN算法来讲,效果高了不少,KNN算法虽然能够保证结果比较精确,可是其庞大的运算量在许多场景没法使用,所以,KNN能够说没有NBC运用的普遍。
使用朴素贝叶斯算法的思路:
第一阶段:训练分类器阶段,该阶段须要大量的数据训练一个分类器,数据量越大,预测结果越准确:
一、获取整个数据集的记录总数N
二、计算每一个类别出现的几率,即P(C) $$ P(C_{i}) = \frac{C_{i}}{N} $$ 其中$C_{i}$为数据集中类别出现的次数。
三、计算每一个特征值的条件几率(在已知类别以后) $$ P(A_{i}|C_{i}) = \frac{P(A_{i}\cap C_{i})}{P(C_{i})} = \frac{A_{i}\cap C_{i}}{C_{i}} $$ 例如咱们案例中的,求出天气这个特征向量在取值为晴的时候的几率,经过计算获得((晴,是),4)
这个元组,其中的4就为$(A_{i}\cap C_{i})$出现的次数。
四、经过步骤3的公式,计算出全部特征值对应的几率,而后将其做为分类器存储到Hadoop Support文件系统。同时也从训练集的目标列中总结出分为多少类,做为类别迭代器计算。
第二阶段:测试分类器阶段。
加载分类器,咱们使用公式 $$ C^{predict} = \arg \max P(C=c)\prod_{j=1}^{m}P(X_{j} = u_{j}|C=c) $$ 代入每一个测试数据,使用类别迭代器,分别测试将测试数据做为每一个类别以后获得的条件几率是多少,选取最大的那一个所选择的类别做为测试数据的分类结果。
能够看到:贝叶斯算法的核心思想就是这样一个常识:当你不能准确知悉一个事物的本质时,你能够依靠与事物特定本质相关的事件出现的多少去判断其本质属性的几率。 用数学语言表达就是:支持某项属性的事件发生得愈多,则该属性成立的可能性就愈大。
联系到咱们的应用:咱们都是假设给定的数据是类别C的状况下,他在咱们熟知的知识系统(训练集)中进行后验几率的计算以后,其分值(几率)越高,那么他就最可能就是这个类别。