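I have recently been studying machine learning with Spark. This post analyzes the source code of logistic regression and linear regression in MLlib, based on Spark 1.6.0 (the latest release at the time of writing). For the theory, see: http://www.cnblogs.com/ljy2013/p/5129610.html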
Let's dissect the source step by step by following my own demo. Here it is:
```scala
package org.apache.spark.mllib.classification

import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.{ LogisticRegressionWithLBFGS, LogisticRegressionModel }
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.SparkConf

object MyLogisticRegression {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Load training data in LIBSVM format: <label> <index1>:<value1> <index2>:<value2> ...
    // Feature indices in the file start at 1.
    val data = MLUtils.loadLibSVMFile(sc, "D:\\MyFile\\wine.txt")

    // Split data into training (60%) and test (40%).
    val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0).cache()
    val test = splits(1)

    // Run training algorithm to build the model
    val model = new LogisticRegressionWithLBFGS()
      .setNumClasses(10) // number of classes; labels must lie in 0 .. numClasses - 1
      .run(training)

    // Compute raw scores on the test set.
    val predictionAndLabels = test.map {
      case LabeledPoint(label, features) =>
        val prediction = model.predict(features)
        (prediction, label)
    }

    // Get evaluation metrics.
    val metrics = new MulticlassMetrics(predictionAndLabels)
    val precision = metrics.precision
    println("Precision = " + precision)

    // Save and load model
    model.save(sc, "myModelPath")
    val sameModel = LogisticRegressionModel.load(sc, "myModelPath")

    // Release the local Spark resources
    sc.stop()
  }
}
```
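The LIBSVM comment in the demo is easy to misread, so here is a minimal sketch of how one LIBSVM line maps to a label plus 0-based feature indices. LibSVMLine.parse is a hypothetical helper of my own, not MLUtils.loadLibSVMFile (which does the real parsing in Spark); it assumes whitespace-separated index:value pairs in ascending index order.

```scala
// Hypothetical helper for illustration only; Spark's MLUtils.loadLibSVMFile does the real work.
object LibSVMLine {
  /** Parses "<label> <index1>:<value1> ..." into (label, 0-based indices, values). */
  def parse(line: String): (Double, Array[Int], Array[Double]) = {
    val tokens = line.trim.split("\\s+")
    val label = tokens.head.toDouble // the label is kept as-is, it is not shifted
    val pairs = tokens.tail.map { item =>
      val parts = item.split(':')
      (parts(0).toInt - 1, parts(1).toDouble) // LIBSVM indices are 1-based
    }
    val indices = pairs.map(_._1)
    // The format expects 1-based indices in ascending order; reject anything else.
    require(indices.forall(_ >= 0), s"indices must start at 1: $line")
    require(indices.zip(indices.drop(1)).forall { case (a, b) => a < b },
      s"indices must be in ascending order: $line")
    (label, indices, pairs.map(_._2))
  }
}
```

Only the feature indices are shifted by one; the label is read unchanged, so for the demo it must already be a value from 0 to numClasses - 1.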
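From the demo we can see that LogisticRegressionWithLBFGS uses the L-BFGS algorithm to optimize its parameters. L-BFGS is an unconstrained optimization algorithm (a limited-memory quasi-Newton method); here it is used to solve for the logistic regression parameters (the weights). If you are not familiar with it, see: http://www.cnblogs.com/ljy2013/p/5129610.html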
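L-BFGS only needs the objective's value and gradient at the current weights. A minimal sketch, in plain Scala rather than Spark's LogisticGradient (which also handles the multinomial case), of the per-example binary logistic loss and gradient that an optimizer sums over the training set; regularization (the SquaredL2Updater part) is omitted, and BinaryLogisticLoss is a hypothetical name.

```scala
// Simplified sketch, not Spark's LogisticGradient: binary logistic loss and gradient for one example.
object BinaryLogisticLoss {
  private def dot(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => a(i) * b(i)).sum

  /** Returns (loss, gradient) for one example with label y in {0, 1}. */
  def lossAndGradient(w: Array[Double], x: Array[Double], y: Double): (Double, Array[Double]) = {
    val margin = dot(w, x)
    val p = 1.0 / (1.0 + math.exp(-margin)) // predicted P(y = 1 | x)
    // log(1 + e^margin), computed without overflow for large margins
    val log1pExpMargin =
      if (margin > 0) margin + math.log1p(math.exp(-margin)) else math.log1p(math.exp(margin))
    val loss = log1pExpMargin - y * margin // negative log-likelihood
    val grad = x.map(_ * (p - y))          // d loss / d w = (p - y) * x
    (loss, grad)
  }
}
```

At w = 0 every example has p = 0.5 and loss log 2; the gradient (p - y) * x points away from the labels, which is the direction an optimizer such as L-BFGS steps against.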
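I sketched the class inheritance diagram involved. In short: LogisticRegressionWithLBFGS extends GeneralizedLinearAlgorithm[LogisticRegressionModel], and LogisticRegressionModel extends GeneralizedLinearModel.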
The process has two phases: training and prediction.
1. Training
The main program starts training by calling:
```scala
// Run training algorithm to build the model
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10) // number of classes
  .run(training)
```
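After setting the number of classes, the demo calls run on LogisticRegressionWithLBFGS. That class does not define run itself; it inherits run from GeneralizedLinearAlgorithm, and that is where training happens. The one-argument run(input) builds an all-zero initial weight vector and forwards to the two-argument version: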
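Since setNumClasses drives the shape of everything that follows, a quick sketch of the bookkeeping helps. setNumClasses(k) sets numOfLinearPredictor to k - 1: class 0 is the reference (pivot) class, and every other class gets its own weight block, laid out back to back in one flat vector. MultinomialLayout and its methods are hypothetical names for illustration, not Spark APIs.

```scala
// Plain-Scala illustration of the multiclass weight layout; names mirror Spark's fields but are not Spark APIs.
object MultinomialLayout {
  /** setNumClasses(k) yields k - 1 linear predictors; class 0 is the reference class. */
  def numOfLinearPredictor(numClasses: Int): Int = numClasses - 1

  /** Length of the flat weight vector: one block per predictor, plus one intercept slot per block. */
  def weightSize(numFeatures: Int, numClasses: Int, addIntercept: Boolean): Int =
    (numFeatures + (if (addIntercept) 1 else 0)) * numOfLinearPredictor(numClasses)

  /** Weights of the predictor for class k (1 <= k < numClasses). */
  def blockFor(weights: Array[Double], k: Int, numClasses: Int): Array[Double] = {
    val blockSize = weights.length / numOfLinearPredictor(numClasses) // called dataWithBiasSize in Spark
    weights.slice((k - 1) * blockSize, k * blockSize)
  }
}
```

With the demo's setNumClasses(10) and a hypothetical 13-feature data set, there are 9 predictors and the flat weight vector has 117 entries without intercepts or 126 with them.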
```scala
def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {

  if (numFeatures < 0) {
    numFeatures = input.map(_.features.size).first()
  }

  // The optimizer makes many passes over the training data, so the input should be cached in memory.
  if (input.getStorageLevel == StorageLevel.NONE) {
    logWarning("The input data is not directly cached, which may hurt performance if its"
      + " parent RDDs are also uncached.")
  }

  // Check the data properties before running the optimizer
  if (validateData && !validators.forall(func => func(input))) {
    throw new SparkException("Input validation failed.")
  }

  /**
   * Scaling columns to unit variance as a heuristic to reduce the condition number:
   *
   * During the optimization process, the convergence (rate) depends on the condition number of
   * the training dataset. Scaling the variables often reduces this condition number
   * heuristically, thus improving the convergence rate. Without reducing the condition number,
   * some training datasets mixing the columns with different scales may not be able to converge.
   *
   * GLMNET and LIBSVM packages perform the scaling to reduce the condition number, and return
   * the weights in the original scale.
   * See page 9 in http://cran.r-project.org/web/packages/glmnet/glmnet.pdf
   *
   * Here, if useFeatureScaling is enabled, we will standardize the training features by dividing
   * the variance of each column (without subtracting the mean), and train the model in the
   * scaled space. Then we transform the coefficients from the scaled space to the original scale
   * as GLMNET and LIBSVM do.
   *
   * Currently, it's only enabled in LogisticRegressionWithLBFGS
   */
  // Step 1: standardize the features
  val scaler = if (useFeatureScaling) {
    new StandardScaler(withStd = true, withMean = false).fit(input.map(_.features))
  } else {
    null
  }

  // Prepend an extra variable consisting of all 1.0's for the intercept.
  // TODO: Apply feature scaling to the weight vector instead of input data.
  val data =
    if (addIntercept) {
      if (useFeatureScaling) {
        input.map(lp => (lp.label, appendBias(scaler.transform(lp.features)))).cache()
      } else {
        input.map(lp => (lp.label, appendBias(lp.features))).cache()
      }
    } else {
      if (useFeatureScaling) {
        input.map(lp => (lp.label, scaler.transform(lp.features))).cache()
      } else {
        input.map(lp => (lp.label, lp.features))
      }
    }

  /**
   * TODO: For better convergence, in logistic regression, the intercepts should be computed
   * from the prior probability distribution of the outcomes; for linear regression,
   * the intercept should be set as the average of response.
   */
  val initialWeightsWithIntercept = if (addIntercept && numOfLinearPredictor == 1) {
    appendBias(initialWeights)
  } else {
    /** If `numOfLinearPredictor > 1`, initialWeights already contains intercepts. */
    initialWeights
  }

  // Step 2: let the optimizer search for the best weights; they become the final model parameters
  val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)

  val intercept = if (addIntercept && numOfLinearPredictor == 1) {
    weightsWithIntercept(weightsWithIntercept.size - 1)
  } else {
    0.0
  }

  var weights = if (addIntercept && numOfLinearPredictor == 1) {
    Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1))
  } else {
    weightsWithIntercept
  }

  /**
   * The weights and intercept are trained in the scaled space; we're converting them back to
   * the original scale.
   *
   * Math shows that if we only perform standardization without subtracting means, the intercept
   * will not be changed. w_i = w_i' / v_i where w_i' is the coefficient in the scaled space, w_i
   * is the coefficient in the original space, and v_i is the variance of the column i.
   */
  if (useFeatureScaling) {
    if (numOfLinearPredictor == 1) {
      weights = scaler.transform(weights)
    } else {
      /**
       * For `numOfLinearPredictor > 1`, we have to transform the weights back to the original
       * scale for each set of linear predictor. Note that the intercepts have to be explicitly
       * excluded when `addIntercept == true` since the intercepts are part of weights now.
       */
      var i = 0
      val n = weights.size / numOfLinearPredictor
      val weightsArray = weights.toArray
      while (i < numOfLinearPredictor) {
        val start = i * n
        val end = (i + 1) * n - { if (addIntercept) 1 else 0 }

        val partialWeightsArray = scaler.transform(
          Vectors.dense(weightsArray.slice(start, end))).toArray

        System.arraycopy(partialWeightsArray, 0, weightsArray, start, partialWeightsArray.size)
        i += 1
      }
      weights = Vectors.dense(weightsArray)
    }
  }

  // Warn at the end of the run as well, for increased visibility.
  if (input.getStorageLevel == StorageLevel.NONE) {
    logWarning("The input data was not directly cached, which may hurt performance if its"
      + " parent RDDs are also uncached.")
  }

  // Unpersist cached data
  if (data.getStorageLevel != StorageLevel.NONE) {
    data.unpersist(false)
  }

  // Step 3: build the model (createModel is implemented by the subclass)
  createModel(weights, intercept)
}
```
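In the binary case (numOfLinearPredictor == 1), run() treats the intercept as just one more weight: a constant 1.0 feature is appended, and after optimization the last weight is split off as the intercept. The sketch below reproduces only that bookkeeping on plain arrays; InterceptHandling and its methods are hypothetical names, loosely mirroring MLUtils.appendBias.

```scala
// Plain-Scala sketch of the binary-case intercept bookkeeping in run(); not Spark code.
object InterceptHandling {
  /** Appends a constant 1.0 so that the last weight acts as the intercept. */
  def appendBias(features: Array[Double]): Array[Double] = features :+ 1.0

  /** Splits optimized weights into (weights, intercept), as run() does when numOfLinearPredictor == 1. */
  def splitIntercept(weightsWithIntercept: Array[Double]): (Array[Double], Double) =
    (weightsWithIntercept.dropRight(1), weightsWithIntercept.last)
}
```

The trick works because w_with_bias · appendBias(x) equals w · x + intercept, so the optimizer never needs to know that an intercept exists.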
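The first step in this method preprocesses the training data: the features are standardized when useFeatureScaling is on (it is enabled for LogisticRegressionWithLBFGS), and a constant 1.0 column is appended for the intercept when addIntercept is set.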
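Because scaling only divides each column by a per-column factor (the source comment says "variance", but StandardScaler with withStd = true scales to unit standard deviation) and never subtracts the mean, converting the weights back is a per-column division and the intercept is untouched. A minimal sketch under those assumptions, using the sample standard deviation and plain arrays instead of Spark's StandardScaler; FeatureScalingSketch and its methods are hypothetical names.

```scala
// Plain-Scala illustration; not Spark's StandardScaler. Object and method names are hypothetical.
object FeatureScalingSketch {
  /** Sample standard deviation (n - 1 denominator) of each column. */
  def columnStd(rows: Seq[Array[Double]]): Array[Double] = {
    val n = rows.size
    Array.tabulate(rows.head.length) { j =>
      val col = rows.map(_(j))
      val mean = col.sum / n
      math.sqrt(col.map(v => (v - mean) * (v - mean)).sum / (n - 1))
    }
  }

  /** Divides each feature by its column std without subtracting the mean; zero-std columns become 0.0. */
  def scale(x: Array[Double], std: Array[Double]): Array[Double] =
    Array.tabulate(x.length)(j => if (std(j) != 0.0) x(j) / std(j) else 0.0)

  /** Maps weights learned in the scaled space back to the original space: w_j = w'_j / std_j. */
  def unscaleWeights(wScaled: Array[Double], std: Array[Double]): Array[Double] =
    scale(wScaled, std)
}
```

The margin is the same in both spaces: the sum of w'_j * (x_j / std_j) equals the sum of (w'_j / std_j) * x_j, which is why run() can reuse scaler.transform on the weights.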
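The second step searches for the optimal weights with an optimizer: val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept). This call relies on polymorphism. In GeneralizedLinearAlgorithm, optimizer is an abstract member declared as def optimizer: Optimizer.
The subclass LogisticRegressionWithLBFGS overrides it with override val optimizer = new LBFGS(new LogisticGradient, new SquaredL2Updater). With this design each algorithm can plug in its own optimizer to compute the optimal weights, while GeneralizedLinearAlgorithm itself stays generic and can serve every generalized linear algorithm.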
Now for step three: creating the model. In GeneralizedLinearAlgorithm.run, the model is created in a single line, createModel(weights, intercept), yet there is a design idea behind it. Like optimizer, createModel works through polymorphism. GeneralizedLinearAlgorithm first declares an abstract createModel method:
```scala
protected def createModel(weights: Vector, intercept: Double): M
```
The subclass LogisticRegressionWithLBFGS implements it:
```scala
override protected def createModel(weights: Vector, intercept: Double) = {
  if (numOfLinearPredictor == 1) {
    // binary model
    new LogisticRegressionModel(weights, intercept)
  } else {
    // multiclass model
    new LogisticRegressionModel(weights, intercept, numFeatures, numOfLinearPredictor + 1)
  }
}
```
So the call actually dispatches to LogisticRegressionWithLBFGS.createModel. At this point the model has been built; what remains is using it, for example for prediction.
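The optimizer and createModel overrides together form the template-method pattern: the base class fixes the training flow and the subclass fills in the parts that vary. A toy sketch of that structure, with hypothetical class names and plain batch gradient descent standing in for L-BFGS (this is not Spark's optimizer or class hierarchy):

```scala
// Toy classes for illustration only; they mirror the roles of Spark's classes, not their APIs.
trait ToyOptimizer {
  def optimize(data: Seq[(Double, Array[Double])], initial: Array[Double]): Array[Double]
}

abstract class ToyGLMAlgorithm[M] {
  def optimizer: ToyOptimizer                                             // role of GeneralizedLinearAlgorithm.optimizer
  protected def createModel(weights: Array[Double], intercept: Double): M // role of createModel

  /** Fixed training flow: append a bias column, optimize, split off the intercept, build the model. */
  def run(data: Seq[(Double, Array[Double])], numFeatures: Int): M = {
    val withBias = data.map { case (y, x) => (y, x :+ 1.0) }
    val w = optimizer.optimize(withBias, Array.fill(numFeatures + 1)(0.0))
    createModel(w.dropRight(1), w.last)
  }
}

/** Batch gradient descent on the logistic loss: a stand-in, not L-BFGS. */
class ToyLogisticGD(steps: Int, stepSize: Double) extends ToyOptimizer {
  def optimize(data: Seq[(Double, Array[Double])], initial: Array[Double]): Array[Double] = {
    val w = initial.clone()
    for (_ <- 1 to steps) {
      val grad = new Array[Double](w.length)
      for ((y, x) <- data) {
        val margin = w.indices.map(i => w(i) * x(i)).sum
        val p = 1.0 / (1.0 + math.exp(-margin))
        for (i <- w.indices) grad(i) += (p - y) * x(i)
      }
      for (i <- w.indices) w(i) -= stepSize * grad(i) / data.size
    }
    w
  }
}

case class ToyLogisticModel(weights: Array[Double], intercept: Double)

class ToyLogisticAlgorithm extends ToyGLMAlgorithm[ToyLogisticModel] {
  override val optimizer: ToyOptimizer = new ToyLogisticGD(steps = 500, stepSize = 0.5)
  override protected def createModel(weights: Array[Double], intercept: Double): ToyLogisticModel =
    ToyLogisticModel(weights, intercept)
}
```

Swapping in another optimizer or model type only means writing another small subclass; run itself never changes.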
2. Prediction
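Prediction uses the model built above to classify unseen samples. The parent class GeneralizedLinearModel provides a predict method with two overloads: one takes an RDD and predicts in parallel across the cluster; the other takes a single Vector and predicts locally. The code: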
```scala
/**
 * Predict values for the given data set using the model trained.
 *
 * @param testData RDD representing data points to be predicted
 * @return RDD[Double] where each entry contains the corresponding prediction
 *
 */
@Since("1.0.0") // distributed prediction over an RDD
def predict(testData: RDD[Vector]): RDD[Double] = {
  // A small optimization to avoid serializing the entire model. Only the weightsMatrix
  // and intercept is needed.
  val localWeights = weights
  val bcWeights = testData.context.broadcast(localWeights)
  val localIntercept = intercept
  testData.mapPartitions { iter =>
    val w = bcWeights.value
    iter.map(v => predictPoint(v, w, localIntercept))
  }
}

/**
 * Predict values for a single data point using the model trained.
 *
 * @param testData array representing a single data point
 * @return Double prediction from the trained model
 *
 */
@Since("1.0.0") // local prediction for a single vector
def predict(testData: Vector): Double = {
  predictPoint(testData, weights, intercept)
}
```
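Looking closely at the code above, both predict overloads call predictPoint, which is abstract in GeneralizedLinearModel. In other words, each algorithm defines its own way of predicting; binary logistic regression, for example, checks whether the predicted probability exceeds a threshold (0.5 by default).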
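The same template-method idea appears on the model side: predict is written once and delegates to an abstract predictPoint. A toy sketch with two hypothetical model classes, a linear one that returns the raw margin (as LinearRegressionModel does) and a binary logistic one that applies the sigmoid and a threshold defaulting to 0.5; the Iterator overload stands in for the per-partition loop of the RDD version and, like it, captures only the weights and intercept.

```scala
// Toy classes for illustration only; not Spark's GeneralizedLinearModel hierarchy.
abstract class ToyGLM(val weights: Array[Double], val intercept: Double) {
  /** Each concrete model decides how a margin becomes a prediction. */
  protected def predictPoint(x: Array[Double], w: Array[Double], b: Double): Double

  /** Single-point prediction. */
  def predict(x: Array[Double]): Double = predictPoint(x, weights, intercept)

  /** Batch prediction; copies weights and intercept into locals, like the RDD version. */
  def predict(xs: Iterator[Array[Double]]): Iterator[Double] = {
    val w = weights
    val b = intercept
    xs.map(x => predictPoint(x, w, b))
  }

  protected def margin(x: Array[Double], w: Array[Double], b: Double): Double =
    x.indices.map(i => x(i) * w(i)).sum + b
}

class ToyLinearModel(ws: Array[Double], b0: Double) extends ToyGLM(ws, b0) {
  // Linear regression: the prediction is the margin itself.
  protected def predictPoint(x: Array[Double], w: Array[Double], b: Double): Double = margin(x, w, b)
}

class ToyBinaryLogisticModel(ws: Array[Double], b0: Double, threshold: Option[Double] = Some(0.5))
    extends ToyGLM(ws, b0) {
  // Logistic regression: squash the margin into a probability, then apply the threshold if any.
  protected def predictPoint(x: Array[Double], w: Array[Double], b: Double): Double = {
    val score = 1.0 / (1.0 + math.exp(-margin(x, w, b)))
    threshold match {
      case Some(t) => if (score > t) 1.0 else 0.0
      case None    => score
    }
  }
}
```

Passing threshold = None returns the raw probability, which mirrors what clearThreshold() does on LogisticRegressionModel.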
Therefore every concrete model overrides predictPoint from its parent class GeneralizedLinearModel. Here is how LogisticRegressionModel implements it:
```scala
override protected def predictPoint(
    dataMatrix: Vector,
    weightMatrix: Vector,
    intercept: Double) = {
  require(dataMatrix.size == numFeatures)

  // If dataMatrix and weightMatrix have the same dimension, it's binary logistic regression.
  if (numClasses == 2) {
    val margin = dot(weightMatrix, dataMatrix) + intercept
    val score = 1.0 / (1.0 + math.exp(-margin))
    threshold match {
      case Some(t) => if (score > t) 1.0 else 0.0
      case None => score
    }
  } else {
    /**
     * Compute and find the one with maximum margins. If the maxMargin is negative, then the
     * prediction result will be the first class.
     *
     * PS, if you want to compute the probabilities for each outcome instead of the outcome
     * with maximum probability, remember to subtract the maxMargin from margins if maxMargin
     * is positive to prevent overflow.
     */
    var bestClass = 0
    var maxMargin = 0.0
    val withBias = dataMatrix.size + 1 == dataWithBiasSize
    (0 until numClasses - 1).foreach { i =>
      var margin = 0.0
      dataMatrix.foreachActive { (index, value) =>
        if (value != 0.0) margin += value * weightsArray((i * dataWithBiasSize) + index)
      }
      // Intercept is required to be added into margin.
      if (withBias) {
        margin += weightsArray((i * dataWithBiasSize) + dataMatrix.size)
      }
      if (margin > maxMargin) {
        maxMargin = margin
        bestClass = i + 1
      }
    }
    bestClass.toDouble
  }
}
```
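To check my reading of the multiclass branch, here is a plain-Scala re-implementation on dense arrays (MultinomialPredict is a hypothetical name, not a Spark API). Class 0 is the pivot with margin 0, every other class uses its own weight block with the intercept stored last, and ties go to the lower class because of the strict >. Following the hint in the source comment, it also computes class probabilities by subtracting the maximum margin before exponentiating.

```scala
// Plain-Scala re-implementation of the multiclass prediction logic; not Spark code.
object MultinomialPredict {
  /** Margins for all classes; index 0 is the pivot class whose margin is fixed at 0. */
  def margins(x: Array[Double], weightsArray: Array[Double], numClasses: Int): Array[Double] = {
    val dataWithBiasSize = weightsArray.length / (numClasses - 1)
    val withBias = x.length + 1 == dataWithBiasSize
    val m = new Array[Double](numClasses)
    for (i <- 0 until numClasses - 1) {
      val offset = i * dataWithBiasSize
      var margin = 0.0
      for (j <- x.indices if x(j) != 0.0) margin += x(j) * weightsArray(offset + j)
      if (withBias) margin += weightsArray(offset + x.length) // intercept is the last slot of each block
      m(i + 1) = margin
    }
    m
  }

  /** Predicted class: the first index with the largest margin, matching the strict > in Spark. */
  def predict(x: Array[Double], weightsArray: Array[Double], numClasses: Int): Double = {
    val m = margins(x, weightsArray, numClasses)
    m.indices.foldLeft(0)((best, k) => if (m(k) > m(best)) k else best).toDouble
  }

  /** Softmax over the margins; subtracting the max margin keeps exp from overflowing. */
  def probabilities(x: Array[Double], weightsArray: Array[Double], numClasses: Int): Array[Double] = {
    val m = margins(x, weightsArray, numClasses)
    val maxMargin = m.max
    val e = m.map(v => math.exp(v - maxMargin))
    val total = e.sum
    e.map(_ / total)
  }
}
```

For example, with 3 classes, 2 features and weights (1, 0, 0, 2, 0, -0.5), the point (1, 0) gets margins 0, 1 and 1.5, so class 2 wins; the point (0, 0) gets 0, 0 and -0.5, and the pivot class 0 wins the tie.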
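This completes the source-code walkthrough. The post used logistic regression as the example; other machine learning algorithms in MLlib can be analyzed in much the same way, and I plan to cover them in later posts.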