Action类算子也是一类算子(函数)叫作行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序(就是咱们编写的一个应用程序)中有几个Action类算子执行,就有几个job运行。es6
经过函数func汇集数据集中的全部元素,这个函数必须是关联性的,确保能够被正确的并发执行 apache
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24 scala> rdd1.reduce(_+_) res3: Int = 55
在driver的程序中,以数组的形式,返回数据集的全部元素,这一般会在使用filter或者其它操做后,返回一个足够小的数据子集再使用 数组
scala> var rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24 scala> rdd1.collect res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
返回数据集的元素个数 并发
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:24 scala> rdd1.count res4: Long = 10
返回数据集的第一个元素(相似于take(1)) app
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:24 scala> rdd1.first res5: Int = 1
返回一个数组,由数据集的前n个元素组成。注意此操做目前并不是并行执行的,而是driver程序所在机器 ide
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at <console>:24 scala> rdd1.take(3) res6: Array[Int] = Array(1, 2, 3)
withReplacement:结果中是否可重复
num:取多少个
seed:随机种子
返回一个数组,在数据集中随机采样num个元素组成,能够选择是否用随机数替换不足的部分,seed用于指定的随机数生成器种子
原理
takeSample()函数和sample函数是一个原理,可是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果再也不是RDD,而是至关于对采样后的数据进行collect(),返回结果的集合为单机的数组函数
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at <console>:24 scala> rdd1.takeSample(true,4,10) res19: Array[Int] = Array(10, 10, 2, 3)
takeOrdered和top相似,只不过以和top相反的顺序返回元素。
top默认倒序,taskOrdered默认正序
top方法其实就是调用的taskOrdered,而后反转的结果es5
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { takeOrdered(num)(ord.reverse) }
scala> val rdd1 = sc.makeRDD(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at makeRDD at <console>:24 scala> rdd1.top(5) res22: Array[Int] = Array(10, 9, 8, 7, 6) scala> rdd1.takeOrdered(5) res23: Array[Int] = Array(1, 2, 3, 4, 5)
saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中spa
val conf = new SparkConf() .setAppName("saveFile") .setMaster("local[*]") val sc = new SparkContext(conf) val rdd1: RDD[Int] = sc.parallelize(1 to 10) rdd1.repartition(1).saveAsTextFile("/tmp/fff")
saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。使用方法和saveAsTextFile相似scala
saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。使用方法和saveAsTextFile相似
对(K,V)类型的RDD有效,返回一个(K,Int)对的map,表示每个能够对应的元素个数
scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",3))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at makeRDD at <console>:24 scala> rdd1.countByKey res1: scala.collection.Map[String,Long] = Map(B -> 2, A -> 2, C -> 1)
在数据集的每个元素上,运行函数func,t一般用于更新一个累加器变量,或者和外部存储系统作交互
scala> val rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",3))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at makeRDD at <console>:24 scala> rdd1.collect.foreach(println(_)) (A,0) (A,2) (B,1) (B,2) (C,3)