Spark学习笔记java
Spark简介node
spark 能够很容易和yarn结合,直接调用HDFS、Hbase上面的数据,和hadoop结合。配置很容易。算法
spark发展迅猛,框架比hadoop更加灵活实用。减小了延时处理,提升性能效率实用灵活性。也能够与hadoop切实相互结合。sql
spark核心部分分为RDD。Spark SQL、Spark Streaming、MLlib、GraphX、Spark R等核心组件解决了不少的大数据问题,其完美的框架日受欢迎。其相应的生态环境包括zepplin等可视化方面,正日益壮大。大型公司争相实用spark来代替原有hadoop上相应的功能模块。Spark读写过程不像hadoop溢出写入磁盘,都是基于内存,所以速度很快。另外DAG做业调度系统的宽窄依赖让Spark速度提升。数据库
Spark核心组成编程
1、RDD数组
是弹性分布式数据集,彻底弹性的,若是数据丢失一部分还能够重建。有自动容错、位置感知调度和可伸缩性,经过数据检查点和记录数据更新金象容错性检查。经过SparkContext.textFile()加载文件变成RDD,而后经过transformation构建新的RDD,经过action将RDD存储到外部系统。缓存
RDD使用延迟加载,也就是懒加载,只有当用到的时候才加载数据。若是加载存储全部的中间过程会浪费空间。所以要延迟加载。一旦spark看到整个变换链,他能够计算仅需的结果数据,若是下面的函数不须要数据那么数据也不会再加载。转换RDD是惰性的,只有在动做中才可使用它们。网络
Spark分为driver和executor,driver提交做业,executor是application早worknode上的进程,运行task,driver对应为sparkcontext。Spark的RDD操做有transformation、action。Transformation对RDD进行依赖包装,RDD所对应的依赖都进行DAG的构建并保存,在worknode挂掉以后除了经过备份恢复还能够经过元数据对其保存的依赖再计算一次获得。看成业提交也就是调用runJob时,spark会根据RDD构建DAG图,提交给DAGScheduler,这个DAGScheduler是在SparkContext建立时一同初始化的,他会对做业进行调度处理。当依赖图构建好之后,从action开始进行解析,每个操做做为一个task,每遇到shuffle就切割成为一个taskSet,并把数据输出到磁盘,若是不是shuffle数据还在内存中存储。就这样再往前推动,直到没有算子,而后运行从前面开始,若是没有action的算子在这里不会执行,直到遇到action为止才开始运行,这就造成了spark的懒加载,taskset提交给TaskSheduler生成TaskSetManager而且提交给Executor运行,运行结束后反馈给DAGScheduler完成一个taskSet,以后再提交下一个,当TaskSet运行失败时就返回DAGScheduler并从新再次建立。一个job里面可能有多个TaskSet,一个application可能包含多个job。app
二、Spark Streaming
经过对kafka数据读取,将Stream数据分红小的时间片断(几秒),以相似batch批处理的方式来处理这一部分小数据,每一个时间片生成一个RDD,有高效的容错性,对小批量数据能够兼容批量实时数据处理的逻辑算法,用一些历史数据和实时数据联合进行分析,好比分类算法等。也能够对小批量的stream进行mapreduce、join等操做,而保证其实时性。针对数据流时间要求不到毫秒级的工程性问题均可以。
Spark Streaming也有一个StreamingContext,其核心是DStream,是经过以组时间序列上的连续RDD来组成的,包含一个有Time做为key、RDD做为value的结构体,每个RDD都包含特定时间间隔的数据流,能够经过persist将其持久化。在接受不断的数据流后,在blockGenerator中维护一个队列,将流数据放到队列中,等处理时间间隔到来后将其中的全部数据合并成为一个RDD(这一间隔中的数据)。其做业提交和spark类似,只不过在提交时拿到DStream内部的RDD并产生Job提交,RDD在action触发以后,将job提交给jobManager中的JobQueue,又jobScheduler调度,JobScheduler将job提交到spark的job调度器,而后将job转换成为大量的任务分发给spark集群执行。Job从outputStream中生成的,而后触发反向回溯执行DStreamDAG。在流数据处理的过程当中,通常节点失效的处理比离线数据要复杂。Spark streamin在1.3以后能够周期性的将DStream写入HDFS,同时将offset也进行存储,避免写到zk。一旦主节点失效,会经过checkpoint的方式读取以前的数据。当worknode节点失效,若是HDFS或文件做为输入源那Spark会根据依赖关系从新计算数据,若是是基于Kafka、Flume等网络数据源spark会将手机的数据源在集群中的不一样节点进行备份,一旦有一个工做节点失效,系统可以根据另外一份还存在的数据从新计算,可是若是接受节点失效会丢失一部分数据,同时接受线程会在其余的节点上从新启动并接受数据。
三、Graphx
主要用于图的计算。核心算法有PageRank、SVD奇异矩阵、TriangleConut等。
四、Spark SQL
是Spark新推出的交互式大数据SQL技术。把sql语句翻译成Spark上的RDD操做能够支持Hive、Json等类型的数据。
五、Spark R
经过R语言调用spark,目前不会拥有像Scala或者java那样普遍的API,Spark经过RDD类提供Spark API,而且容许用户使用R交互式方式在集群中运行任务。同时集成了MLlib机器学习类库。
六、MLBase
从上到下包括了MLOptimizer(给使用者)、MLI(给算法使用者)、MLlib(给算法开发者)、Spark。也能够直接使用MLlib。ML Optimizer,一个优化机器学习选择更合适的算法和相关参数的模块,还有MLI进行特征抽取和高级ML编程 抽象算法实现API平台,MLlib分布式机器学习库,能够不断扩充算法。MLRuntime基于spark计算框架,将Spark的分布式计算应用到机器学习领域。MLBase提供了一个简单的声明方法指定机器学习任务,而且动态地选择最优的学习算法。
七、Tachyon
高容错的分布式文件系统。宣称其性能是HDFS的3000多倍。有相似java的接口,也实现了HDFS接口,因此Spark和MR程序不须要任何的修改就能够运行。目前支持HDFS、S3等。
八、Spark算子
一、Map。对原数据进行处理,相似于遍历操做,转换成MappedRDD,原分区不变。
二、flatMap。将原来的RDD中的每个元素经过函数转换成新的元素,将RDD的每一个集合中的元素合并成一个集合。好比一个元素里面多个list,经过这个函数都合并成一个大的list,最经典的就是wordcount中将每一行元素进行分词之后成为,经过flapMap变成一个个的单词,line.flapMap(_.split(“ ”)).map((_,1))若是经过map就会将一行的单词变成一个list。
三、mapPartitions。对每一个分区进行迭代,生成MapPartitionsRDD。
四、Union。是将两个RDD合并成一个。使用这个函数要保证两个RDD元素的数据类型相同,返回的RDD的数据类型和被合并的RDD数据类型相同。
五、Filter。其功能是对元素进行过滤,对每一个元素调用f函数,返回值为true的元素就保留在RDD中。
六、Distinct。对RDD中元素进行去重操做。
七、Subtract。对RDD1中取出RDD1与RDD2交集中的全部元素。
八、Sample。对RDD中的集合内元素进行采样,第一个参数withReplacement是true表示有放回取样,false表示无放回。第二个参数表示比例,第三个参数是随机种子。如data.sample(true, 0.3,new Random().nextInt())。
九、takeSample。和sample用法相同,只不第二个参数换成了个数。返回也不是RDD,而是collect。
十、Cache。将RDD缓存到内存中。至关于persist(MEMORY_ONLY)。能够经过参数设置缓存和运行内存之间的比例,若是数据量大于cache内存则会丢失。
十一、Persist。里面参数能够选择DISK_ONLY/MEMORY_ONLY/MEMORY_AND_DISK等,其中的MEMORY_AND_DISK当缓存空间满了后自动溢出到磁盘。
十二、MapValues。针对KV数据,对数据中的value进行map操做,而不对key进行处理。
1三、reduceByKey。针对KV数据将相同key的value聚合到一块儿。与groupByKey不一样,会进行一个相似mapreduce中的combine操做,减小相应的数据IO操做,加快效率。若是想进行一些非叠加操做,咱们能够将value组合成字符串或其余格式将相同key的value组合在一块儿,再经过迭代,组合的数据拆开操做。
1四、partitionBy。能够将RDD进行分区,从新生成一个ShuffleRDD,进行一个shuffle操做,对后面进行频繁的shuffle操做能够加快效率。
1五、randomSplit。对RDD进行随机切分。如data.randomSplit(new double[]{0.7, 0.3})返回一个RDD的数组。
1六、Cogroup。对两个RDD中的KV元素,每一个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不一样的是针对两个RDD中相同的key的元素进行合并。
1七、Join。至关于inner join。对两个须要链接的RDD进行cogroup,而后对每一个key下面的list进行笛卡尔积的操做,输出两两相交的两个集合做为value。 至关于sql中where a.key=b.key。
1八、leftOutJoin,rightOutJoin。在数据库中左链接以左表为坐标将表中全部的数据列出来,右面不存在的用null填充。在这里面对join的基础上判断左侧的RDD元素是不是空,若是是空则填充。右链接则相反。
1九、saveAsTestFile。将数据输出到HDFS的指定目录。
20、saveAsObjectFile。写入HDFS为SequenceFile格式。
2一、Collect、collectAsMap。将RDD转换成list或者Map。结果以List或者HashMap的方式输出。
2二、Count。对RDD的元素进行统计,返回个数。
2三、Top(k)。返回最大的k个元素,返回List的形式。
2四、Take返回数据的前k个元素。
2五、takeOrdered。返回数据的最小的k个元素,并在返回中保持元素的顺序。
9、Tips
1、RDD.repartition(n)能够在最初对RDD进行分区操做,这个操做其实是一个shuffle,可能比较耗时,可是若是以后的action比较多的话,能够减小下面操做的时间。其中的n值看cpu的个数,通常大于2倍cpu,小于1000。
2、Action不可以太多,每一次的action都会将以上的taskset划分一个job,这样当job增多,而其中task并不释放,会占用更多的内存,使得gc拉低效率。
3、在shuffle前面进行一个过滤,减小shuffle数据,而且过滤掉null值,以及空值。
4、groupBy尽可能经过reduceBy替代。reduceBy会在work节点作一次reduce,在总体进行reduce,至关于作了一次hadoop中的combine操做,而combine操做和reduceBy逻辑一致,这个groupBy不能保证。
5、作join的时候,尽可能用小RDD去join大RDD,用大RDD去join超大的RDD。
6、避免collect的使用。由于collect若是数据集超大的时候,会经过各个work进行收集,io增多,拉低性能,所以当数据集很大时要save到HDFS。
7、RDD若是后面使用迭代,建议cache,可是必定要估计好数据的大小,避免比cache设定的内存还要大,若是大过内存就会删除以前存储的cache,可能致使计算错误,若是想要彻底的存储可使用persist(MEMORY_AND_DISK),由于cache就是persist(MEMORY_ONLY)。
8、设置spark.cleaner.ttl,定时清除task,由于job的缘由可能会缓存不少执行过去的task,因此定时回收可能避免集中gc操做拉低性能。
9、适当pre-partition,经过partitionBy()设定,每次partitionBy会生成一个RDD。