RDD(Resilient Distributed Dataset) 叫着 弹性分布式数据集 ,是Spark 中最基本的抽象,它表明一个不可变、可分区、里面元素能够并行计算的集合。php
RDD 具备数据流模型特色:自动容错、位置感知性调度和可伸缩。java
RDD 容许用户在执行多个查询时,显示地将工做集缓存在内存中,后续的查询可以重用工做集,这将会极大的提高查询的效率。linux
咱们能够认为 RDD 就是一个代理,咱们操做这个代理就像操做本地集合同样,不需去关心任务调度、容错等问题。android
在 RDD 源码中这样来描述 RDD算法
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
复制代码
#经过并行化scala集合建立RDD,通常在测试的时候使用
scala> var rdd = sc.parallelize(List(1,2,3,4,5,6,7,8,9))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
复制代码
var rdd1 = sc.textFile("/root/words.txt")
var rdd2 = sc.textFile("hdfs:192.168.80.131:9000/words.text")
复制代码
这种 RDD 中的全部转换都是延迟加载的,也就是说,他们并不会直接就计算结果。相反的,他们只是记住这些应用到基础数据集(例如一个文件)上的转换动做。只有当发生一个返回结果的 Driver 的动做时,这些操做才会真正的运行。这种设计会让Spark 更加有效率的运行。apache
经常使用的 Transformation 操做:编程
转换 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每个输入元素通过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由通过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 相似于map,可是每个输入元素能够被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 相似于map,但独立地在RDD的每个分片上运行,所以在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 相似于mapPartitions,但func带有一个整数参数表示分片的索引值,所以在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,能够选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一块儿,与groupByKey相似,reduce任务的个数能够经过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 先按分区聚合 再总的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 对k/y的RDD进行操做 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey相似,可是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的全部元素对在一块儿的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | 调用外部程序 |
coalesce(numPartitions) | 从新分区 第一个参数是要分多少区,第二个参数是否shuffle 默认false ;少分区变多分区 true ; 多分区变少分区 false |
repartition(numPartitions) | 从新分区 必须shuffle 参数是要分多少区 少变多 |
repartitionAndSortWithinPartitions(partitioner) | 从新分区+排序 比先分区再排序效率高 对K/V的RDD进行操做 |
触发代码的运行操做,咱们一个Spark 应用,至少须要一个 Action 操做。windows
动做 | 含义 |
---|---|
reduce(func) | 经过func函数汇集RDD中的全部元素,这个功能必须是课交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的全部元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(相似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,能够选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其余支持的文件系统,对于每一个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可使HDFS或者其余Hadoop支持的文件系统。 |
saveAsObjectFile(path) | |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每个key对应的元素个数。 |
foreach(func) | 在数据集的每个元素上,运行函数func进行更新。 |
foreachPartition(func) | 在每一个分区上,运行函数 func |
执行流程图: api
pom.xml 依赖数组
<!-- 导入scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.2.0</version>
</dependency>
<!-- 导入spark的依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- 指定hadoop-client API的版本 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
复制代码
scala 版本代码实现:
package com.zhouq.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* scala 版本实现 wc
*
*/
object ScalaWordCount {
def main(args: Array[String]): Unit = {
//这行代码是由于我在windows 上直接跑,须要去读取 hadoop 上的文件,设置个人用户名。若是是linux 环境能够不设置。视状况而定
System.setProperty("HADOOP_USER_NAME", "root")
//建立spark 配置,设置应用程序名字
// val conf = new SparkConf().setAppName("scalaWordCount")
val conf = new SparkConf().setAppName("scalaWordCount").setMaster("local[4]")
// conf.set("spark.testing.memory","102457600")
//建立spark 执行的入口
val sc = new SparkContext(conf)
//指定之后从哪里读取数据建立RDD (弹性分布式数据集)
//取到一行数据
val lines: RDD[String] = sc.textFile(args(0))
//切分压平
val words: RDD[String] = lines.flatMap(_.split(" "))
//按单词和一组合
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
//按key 进行聚合
val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
// 排序, false 表示倒序
val sorted = reduced.sortBy(_._2, false)
//将结果保存到hdfs中
sorted.saveAsTextFile(args(1))
//释放资源
sc.stop()
}
}
复制代码
Java7 版本:
package com.zhouq.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
* Java 版WordCount
*/
public class JavaWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
//建立SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
//指定读取数据的位置
JavaRDD<String> lines = jsc.textFile(args[0]);
//切分压平
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception{
return Arrays.asList(line.split(" ")).iterator();
}
});
//将单词进行组合 (a,1),(b,1),(c,1),(a,1)
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String tp) throws Exception {
return new Tuple2<>(tp, 1);
}
});
//先交换再排序,由于 只有groupByKey 方法
JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {
// return new Tuple2<>(tp._2, tp._1);
return tp.swap();
}
});
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//再次交换顺序
JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {
return tp.swap();
}
});
//输出到hdfs
result.saveAsTextFile(args[1]);
jsc.stop();
}
}
复制代码
Java8 版本:
package com.zhouq.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
/**
* Java Lambda 表达式版本的 WordCount
*/
public class JavaLambdaWordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaWordCount");
//建立SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
//指定读取数据的位置
JavaRDD<String> lines = jsc.textFile(args[0]);
//切分压平
// lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
//将单词进行组合 (a,1),(b,1),(c,1),(a,1)
// words.mapToPair(tp -> new Tuple2<>(tp,1));
JavaPairRDD<String, Integer> wordAndOne = words.mapToPair((PairFunction<String, String, Integer>) tp -> new Tuple2<>(tp, 1));
//先交换再排序,由于 只有groupByKey 方法
// swaped.mapToPair(tp -> tp.swap());
JavaPairRDD<Integer, String> swaped = wordAndOne.mapToPair((PairFunction<Tuple2<String, Integer>, Integer, String>) tp -> {
// return new Tuple2<>(tp._2, tp._1);
return tp.swap();
});
//排序
JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
//再次交换顺序
// sorted.mapToPair(tp -> tp.swap());
JavaPairRDD<String, Integer> result = sorted.mapToPair((PairFunction<Tuple2<Integer, String>, String, Integer>) tp -> tp.swap());
//输出到hdfs
result.saveAsTextFile(args[1]);
jsc.stop();
}
}
复制代码
RDD 和它依赖的 父 RDD(可能有多个) 的关系有两种不一样的类型,即 窄依赖(narrow dependency)和宽依赖(wide dependency)。
窄依赖:窄依赖指的是每个父 RDD 的 Partition 最多被子 RDD 的一个分区使用。能够比喻为独生子女。 宽依赖:宽依赖是多个字 RDD 的Partition 会依赖同一个父 RDD 的 Partition
Spark中最重要的功能之一是跨操做在内存中持久化(或缓存)数据集。当您持久保存RDD时,每一个节点都会存储它在内存中计算的任何分区,并在该数据集(或从中派生的数据集)的其余操做中重用它们。这使得将来的行动更快(一般超过10倍)。缓存是迭代算法和快速交互使用的关键工具。
您可使用persist()或cache()方法标记要保留的RDD 。第一次在动做中计算它,它将保留在节点的内存中。Spark的缓存是容错的 - 若是丢失了RDD的任何分区,它将使用最初建立它的转换自动从新计算。
使用 rdd.persist()或者rdd.cache()
val lines: RDD[String] = sc.textFile("hdfs://xxx/user/accrss")
//使用cache 方法来缓存数据到内存
val cache = lines.cache()
//注意查看下面两次count 的时间
cached.count
cached.count
复制代码
咱们在 StorageLevel.scala 源码中能够看到:
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
复制代码
解释一下各个参数的意思:
第一个参数表示: 放到磁盘 第二个参数表示: 放到内存 第三个参数表示: 磁盘中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第四个参数表示: 内存中的数据是否以Java 对象的方式保存,true 表示是, false表示以序列化的方式存放 第五个参数表示: 存放几份数据(目的是为了怕executor 出现故障致使分区数据丢失,当从新分配任务时,去另外的机器读取备份数据进行从新计算)
OFF_HEAP : 堆外内存,以序列化的格式存储RDD到Tachyon(一个分布式内存存储系统)中
Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不一样权衡。咱们推荐经过下面的过程选择一个合适的存储级别:
Spark自动的监控每一个节点缓存的使用状况,利用最近最少使用原则删除老旧的数据。若是你想手动的删除RDD,可使用 RDD.unpersist()方法
咱们除了把数据缓存到内存中,还能够把数据缓存到HDFS 中,保证中间数据不丢失.
何时咱们须要作chechpoint?
怎么作 checkpoint ?
首先设置 checkpoint 目录,而后再执行计算逻辑,再执行 checkpoint() 操做。
下面代码使用cache 和 checkpoint 两种方式实现计算每门课最受欢迎老师的 topN
package com.zhouq.spark
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 求每门课程最受欢迎老师TopN --2
* -- 使用cache
* -- 使用checkpoint 通常设置hdfs 目录
*/
object GroupFavTeacher2_cache_checkpoint {
def main(args: Array[String]): Unit = {
//前 N
val topN = args(1).toInt
//学科集合
val subjects = Array("bigdata", "javaee", "php")
val conf = new SparkConf().setAppName("FavTeacher").setMaster("local[4]")
//建立spark 执行入口
val sc = new SparkContext(conf)
//checkpoint 得先设置 sc 的checkpoint 的dir
// sc.setCheckpointDir("hdfs://hdfs://hadoop1:8020/user/root/ck20190215")
//指定读取数据
val lines: RDD[String] = sc.textFile(args(0))
val subjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
val index = line.lastIndexOf("/")
var teacher = line.substring(index + 1)
var httpHost = line.substring(0, index)
var subject = new URL(httpHost).getHost.split("[.]")(0)
((subject, teacher), 1)
})
//将学科,老师联合当作key
val reduced: RDD[((String, String), Int)] = subjectTeacherAndOne.reduceByKey(_ + _)
//第一种使用cache RDD 把数据缓存在内存中.标记为cache 的RDD 之后被反复使用,才使用cache
val cached: RDD[((String, String), Int)] = reduced.cache()
//第二种 使用checkpoint,得先设置 sc 的 checkpointDir
// val cached: RDD[((String, String), Int)] = reduced.checkpoint()
/**
* 先对学科进行过滤,而后再进行排序,调用RDD 的sortBy进行排序,避免scala 的排序当数据量大时,内存不足的状况.
* take 是Action 操做,每次take 都会进行一次任务提交,具体查看日志打印状况
*/
for (sub <- subjects) {
//过滤出当前的学科
val filtered: RDD[((String, String), Int)] = cached.filter(_._1._1 == sub)
//使用RDD 的 sortBy ,内存+磁盘排序,避免scala 中的排序因内存不足致使异常状况.
//take 是Action 的,因此每次循环都会触发一次提交任务,祥见日志打印状况
val favTeacher: Array[((String, String), Int)] = filtered.sortBy(_._2, false).take(topN)
println(favTeacher.toBuffer)
}
/**
* 前面cache的数据已经计算完了,后面还有不少其余的指标要计算
* 后面计算的指标也要触发不少次Action,最好将数据缓存到内存
* 原来的数据占用着内存,把原来的数据释放掉,才能缓存新的数据
*/
//把原来缓存的数据释放掉
cached.unpersist(true)
sc.stop()
}
}
复制代码
DAG(Directed Acyclic Graph)叫作有向无环图,原始的RDD经过一系列的转换就就造成了DAG,根据RDD之间的依赖关系的不一样将DAG划分红不一样的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,因为有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,所以宽依赖是划分Stage的依据。
微信公众号文章连接:Spark RDD
有兴趣欢迎关注,你们一块儿交流学习。