Spark原理及Java操做

Spark与MapReduce

Spark 运算比 Hadoop 的 MapReduce 框架快的缘由是由于 Hadoop 在一次 MapReduce 运算以后,会将数据的运算结果从内存写入到磁盘中,第二次 Mapredue 运算时在从磁盘中读取数据,因此其瓶颈在2次运算间的多余 IO 消耗. Spark 则是将数据一直缓存在内存中,直到计算获得最后的结果,再将结果写入到磁盘,因此屡次运算的状况下, Spark 是比较快的. 其优化了迭代式工做负载。html

网上看到的图片,抄过来的:java

Spark的Master节点与Worker节点

  • Master

在集群中master节点上运行着master进程以及Driver,master进程将用户提交的job转为可并行执行的任务集Tasks。并将这些Tasks分配给集群上的Worker,Driver进程负责执行用户编写的application。算法

  • Worker

在集群中,除了master节点负责管理集群的调度之外,还有worker,worker中存在executer进程,每一个worker能够存在一个或多个executer进程,每一个executer都拥有一个线程池,线程池中的每一个线程都负责一个Task的执行。 每一个executer可并行执行的Task数量是根据cpu core来的。executer拥有多少cpu core就能同时并行执行多少Task。数组

  • Master挂了怎么办?

若是master挂了想恢复集群的正常运行,须要依靠zookeeper,zookeeper记录了集群中全部的worker、driver、application。当master挂掉后,zookeeper根据自己的选举算法,在集群中选出一个worker做为新的master。在恢复master的这段时间内,用户没法提交新的job。缓存

  • 某个worker挂了怎么办?

在集群中若是某个正在执行task任务的worker挂掉了,master会从新把该worker负责的task分配给其余worker。在worker挂掉的这段时间内,若是worker长时间(默认是60s)没有上报master心跳。master则会将该worker从集群中移除,并标识DEADapp

Spark RDD

RDD(Resilient Distributed Dataset): 一个可并行操做的有容错机制的数据集合,有 2 种方式建立 RDDs:第一种是在你的驱动程序中并行化一个已经存在的集合;另一种是引用一个外部存储系统的数据集,例如共享的文件系统,HDFS,HBase或其余 Hadoop 数据格式的数据源。框架

以上描述在实操中会更好理解。在未理解它以前把它看作是一种数据集合便可。jvm

RDD 特色

  • 它是在集群节点上的不可变的、已分区的集合对象;
  • 经过并行转换的方式来建立(如 Map、 filter、join 等);
  • 失败自动重建;
  • 能够控制存储级别(内存、磁盘等)来进行重用;
  • 必须是可序列化的;
  • 是静态类型的(只读)。

RDD的操做函数主要分为两种Transformation和Action

Transformation: 返回值为RDD,不会立刻提交任务去集群执行函数

Action: 返回值不是RDD,造成DAG图后,将任务提交到集群执行,并返回结果。oop

使用RDD操做数据

统计某个文件的总字数:

// 设置appname和master,这里设置了local[2]的意思是在本地以2个CPU核心执行这个任务。或者使用:"spark://master_hostname:7077"指定任务到远程的机器执行。
SparkConf sparkConf = new SparkConf().setAppName("firstDemo").setMaster("local[2]");
// 建立SoarkContext对象
JavaSparkContext sc = new JavaSparkContext(sparkConf);

// 读取文件
final JavaRDD<String> distFile = sc.textFile("/usr/test/test.txt");
// 统计字数
Integer wordCount = distFile.map(String::length).reduce((a, b) -> a + b)

建立RDD的两种方式:

第一种,使用RDD并行化一个已有的集合:

List<Integer> integers = Arrays.asList(1, 1, 2, 2, 3, 4, 5);
// 把数据并行化为RDD
JavaRDD<Integer> parallelize = sc.parallelize(integers);
List<Integer> collect = parallelize.distinct().collect();

第二种,使用外部数据建立RDD。好比本地磁盘、HDFS、HBase等

// Spark能够根据本地文件系统、HDFS、Hbase等做为数据源,而后进行操做。
// 指定不一样的数据源,只须要有对应的uri便可好比hdfs的:"hdfs://"
JavaRDD<String> distFile = sc.textFile("/usr/test/test.txt");
Integer wordCount = distFile.map(String::length).reduce((a, b) -> a + b);

Spark的懒加载

spark的懒加载与scala的lazy val是有关系的。scala Lazy vals介绍

咱们把一个文件读取出来:

JavaRDD<String> lazyLoad = sc.textFile("/usr/test/test.txt");

按照通常程序的执行流程,执行这种操做spark会立马把数据从磁盘中读取出来放到内存中。可是事实却不是这样的,这里的操做,只是把地址映射了起来,并无去把它加载到内存中去。

对数据进行map操做,转换为另一个RDD:

JavaRDD<Integer> lineLengths = lazyLoad.map(String::length)

这里的操做也没有真正去执行,只是定义了把JavaRDD<String>转换为JavaRDD<Integer>的操做。

最后统计出test.txt的字数:

Integer count = lineLengths.reduce((a, b) -> a + b)

该操做是真正的计算,当spark的操做到这步时,才会真正将计算分解在集群中的机器上运行。

将RDD操做保存到内存中,供下次使用:

lineLengths.persist(StorageLevel.MEMORY_ONLY())

Spark 内存分配及缓存机制

spark支持将某次的RDD操做保存到内存中,以便以后其余操做复用该RDD的数据。这样使得以后的操做更快,由于复用的数据不须要从新计算,直接从缓存中取便可。若是在内存分区中,缓存的RDD数据丢失,spark会执行RDD从新计算,并放到缓存中。

当咱们在代码中执行了cache/persist等持久化操做时,spark会根据咱们设置的缓存级别不一样,每一个task计算出来的数据会保存到task所在节点的内存或磁盘中。

主要分为三块:

  1. task在执行咱们写的代码时占用到的内存,默认占总内存的20%
  2. Task经过shuffle过程拉取了上一个stage的Task的输出后,进行聚合等操做时使用的内存,默认也是占总内存的20%
  3. RDD持久化使用到的内存总共占60%
  • spark RDD一共有如下几种缓存级别
Storage Level description
MEMORY_ONLY 默认级别,将RDD操做做为序列化的java对象存储在jvm中,若是内存放不下所有的RDD操做。那么没法缓存的RDD操做在下次须要时再从新计算,而已经缓存的部分就直接使用。该级别只使用内存,不使用磁盘。效率很是高。
MEMORY_AND_DISK 将RDD操做做为序列化的java对象存储在jvm中,若是内存放不下所有的RDD操做。那么没法缓存的RDD操做会持久化到磁盘上,并在须要时从磁盘中取出来。该级别须要使用的内存和磁盘。效率中等
MEMORY_ONLY_SER 将RDD存储为序列化Java对象(每一个分区一个字节的数组),与反序列化对象相比,它更节省空间,特别是当它使用快速序列化器时。但它增长了CPU的开销。在此级别中,存储空间较小,CPU计算时间较长,数据存储在内存中。它不使用磁盘。
MEMORY_AND_DISK_SER MEMORY_ONLY_SER,但它会将不适合内存的分区丢弃到磁盘,而不是每次须要时从新计算。在此存储级别中,用于存储的空间较低,CPU计算时间较长,它使用内存和磁盘存储。
DISK_ONLY 在此存储级别中,RDD仅存储在磁盘上。用于存储的空间很小,CPU计算时间很长,而且它利用磁盘存储。
相关文章
相关标签/搜索