==> RDD是什么?
shell
---> RDD(Resilient Distributed Dataset) 弹性分布式数据集 , 是 Spark 中最基本的数据抽象,它表明一个不可变,可分区,里面的元素可并行计算的集合数组
---> 特色:缓存
---- 自动容错
分布式
---- 位置感知性高度ide
---- 可伸缩性函数
---- 容许用户在执行多个查询时显示的将工做集缓存在内存中,后续的查询可以重用工做集,极大的提高了查询速度oop
---> RDD 的属性this
---- A list of partitionsspa
一个组分片,即数据集的基本组成单位 |
对于 RDD 来讲,每一个分片都会被一个计算任务处理,并决定并行计算的粒度,用户能够在建立 RDD 时指定 RDD的分片个数,若是没有指定,那么就会采用默认值,默认值就是程序所分配 到的 CPU Core 的数目 |
---- A function for computing each split
scala
一个计算每一个分区的函数 |
Spark 中 RDD 的计算是以分片为单位的,每一个 RDD 都会实现 compute 函数以达到这个目的, compute 函数会对迭代器进行复合,不须要保存每次计算的结果 |
---- A list of dependencies on other RDDs
RDD 之间的依赖关系 |
RDD 每次转换都会生成一个新的RDD, 因此 RDD 之间就会造成相似于流水线同样的先后依赖关系。在部分数据丢失时, Spark 能够经过这个依赖关系从新计算丢失 的分区数据,而不是对 RDD的全部分区进行从新计算 |
---- Optionally, a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)
一个 Partitioner, 即 RDD的分片函数 |
Spark 中实现 了两种类型的分片函数, 一个是基于哈希的 HashPartitioner, 另一个是基于 RangePartitioner, 只有对于 key-value 的 RDD, 才会有 Partitioner, 非 key-value的 RDD的 Partitioner的值是None, Partitioner函数不但决定 了 RDD 自己的分片数量,也决定了 parents RDD Shuffle 输出时的分片数量 |
---- Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file )
一个列表,存储存取每一个 Partion 的优先位置(preferred location) |
对于一个 HDFS 文件来讲,这个列表 保存的就是每一个Partition 所在的块的位置 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候,会尽量的将计算任务分配到其所要处理数据块的存储位置 |
==> RDD 的建立方式
---> 经过外部的数据文件建立 (HDFS)
val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt")
---> 经过 sc.parallelize 进行建立
val rdd2 = sc.parallelize(Array(1,2,3,4,5,6))
==> RDD的基本原理
---> 建立一个 RDD:
// 3表明分三个分区 val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8), 3)
---> 一个分区运行在一个Worker 节点上, 一个 Worker 上能够运行多个分区
==> RDD 的类型
---> Trasformation
RDD 中的全部转换都是延迟加载的,即,不会返回计算结果,只记住这些应用到基础数据集(如,一个文件,一个列表等)上的转换动做,只有当发生一个要求返回结果给 Driver 时,这些转换才会执行(我的理解,与 Scala 中的 lazy (懒值)比较类似) |
转换 | 含义 |
map(func) |
返回一个新的 RDD,该 RDD 由每个输入元素通过 func 函数转换后组成 |
filter(func) | 返回一个新的RDD,该 RDD 由通过 func 函数计算后返回值为 true 的输入元素组成 |
flatMap(func) | 相似于 map ,可是每一个输入元素能够被映射为 0 或多个输出元素(返回一个序列) |
mapPartitions(func) | 相似于 map, 可是独立的在RDD 的每个分片上运行,所以在类型为T 的 RDD 上运行时,func 的函数类型必须 是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 相似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,所以在类型为T 的 RDD 上运行时, func 的函数类型必须是(Int, Interator[T])= > Iterator[U] |
sample(withReplacement, fraction, seed) | 根据 fraction 指定的比例对数据进行采样, 能够选择是否使用随机数进行替换, seed 用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD 和 参数 RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD 和参数 RDD 求交集后返回一个新的RDD |
distinct([numTasks]) | 对源RDD 去重后返回一个新的RDD |
groupByKey([numTasks]) | 在 (k, v) 的RDD 上调用,返回一个(K, iterator[V]) 的 RDD |
reduceByKey(func, [numTasks]) | 在(k, v) 的RDD上调用,返回一个(k, v) 的 RDD, 使用指定的reduce函数,将相同key 的值聚合到一块儿,与 groupByKey 相似,reduce 任务的个数能够经过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | |
sortByKey([ascending], [numTasks]) | 在一个(k, v)上调用,k 必须实现 Ordered 接口,返回一个按照 key 进行排序的(k, v) 的RDD |
sortBy(func, [ascending], [numTasks]) | 与 sortByKey 相似,可是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(k, v)和(k, w) 的RDD 上调用,返回一个相同key 对应的全部元素堆在一块儿的(k, (v, w)) 的 RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(k, v)和(k, w)的 RDD 上调用 ,返回一个(k, (Iterable<v>, Iterable<w>)) 类型的 RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | |
coalesce(numPartitions) | |
repartitionAndSortWithinPartitions(partitions) |
---> Action
reduce(fun) | 经过 func 函数汇集RDD中的全部元素,这个功能必须是可交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的全部元素 |
count() | 返回元素个数 |
first() | 反回 RDD 的第一个元素(相似于 take(1)) |
take(n) | 返回一个由数据集的前 n 个元素组成的数组 |
takeSample(withReplacement, num, [seed]) | 返回一个数组,该 数组由从数据集中随机采样的num 个元素组成,能够选择是否用随机数替换不足的部分, seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | |
saveAsTextFile(path) | 将数据集的元素以 textfile 的形式保存到HDFS文件系统或者其它支持的文件系统,对每一个元素,Spark 将会调用 toString 方法将它转换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以 Hadoop sequencefile 的格式 保存到指定的目录下,可使HDFS 或者其它 Hadoop 支持的文件系统 |
saveAsObjectFile(path) | |
countByKey() | 针对(k, v ) 类型的RDD, 返回一个(k, Int) 的 map, 表示每个key 对应的元素个数 |
foreach(func) | 在数据集的每个元素上运行函数 func 进行更新 |
==> RDD 的缓存机制
---> 做用:缓存有可能丢失,或因为存储于内存中的数据因为内存不足而被删除,缓存容错机制保证了即便缓存丢失也能保证计算的正确执行
---> 实现原理:经过基于 RDD 的一系列转换,丢失的数据会被重算,因为 RDD 的各个 Partition 是相对独立的,所以只须要计算丢失的部分便可, 不用所有从新计算
---> 运行方式:RDD经过 persist方法或 cache方法能够将前面的计算结果缓存,但并不会调用时便立缓存,而是触发后面的action 时,此RDD会被缓存到计算机内存中,供后面重用
---> 经过查看源码能够发现,cache 最终调用的也是 parsist
def persist():this.type = persist(StorageLevel.MEMORY_ONLY) def cache():this.type = persist()
---> 缓存使用:
val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt") rdd1.count // 没有缓存,直接执行 rdd1.cache rdd1.count // 第一次执行会慢一些 rdd1.count // 第二次会很快
---> 存储级别在 object StorageLevel 中定义
object StorageLevel{ val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true) val MEMORY_ONLY_SET = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SET_2 = new StorageLevel(false, true, false, false) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_SET = new StorageLevel(true, true, false, false) val MEMORY_ADN_DISK_SET_2 = new StorageLevel(true, true, false, false) val OFF_HEAP = new StorageLevel = new StorageLevel(true, true, true, false) }
==> RDD的 Checkpoint(检查点)机制: 容错机制
---> 检查点本质是经过将 RDD 写入 Disk 作检查点
---> 做用: 经过作 lineage 作容错的辅助
---> 运行机制: 在RDD 的中间阶段作检查点容错,以后若是有节点出现问题而丢失分区,从作检查点的 RDD 开始从新作 Lineage,以达到减小开销的目的
---> 设置检查点的方式: 本地目录, HDFS
---- 本地目录(须要将 spark-shell 运行在本地模式上)
// 设置检查点目录 sc.setCheckpointDir("/data/checkpoint") // 建立一个RDD val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt") // 设置检查点 rdd1.checkpoint // 执行,触发 Action ,会在检查点目录生成检查点 rdd1.count
---- HDFS(须要将 Spark-shell 运行在集群模式上)
// 设置检查点目录 sc.setCheckpointDir("hdfs://192.168.10.210:9000/data/checkpoint") // 建立一个RDD val rdd1 = sc.textFile("hdfs://192.168.10.210:9000/data/data.txt") // 设置检查点 rdd1.checkpoint // 执行,触发 Action ,会在检查点目录生成检查点 rdd1.count
==> RDD 的依赖关系 和 Spark 任务中的 Stage
---> RDD 依赖关系 RDD和它的父 RDD(s)的关系有两种不一样的类型
---- 窄依赖 每一个 父 RDD 的 partition 只能被子 RDD 的一个 partition 使用 一个子RDD
---- 宽依赖 多个子RDD 的 partition 会依赖同一个父 RDD 多个子RDD
---> Stage 划分Stage 的依据是:宽依赖