Spark 运算比 Hadoop 的 MapReduce 框架快的缘由是由于 Hadoop 在一次 MapReduce 运算以后,会将数据的运算结果从内存写入到磁盘中,第二次 Mapredue 运算时在从磁盘中读取数据,因此其瓶颈在2次运算间的多余 IO 消耗. Spark 则是将数据一直缓存在内存中,直到计算获得最后的结果,再将结果写入到磁盘,因此屡次运算的状况下, Spark 是比较快的. 其优化了迭代式工做负载。html
网上看到的图片,抄过来的:java
在集群中master节点上运行着master进程以及Driver,master进程将用户提交的job转为可并行执行的任务集Tasks。并将这些Tasks分配给集群上的Worker,Driver进程负责执行用户编写的application。算法
在集群中,除了master节点负责管理集群的调度之外,还有worker,worker中存在executer进程,每一个worker能够存在一个或多个executer进程,每一个executer都拥有一个线程池,线程池中的每一个线程都负责一个Task的执行。 每一个executer可并行执行的Task数量是根据cpu core来的。executer拥有多少cpu core就能同时并行执行多少Task。数组
若是master挂了想恢复集群的正常运行,须要依靠zookeeper,zookeeper记录了集群中全部的worker、driver、application。当master挂掉后,zookeeper根据自己的选举算法,在集群中选出一个worker做为新的master。在恢复master的这段时间内,用户没法提交新的job。缓存
在集群中若是某个正在执行task任务的worker挂掉了,master会从新把该worker负责的task分配给其余worker。在worker挂掉的这段时间内,若是worker长时间(默认是60s)没有上报master心跳。master则会将该worker从集群中移除,并标识DEADapp
RDD(Resilient Distributed Dataset): 一个可并行操做的有容错机制的数据集合,有 2 种方式建立 RDDs:第一种是在你的驱动程序中并行化一个已经存在的集合;另一种是引用一个外部存储系统的数据集,例如共享的文件系统,HDFS,HBase或其余 Hadoop 数据格式的数据源。框架
以上描述在实操中会更好理解。在未理解它以前把它看作是一种数据集合便可。jvm
Transformation: 返回值为RDD,不会立刻提交任务去集群执行函数
Action: 返回值不是RDD,造成DAG图后,将任务提交到集群执行,并返回结果。oop
统计某个文件的总字数:
// 设置appname和master,这里设置了local[2]的意思是在本地以2个CPU核心执行这个任务。或者使用:"spark://master_hostname:7077"指定任务到远程的机器执行。 SparkConf sparkConf = new SparkConf().setAppName("firstDemo").setMaster("local[2]"); // 建立SoarkContext对象 JavaSparkContext sc = new JavaSparkContext(sparkConf); // 读取文件 final JavaRDD<String> distFile = sc.textFile("/usr/test/test.txt"); // 统计字数 Integer wordCount = distFile.map(String::length).reduce((a, b) -> a + b)
第一种,使用RDD并行化一个已有的集合:
List<Integer> integers = Arrays.asList(1, 1, 2, 2, 3, 4, 5); // 把数据并行化为RDD JavaRDD<Integer> parallelize = sc.parallelize(integers); List<Integer> collect = parallelize.distinct().collect();
第二种,使用外部数据建立RDD。好比本地磁盘、HDFS、HBase等
// Spark能够根据本地文件系统、HDFS、Hbase等做为数据源,而后进行操做。 // 指定不一样的数据源,只须要有对应的uri便可好比hdfs的:"hdfs://" JavaRDD<String> distFile = sc.textFile("/usr/test/test.txt"); Integer wordCount = distFile.map(String::length).reduce((a, b) -> a + b);
spark的懒加载与scala的lazy val是有关系的。scala Lazy vals介绍
咱们把一个文件读取出来:
JavaRDD<String> lazyLoad = sc.textFile("/usr/test/test.txt");
按照通常程序的执行流程,执行这种操做spark会立马把数据从磁盘中读取出来放到内存中。可是事实却不是这样的,这里的操做,只是把地址映射了起来,并无去把它加载到内存中去。
对数据进行map操做,转换为另一个RDD:
JavaRDD<Integer> lineLengths = lazyLoad.map(String::length)
这里的操做也没有真正去执行,只是定义了把
JavaRDD<String>
转换为JavaRDD<Integer>
的操做。
最后统计出test.txt的字数:
Integer count = lineLengths.reduce((a, b) -> a + b)
该操做是真正的计算,当spark的操做到这步时,才会真正将计算分解在集群中的机器上运行。
将RDD操做保存到内存中,供下次使用:
lineLengths.persist(StorageLevel.MEMORY_ONLY())
spark支持将某次的RDD操做保存到内存中,以便以后其余操做复用该RDD的数据。这样使得以后的操做更快,由于复用的数据不须要从新计算,直接从缓存中取便可。若是在内存分区中,缓存的RDD数据丢失,spark会执行RDD从新计算,并放到缓存中。
当咱们在代码中执行了cache/persist等持久化操做时,spark会根据咱们设置的缓存级别不一样,每一个task计算出来的数据会保存到task所在节点的内存或磁盘中。
主要分为三块:
Storage Level | description |
---|---|
MEMORY_ONLY | 默认级别,将RDD操做做为序列化的java对象存储在jvm中,若是内存放不下所有的RDD操做。那么没法缓存的RDD操做在下次须要时再从新计算,而已经缓存的部分就直接使用。该级别只使用内存,不使用磁盘。效率很是高。 |
MEMORY_AND_DISK | 将RDD操做做为序列化的java对象存储在jvm中,若是内存放不下所有的RDD操做。那么没法缓存的RDD操做会持久化到磁盘上,并在须要时从磁盘中取出来。该级别须要使用的内存和磁盘。效率中等 |
MEMORY_ONLY_SER | 将RDD存储为序列化Java对象(每一个分区一个字节的数组),与反序列化对象相比,它更节省空间,特别是当它使用快速序列化器时。但它增长了CPU的开销。在此级别中,存储空间较小,CPU计算时间较长,数据存储在内存中。它不使用磁盘。 |
MEMORY_AND_DISK_SER | MEMORY_ONLY_SER,但它会将不适合内存的分区丢弃到磁盘,而不是每次须要时从新计算。在此存储级别中,用于存储的空间较低,CPU计算时间较长,它使用内存和磁盘存储。 |
DISK_ONLY | 在此存储级别中,RDD仅存储在磁盘上。用于存储的空间很小,CPU计算时间很长,而且它利用磁盘存储。 |