Spark 云计算 ML 机器学习教程 以及 SPARK使用教程


spark Core的使用基础知识java

     rdd为spark的一个分布式数据源的计算的抽象sql

     sparkContext为spark环境上下文用于保持集群链接,建立RDD 并行数据 accumular boardcast变量 用户建立spark job做业apache

    

SparkConf conf = new SparkConf().setAppName("indeximage").setMaster("local");
  JavaSparkContext context = new JavaSparkContext(conf);
  JavaPairRDD<String, PortableDataStream> imagefiles=context.binaryFiles("C:/baidu/features/sift", 2);

  RDD是个分布式不变的抽象数据计算源 ,被划分多个分区,并在多台机器上分布计算,懒加载模式,当rdd开始计算时候才会加载来源使用,因此使用时候rdd 能够设置store级别,能够存储在内存 磁盘 或者 系列化 等几种方式 默认cache就是存储在内存中 其余几种见storagelevel数组

JavaRDD<File> imagefiles=context.parallelize(getFiles("C:/baidu/imagetest/"), 2);//.binaryFiles("C:/baidu/imagetest/*").coalesce(1);
  imagefiles.persist(StorageLevel.MEMORY_ONLY());

它可以从多种数据来源加载 经常使用的数组 文件 以及 hadoop格式文件(能够用来自定义后面叫你们建立自定义)dom

context.textFile(path)
机器学习

context.paralllize/binarryFile分布式


spark有不少行为操做,这里不一一介绍了 只介绍 几种经常使用的 ,好比ide

 1,map  map其实就是一个转换 从一个格式 转换从 另外一种格式的操做 oop

 2,flatMap 同map基本一致,可是flatmap 返回的是一个数组的格式 学习

 3.filter 主要是用来过滤掉rdd中 的数据

 另外javaRDD 与 javaPairRDD 基本无非就是List 与 map的区别,分布式上 key value的转换

因此会javaPairRDD会涉及一些group by key combie key value的操做

 因为就是key value  map能够至关于list 因此我这里只讲一下List的状况,其余都是相似的方法加上byKey values等等关键字

 

distinct()  同sql 同样 去重 
union()  由于rdd是不可变 因此外部添加联合
intersection() 交集    
subtract() 差集
cartesian() 笛卡尔积
reduce() reduce运算 把多个rdd的的元素最后合并成一个值 
aggregate()/fold() 同上面相似,不过能够设定初值 以及 聚合后的值类型能够与合并前的不同,好比多个integer 聚合成double
collect()  把RDD中全部元素 最后聚合汇总到主分区中 返回List 或者 map
foreach 故名思意 遍历 rdd的元素 并进行操做,好比遍历元素把它存储到Hbase中
cogroup、join 前者就是全链接 后者 就是内关联
count 同sql同样统计数目
coalesce、repartition 意义同样 都是设定RDD分区数目

Spark 计数器 同hadoop中的couter差很少 ,不过主要是外部最后使用 ,内部中不能使用实时的值,智能使用localValue

Accumulator<Integer> counter = sc.accumulator(0);
counter.add(1);

spark 共享数据变量

不少状况须要数据变量进行共享

Broadcast<Object> share = sc.broadcast(object)
发送广播通知 会实时更新,在内部能够获取调用

Spark的MLIB的使用基础知识

本地向量

        本地向量的基类是 Vector,咱们提供了两个实现 DenseVector 和 SparseVector。咱们建议经过 Vectors中实现的工厂方法来建立本地向量:(注意:Scala语言默认引入的是 scala.collection.immutable.Vector,为了使用MLlib的Vector,你必须显示引入org.apache.spark.mllib.linalg.Vector。)

import org.apache.spark.mllib.linalg.{Vector, Vectors}
 
// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
 
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values
corresponding to nonzero entries.
 
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
 
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

2.含类标签的点


含有类标签的点经过case class LabeledPoint来表示。

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
 
// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
 
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))



3.稀疏数据Sparse data


        实际运用中,稀疏数据是很常见的。MLlib能够读取以LIBSVM格式存储的训练实例,LIBSVM格式是 LIBSVM 和 LIBLINEAR的默认格式,这是一种文本格式,每行表明一个含类标签的稀疏特征向量。格式以下:


label index1:value1 index2:value2 ...


        索引是从 1 开始而且递增。加载完成后,索引被转换为从 0 开始。


        经过 MLUtils.loadLibSVMFile读取训练实例并以LIBSVM 格式存储。

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
 
val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")



4.本地矩阵


        一个本地矩阵由整型的行列索引数据和对应的 double 型值数据组成,存储在某一个机器中。MLlib 支持密集矩阵(暂无稀疏矩阵!),实体值以列优先的方式存储在一个 double数组中。


        本 地 矩 阵 的 基 类 是 Matrix , 我 们 提 供 了 一 个 实 现 DenseMatrix 。 我 们 建 议 经过 Matrices 中实现的工厂方法来建立本地矩阵:


import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
 
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))



5.分布式矩阵


        一个分布式矩阵由 long 型行列索引数据和对应的 double 型值数据组成,分布式存储在一个或多个 RDD 中。对于巨大的分布式的矩阵来讲,选择正确的存储格式很是重要。将一个分布式矩阵转换为另外一个不一样格式须要全局洗牌(shuffle),因此代价很高。目前,实现了三类分布式矩阵存储格式。最基本的类型是 RowMatrix。一个 RowMatrix 是一个面向行的分布式矩阵,其行索引是没有具体含义的。好比一系列特征向量的一个集合。经过一个 RDD 来表明全部的行,每一行就是一个本地向量。对于 RowMatrix,咱们假定其列数量并不巨大,因此一个本地向量能够恰当的与驱动节点(driver)交换信息,而且可以在某一节点中存储和操做。


         IndexedRowMatrix 与 RowMatrix 类似,但有行索引,能够用来识别行和进行 join 操做。而 CoordinateMatrix 是一个以三元组列表格式(coordinate list ,COO)存储的分布式矩阵,其实体集合是一个 RDD。注 意 : 因 为 我 们 需 要 缓 存 矩 阵 大 小 , 分 布 式 矩 阵 的 底 层 RDD 必 须 是 确 定 的(deterministic)。一般来讲,使用非肯定的 RDD(non-deterministic RDDs)会致使错误。



5.1 面向行的分布式矩阵(RowMatrix)


        一个 RowMatrix 是一个面向行的分布式矩阵,其行索引是没有具体含义的。好比一系列特征向量的一个集合。经过一个 RDD 来表明全部的行,每一行就是一个本地向量。既然每一行由一个本地向量表示,因此其列数就被整型数据大小所限制,其实实践中列数是一个很小的数值。


       一个 RowMatrix可从一个RDD[Vector]实例建立。而后咱们能够计算出其概要统计信息。


import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
 
val rows: RDD[Vector] = ... // an RDD of local vectors
 
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)
 
// Get its size.
val m = mat.numRows()
val n = mat.numCols()


5.2行索引矩阵(IndexedRowMatrix)


         IndexedRowMatrix 与 RowMatrix 类似,但其行索引具备特定含义,本质上是一个含有索引信息的行数据集合(an RDD of indexed rows)。每一行由 long 型索引和一个本地向量组成。一个 IndexedRowMatrix可从一个RDD[IndexedRow]实例建立,这里的 IndexedRow是 (Long, Vector) 的 封 装 类 。 剔 除 IndexedRowMatrix 中 的 行 索 引 信 息 就 变 成 一 个RowMatrix。


import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
 
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
 
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
 
// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix(

)



5.3三元组矩阵(CoordinateMatrix)


          一个 CoordinateMatrix 是一个分布式矩阵,其实体集合是一个 RDD。每个实体是一个(i: Long, j: Long, value: Double)三元组,其中 i 表明行索引,j 表明列索引,value 表明实体的值。只有当矩阵的行和列都很巨大,而且矩阵很稀疏时才使用 CoordinateMatrix。


          一个 CoordinateMatrix可从一个RDD[MatrixEntry]实例建立,这里的 MatrixEntry是 (Long, Long, Double) 的 封 装 类 。 通 过 调 用 toIndexedRowMatrix 可 以 将 一 个CoordinateMatrix转变为一个IndexedRowMatrix(但其行是稀疏的)。目前暂不支持其余计算操做。

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
 
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)
 
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
 
// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()


Spark MLIB测试基本使用 




理解了vector 就是double[] 而后咱们就好办了 只须要double[] 转为vector 或者加上标签的labelPoint 带入spark 的Mlib包的机器学习的类 就好了 下面咱们看看kmean聚类使用。其余相似

 

static{
		System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
	}
	final static FeatureDetector detector = FeatureDetector.create(FeatureDetector.ORB);//ORB
	final static DescriptorExtractor extractor = DescriptorExtractor.create(DescriptorExtractor.ORB);	//BRIEF	
public static List<double[]> readFeatureByStream(DataInputStream open) throws Exception {
		MatOfKeyPoint keypoints=new MatOfKeyPoint();
		
		Mat mat=OpenCVUtil.bufferedImageToMat(ImageIO.read(open));
		Mat descriptors=new Mat();
		detector.detect(mat, keypoints);
//		List<KeyPoint> referenceKeypointsList =
//	            keypoints.toList();
        extractor.compute(mat, keypoints, descriptors);
        int numPoints = (int) keypoints.rows();
        int descrpnum=(int) descriptors.rows();

       // double[][] descriptions = new double[numPoints][descrpnum];
        List<double[]> descriptions=Lists.newArrayList();
        System.out.println(numPoints+"=============="+descrpnum+"=================="+descriptors.rows()+"=================="+descriptors.cols());
        for (int i = 0; i < descriptors.rows(); i++) {
	    	int cols=descriptors.cols();
	    	double[] desc=new double[cols];
	    	for (int j = 0; j < cols; j++) {
				desc[j]=descriptors.get(i, j)[0];
			}
	    	//descriptions[i]=desc;
	    	descriptions.add(desc);
		}
		
		
		return descriptions;
	}
 
  SparkConf conf = new SparkConf().setAppName("indeximage").setMaster("local");
  
   
  JavaSparkContext context = new JavaSparkContext(conf);
  JavaRDD<File> imagefiles=context.parallelize(getFiles("C:/baidu/imagetest/"), 2);//.binaryFiles("C:/baidu/imagetest/*").coalesce(1);
  imagefiles.persist(StorageLevel.MEMORY_ONLY());
  JavaRDD<Vector> vectors=imagefiles.map(new Function<File, List<Vector>>() {
@Override
public List<Vector> call(File v1)
throws Exception {
try{
List<Vector> sample=Lists.newArrayList();
final List<double[]> fkeys =readFeatureByFile(v1);
final int[] indices = RandomData.getUniqueRandomInts((int) (fkeys.size() * 0.1f), 0,
fkeys.size());
for (int i : indices) {
sample.add(Vectors.dense(fkeys.get(i)));
}
return sample;
}catch(Exception e){
e.printStackTrace();
return null;
}
}
}).flatMap(new FlatMapFunction<List<Vector>, Vector>() { //这里多余的其实不须要这样写,我为了演示flatMap的用法
@Override
public Iterable<Vector> call(
List<Vector> t) throws Exception {
return t;
}
});//.repartition(1);
    vectors.persist(StorageLevel.MEMORY_ONLY());
int numClusters = 64;
    int numIterations = 1000;
    long startTime = System.nanoTime();
//    BisectingKMeans kmeans=new BisectingKMeans();
//    kmeans.setK(64);
//    kmeans.setMaxIterations(100);
//    kmeans.setMinDivisibleClusterSize(1.0);
//    BisectingKMeansModel clusters=kmeans.run(vectors.rdd());
    KMeansModel clusters = KMeans.train(vectors.rdd(), numClusters, numIterations);
    //KMeansModel clusters = KMeans.train(vectors.rdd(), numClusters, numIterations);
    System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
    double WSSSE = clusters.computeCost(vectors.rdd());
    long endTime = System.nanoTime();
    System.out.println("Execution Time: " + (endTime - startTime)/1000000 + " ms");
    System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$聚类成功¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥¥"+clusters.k()+"=============");
Vector[] vs=clusters.clusterCenters();

   下面运行结果展现 

这是我本地刚刚运行的结果。,。。spark很方便 跟storm 同样 本地 调试很是方便快捷。

    

相关文章
相关标签/搜索