MLlib 是Spark 中提供机器学习函数的库。它是专为在集群上并行运行的状况而设计的。MLlib 中包含许多机器学习算法,能够在Spark 支持的全部编程语言中使用,因为Spark基于内存计算模型的优点,很是适合机器学习中出现的屡次迭代,避免了操做磁盘和网络的性能损耗。Spark 官网展现的 MLlib 与Hadoop性能对比图就很是显著。因此Spark比Hadoop的MapReduce框架更易于支持机器学习。html
MLlib 包含一些特有的数据类型,它们位于org.apache.spark.mllib 包(Java/Scala)或
pyspark.mllib(Python)内。python
本地向量(Local Vector)的索引是从0开始的,而且是整型。而它的值为 Double 类型,存储于单个机器内。 MLlib 支持两种本地向量:稠密向量和稀疏向量。算法
稠密向量是用一个 Double 类型的数组表明它的实体值,而稀疏向量是基于两个并行数组,即索引和值。举个例子,向量 (1.0, 0.0, 3.0)
写成稠密形式就是 [1.0, 0.0, 3.0]
,而写成稀疏形式则是 (3, [0, 2], [1.0, 3.0])
,后者的第一个 3 是指向量的大小。稀疏和稠密的界限没有严格意义上的标准,一般须要依据具体的问题来决定。apache
本地向量的基类是 Vector 类,DenseVector 和 SparseVector 类是其两种不一样的实现。官方文档推荐你们使用 Vector 类中已实现的工厂方法来建立本地向量。编程
Scala环境下:数组
//建立稠密向量 scala> val denseVec1 = Vectors.dense(1.0,2.0,3.0) denseVec1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0] scala> val denseVec2 = Vectors.dense(Array(1.0,2.0,3.0)) denseVec2: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0] //建立稀疏向量 scala> val sparseVec1 = Vectors.sparse(4,Array(0,2),Array(1.0,2.0)) sparseVec1: org.apache.spark.mllib.linalg.Vector = (4,[0,2],[1.0,2.0])
python环境下:
网络
>>> from pyspark.mllib.linalg import Vectors >>> den = Vectors.dense([1.0,2.0,3.0]) >>> den DenseVector([1.0, 2.0, 3.0]) >>> spa = Vectors.sparse(4,[0,2],[1.0,2.0]) >>> spa SparseVector(4, {0: 1.0, 2: 2.0})
在诸如分类和回归这样的监督式学习(supervised learning)算法中,LabeledPoint 用来表示带标签的数据点。它包含一个特征向量与一个标签(由一个浮点数表示),位置在 mllib.regression
包中。框架
Scala环境中:dom
// 首先须要引入标签点相关的类 import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint // 建立一个带有正面标签和稠密特征向量的标签点。 val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) // 建立一个带有负面标签和稀疏特征向量的标签点。 val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
注意其第一个参数为标签,第二个参数为向量。标签是用Double
类型表示的。机器学习
Python环境中:
>>> from pyspark.mllib.regression import LabeledPoint >>> from pyspark.mllib.linalg import Vectors >>> pos = LabeledPoint(1.0,Vectors.dense([1.0,2.0,3.0])) >>> neg = LabeledPoint(0.0,Vectors.dense([1.0,2.0,3.0]))
稠密矩阵的实体值以列为主要次序的形式,存放于单个 Double 型数组内。系数矩阵的非零实体以列为主要次序的形式,存放于压缩稀疏列(Compressed Sparse Column, CSC)中。例如,下面这个稠密矩阵就是存放在一维数组 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] 中,矩阵的大小为 (3, 2) 。
import org.apache.spark.mllib.linalg.{Matrix, Matrices} // 建立稠密矩阵 ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))关于稀疏矩阵的存储方式请参考: http://www.tuicool.com/articles/A3emmqi
/下列矩阵 1.0 0.0 4.0 0.0 3.0 5.0 2.0 0.0 6.0 / 若是采用稀疏矩阵存储的话,其存储信息包括: 实际存储值: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]`, 矩阵元素对应的行索引:rowIndices=[0, 2, 1, 0, 1, 2]` 列起始位置索引: `colPointers=[0, 2, 3, 6]`.则生成稀疏矩阵的方式为:
scala> val sparseMatrix= Matrices.sparse(3, 3, Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2), Array(1.0, 2.0, 3.0,4.0,5.0,6.0)) sparseMatrix: org.apache.spark.mllib.linalg.Matrix = 3 x 3 CSCMatrix (0,0) 1.0 (2,0) 2.0 (1,1) 3.0 (0,2) 4.0 (1,2) 5.0 (2,2) 6.0
关于更多类型介绍参考:
http://spark.apache.org/docs/1.6.1/mllib-data-types.html#local-matrix
不管是在即时的探索中,仍是在机器学习的数据理解中,基本的统计都是数据分析的重要部分。MLlib 经过mllib.stat.Statistics 类中的方法提供了几种普遍使用的统计函数,这些函数能够直接在RDD 上使用。一些经常使用的函数以下所列。
计算由向量组成的RDD 的统计性综述,保存着向量集合中每列的最小值、最大值、平均值和方差。这能够用来在一次执行中获取丰富的统计信息。
计算由向量组成的RDD 中的列间的相关矩阵,使用皮尔森相关(Pearson correlation)或斯皮尔曼相关(Spearman correlation)中的一种(method 必须是pearson 或spearman中的一个)。
计算两个由浮点值组成的RDD 的相关矩阵,使用皮尔森相关或斯皮尔曼相关中的一种(method 必须是pearson 或spearman 中的一个)。
计算由LabeledPoint 对象组成的RDD 中每一个特征与标签的皮尔森独立性测试
(Pearson’s independence test) 结果。返回一个ChiSqTestResult 对象, 其中有p 值
(p-value)、测试统计及每一个特征的自由度。标签和特征值必须是分类的(即离散值)。构建测试数据,
下面举个例子:使用三个学生的成绩Vector来构建所需的RDD Vector,这个矩阵里的每一个Vector都表明一个学生在四门课程里的分数:
Python环境下:
from pyspark.mllib.stat import Statistics from pyspark.mllib.linalg import Vectors //构建RDD basicTestRDD = sc.parallelize([Vectors.dense([60, 70, 80, 0]), Vectors.dense([80, 50, 0, 90]), Vectors.dense([60, 70, 80, 0])])
//以查看下summary里的成员,这个对象中包含了大量的统计内容 >>> print summary.mean() [ 66.66666667 63.33333333 53.33333333 30. ] >>> print summary.variance() [ 133.33333333 133.33333333 2133.33333333 2700. ] >>> print summary.numNonzeros() [ 3. 3. 2. 1.]Scala环境:
import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.rdd.RDD val array1: Array[Double] = Array[Double](60, 70, 80, 0) val array2: Array[Double] = Array[Double](80, 50, 0, 90) val array3: Array[Double] = Array[Double](60, 70, 80, 0) val denseArray1 = Vectors.dense(array1) val denseArray2 = Vectors.dense(array2) val denseArray3 = Vectors.dense(array3) val seqDenseArray: Seq[Vector] = Seq(denseArray1, denseArray2, denseArray3) val basicTestRDD: RDD[Vector] = sc.parallelize[Vector](seqDenseArray) val summary: MultivariateStatisticalSummary = Statistics.colStats(basicTestRDD)
关于更多的统计能够参考:http://spark.apache.org/docs/1.6.1/mllib-statistics.html
Python环境下:
# 读取数据文件,建立RDD dataFile = "/usr/local/spark-1.6.3-bin-hadoop2.6/data/mllib/kmeans_data.txt" lines = sc.textFile(dataFile) # 建立Vector,将每行的数据用空格分隔后转成浮点值返回numpy的array data = lines.map(lambda line: np.array([float(x) for x in line.split(' ')])) # 其中2是簇的个数 model = KMeans.train(data, 2) print("Final centers: " + str(model.clusterCenters)) print("Total Cost: " + str(model.computeCost(data)))
实验的数据咱们直接使用官方提供的数据
/usr/local/spark-1.6.3-bin-hadoop2.6/data/mllib/sample_libsvm_data.txt
。
# 加载模块 from pyspark.mllib.util import MLUtils from pyspark.mllib.classification import SVMWithSGD # 读取数据 dataFile = '/usr/local/spark-1.6.3-bin-hadoop2.6/data/mllib/sample_libsvm_data.txt' data = MLUtils.loadLibSVMFile(sc, dataFile) splits = data.randomSplit([0.8, 0.2], seed = 9L) training = splits[0].cache() test = splits[1] # 打印分割后的数据量 print "TrainingCount:[%d]" % training.count(); print "TestingCount:[%d]" % test.count(); model = SVMWithSGD.train(training, 100) scoreAndLabels = test.map(lambda point : (model.predict(point.features), point.label)) #输出结果,包含预测的数字结果和0/1结果: for score, label in scoreAndLabels.collect(): print score, label
参考资料: