MLlib支持存储在单机上的local vectors和metrices,也支持分布式的matrics(背后经过一或多个RDD实现)。
local vectors和local matrices都是简单数据类型,做为公共接口使用。
底层的线性算法操做则由Breeze和jblas来实现。MLlib中,监督学习的一个训练样本,被称为“labeled point”。html
存储在单机上的local vector,由一个整数类型的从0开始的索引(indice),double类型的值(value)组成。MLlib支持两种类型的local vectors: dense和sparse。dense vector 背后经过一个double array来表示它的条目值,而sparse vector则由两个并列数组实现:索引(indices)和值(values)。例如,一个vector(1.0, 0.0, 3.0),能够表示成dense格式:[1.0, 0.0, 3.0],也能够表示成sparse格式:(3, [0, 2], [1.0, 3.0]),其中,3就是vector的size。算法
import org.apache.spark.mllib.linalg.{Vector, Vectors} // Create a dense vector (1.0, 0.0, 3.0). val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) // Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries. val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) // Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries. val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
local matrix由整数型的行索引、列索引(indices),以及浮点型的值(values)组成,存储在单机上。MLlib支持dense matrices,它的条目值存储在单个double array上,以列为主(column-major)的顺序。而sparse matrices,它是非零条目值以压缩稀疏列(CSC: Compressed Sparse Column)的格式存储,以列为主(column-major)的顺序。apache
local matrices的基类是Matrix,它提供了两种实现:DenseMatrix和SparseMatrix. 咱们推荐使用Matrices的工厂方法来建立local matrices。记住,MLlib的local matrices被存成以列为主的顺序。后端
import org.apache.spark.mllib.linalg.{Matrix, Matrices} // Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) // https://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.mllib.linalg.Matrices$ // Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) // colPtrs: Array[Int], rowIndices: Array[Int] val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
distributed matrix由long型的行索引和列索引,以及double型的值组成,以一或多个RDD的方式分布式存储。选择合适的格式来存储分布式大矩阵至关重要。将一个分布式矩阵转换成一个不一样的格式,可能须要一个全局的shuffle,计算代价高昂!至今支持三种类型的分布式矩阵。api
基本类型被称为RowMatrix。数组
一个RowMatrix是以行为主的分布式矩阵,它没有行索引,只是一个特征向量的集合。背后由一个包含这些行的RDD实现,其中,每一个行(row)都是一个local vector。咱们假设,对于一个RowMatrix,列的数目并不大,于是,单个local vector能够合理地与driver进行通讯,也可使用单个节点被存储/操做。app
一个IndexedRowMatrix与一个RowMatrix类似,但有行索引,它能够被用来标识出行(rows)以及正在执行的join操做(executing joins)。分布式
一个CoordinateMatrix是一个分布式矩阵,以coordinate list(COO)的格式进行存储,后端由一个包含这些条目的RDD实现。学习
注意:spa
一个分布式矩阵的底层RDD实现必须是肯定的(deterministic),由于咱们会对matrix size进行cache。总之,使用非肯定的RDD会致使errors。
rowMatrix是面向row的分布式矩阵,没有行索引,背后由一个包含这些行的RDD实现,基中,每一个行是一个local vector。由于每一个row都被表示成一个local vector,列的数目被限制在一个整数范围内,实际使用时会更小。
RowMatrix能够经过一个RDD[Vector]实例被建立。接着,咱们能够计算它的列概括统计(column summary statistics),以及分解(decompositions)。QR deceompsition的格式:A=QR,其中Q是一个正交矩阵(orthogonal matrix),R是一个上三角矩阵(upper triangular matrix)。对于SVD和PCA,请参考降维这一节。
import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.distributed.RowMatrix val rows: RDD[Vector] = ... // an RDD of local vectors // Create a RowMatrix from an RDD[Vector]. val mat: RowMatrix = new RowMatrix(rows) // Get its size. val m = mat.numRows() val n = mat.numCols() // QR decomposition val qrResult = mat.tallSkinnyQR(true)
IndexedRowMatrix与RowMatrix类似,但有行索引。它背后由一个带索引的行的RDD实现,所以,每一个行能够被表示成long型索引和local vector。
一个IndexedRowMatrix由RDD[IndexedRow]实例实现,其中,IndexedRow是一个在(Long,Vector)上的封装wrapper。IndexedRowMatrix能够被转换成一个RowMatrix,经过drop掉它的行索引来完成。
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix} val rows: RDD[IndexedRow] = ... // an RDD of indexed rows // Create an IndexedRowMatrix from an RDD[IndexedRow]. val mat: IndexedRowMatrix = new IndexedRowMatrix(rows) // Get its size. val m = mat.numRows() val n = mat.numCols() // Drop its row indices. val rowMat: RowMatrix = mat.toRowMatrix()
CoordinateMatrix是一个分布式矩阵,背后由一个包含这些条目(entries)的RDD实现。每一个条目(entry)是一个三元组(tuple):(i: Long, j: Long, value: Double), 其中: i是行索引,j是列索引,value是entry value。当矩阵的维度很大,而且很稀疏时,推荐使用CoordinateMatrix。
CoordinateMatrix能够经过一个RDD[MatrixEntry]实例来建立,其中MatrixEntry是一个(Long,Long,Double)的Wrapper。经过调用toIndexedRowMatrix,一个CoordinateMatrix能够被转化成一个带有稀疏行的IndexedRowMatrix。CoordinateMatrix的其它计算目前不支持。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries // Create a CoordinateMatrix from an RDD[MatrixEntry]. val mat: CoordinateMatrix = new CoordinateMatrix(entries) // Get its size. val m = mat.numRows() val n = mat.numCols() // Convert it to an IndexRowMatrix whose rows are sparse vectors. val indexedRowMatrix = mat.toIndexedRowMatrix()
BlockMatrix是一个分布式矩阵,背后由一个MatrixBlocks的RDD实现,其中MatrixBlock是一个tuple: ((Int,Int),Matrix),其中(Int,Int)是block的索引,Matrix是由rowsPerBlock x colsPerBlock的sub-matrix。BlockMatrix支持方法:add和multiply。BlockMatrix也有一个helper function:validate,它能够被用于确认BlockMatrix的设置是否正确。
BlockMatrix 能够由一个IndexedRowMatrix或CoordinateMatrix,经过调用toBlockMatrix很容易地建立。toBlockMatrix缺省会建立1024x1024的blocks。能够经过toBlockMatrix(rowsPerBlock, colsPerBlock)进行修改。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries // Create a CoordinateMatrix from an RDD[MatrixEntry]. val coordMat: CoordinateMatrix = new CoordinateMatrix(entries) // Transform the CoordinateMatrix to a BlockMatrix val matA: BlockMatrix = coordMat.toBlockMatrix().cache() // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid. // Nothing happens if it is valid. matA.validate() // Calculate A^T A. val ata = matA.transpose.multiply(matA)