Spark学习笔记总结html
Spark能够用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。
Spark是MapReduce的替代方案,并且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。shell
RDD(Resilient Distributed Dataset)叫作分布式数据集,是Spark中最基本的数据抽象,它表明一个不可变(建立了内容不可变)、可分区、里面的元素可并行计算的集合。机器学习
查看该rdd的分区数量
rdd1.partitions.length分布式
RDD中两种算子:
transformation转换,是延迟加载的函数
经常使用的transformation:
(1)map、flatMap、filter
(2)intersection求交集、union求并集:注意类型要一致
distinct:去重
(3)join:类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的全部元素对在一块儿的(K,(V,W))的RDD
(4)groupByKey:在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
可是效率reduceByKey较高,由于有一个本地combiner的过程。
(5)cartesian笛卡尔积oop
经常使用的action
(1)collect()、count()
(2)reduce:经过func函数汇集RDD中的全部元素
(3)take(n):取前n个;top(2):排序取前两个
(4)takeOrdered(n),排完序后取前n个学习
参考《http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html》spa
(1)mapPartitions(func)和
mapPartitions(func):
独立地在RDD的每个分片上运行,可是返回值;foreachPartition(func)也经常使用,不须要返回值命令行
mapPartitionsWithIndex(func):
能够看到分区的编号,以及该分区数据。
相似于mapPartitions,但func带有一个整数参数表示分片的索引值,func的函数类型必须是
(Int, Interator[T]) => Iterator[U]scala
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2) val func = (index: Int, iter: Iterator[(Int)]) => {iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator} rdd1.mapPartitionsWithIndex(func).collect
(2)aggregate
action操做,
第一个参数是初始值,
第二个参数:是2个函数[每一个函数都是2个参数(第一个参数:先对个个分区进行的操做, 第二个:对个个分区合并后的结果再进行合并), 输出一个参数]
例子:
rdd1.aggregate(0)(_+_, _+_) //前一个是对每个分区进行的操做,第二个是对各分区结果进行的结果 rdd1.aggregate(5)(math.max(_, _), _ + _) //结果:5 + (5+9) = 19 val rdd3 = sc.parallelize(List("12","23","345","4567"),2) rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y) //结果:24或者42 val rdd4 = sc.parallelize(List("12","23","345",""),2) rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y) //结果01或者10
(3)aggregateByKey
将key值相同的,先局部操做,再总体操做。。和reduceByKey内部实现差很少
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2) pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect //结果:Array((dog,12), (cat,17), (mouse,6))
PS:
和reduceByKey(+)调用的都是同一个方法,只是aggregateByKey要底层一些,能够先局部再总体操做。
(4)combineByKey
和reduceByKey是相同的效果,是reduceByKey的底层。
第一个参数x:原封不动取出来, 第二个参数:是函数, 局部运算, 第三个:是函数, 对局部运算后的结果再作运算
每一个分区中每一个key中value中的第一个值,
val rdd1 = sc.textFile("hdfs://master:9000/wordcount/input/").flatMap(_.split(" ")).map((_, 1)) val rdd2 = rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) rdd2.collect
第一个参数的含义:
每一个分区中相同的key中value中的第一个值
如:
(hello,1)(hello,1)(good,1)-->(hello(1,1),good(1))-->x就至关于hello的第一个1, good中的1
val rdd3 = rdd1.combineByKey(x => x + 10, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n) rdd3.collect //每一个会多加3个10 val rdd4 = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val rdd5 = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3) val rdd6 = rdd5.zip(rdd4) val rdd7 = rdd6.combineByKey(List(_), (x: List[String], y: String) => x :+ y, (m: List[String], n: List[String]) => m ++ n) //将key相同的数据,放入一个集合中
(5)collectAsMap
Action
Map(b -> 2, a -> 1)//将Array的元祖转换成Map,之后能够经过key取值
val rdd = sc.parallelize(List(("a", 1), ("b", 2))) rdd.collectAsMap //能够下一步使用
(6)countByKey
根据key计算key的数量
Action
val rdd1 = sc.parallelize(List(("a", 1), ("b", 2), ("b", 2), ("c", 2), ("c", 1))) rdd1.countByKey rdd1.countByValue//将("a", 1)当作一个元素,统计其出现的次数
(7)flatMapValues 对每个value进行操做后压平