1.从集合中建立RDD程序员
val conf = new SparkConf().setAppName("Test").setMaster("local") |
2.从外部存储建立RDD编程
//从外部存储建立RDD |
RDD支持两种操做:转化操做和行动操做。RDD 的转化操做是返回一个新的 RDD的操做,好比 map()和 filter(),而行动操做则是向驱动器程序返回结果或把结果写入外部系统的操做。好比 count() 和 first()。缓存
Spark采用惰性计算模式,RDD只有第一次在一个行动操做中用到时,才会真正计算。Spark能够优化整个计算过程。默认状况下,Spark 的 RDD 会在你每次对它们进行行动操做时从新计算。若是想在多个行动操做中重用同一个 RDD,可使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。函数
Transformation算子oop
RDD中的全部转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动做。只有当发生一个要求返回结果给Driver的动做时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。大数据
转换优化 |
含义spa |
map(func) |
返回一个新的RDD,该RDD由每个输入元素通过func函数转换后组成 |
filter(func) |
返回一个新的RDD,该RDD由通过func函数计算后返回值为true的输入元素组成 |
flatMap(func) |
相似于map,可是每个输入元素能够被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素) |
mapPartitions(func) |
相似于map,但独立地在RDD的每个分片上运行,所以在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) |
相似于mapPartitions,但func带有一个整数参数表示分片的索引值,所以在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) |
根据fraction指定的比例对数据进行采样,能够选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) |
对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) |
对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) |
对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) |
在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) |
在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一块儿,与groupByKey相似,reduce任务的个数能够经过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
相同的Key值进行聚合操做,在聚合过程当中一样使用了一个中立的初始值zeroValue:中立值,定义返回value的类型,并参与运算seqOp:用来在同一个partition中合并值combOp:用来在不一样partiton中合并值 |
sortByKey([ascending], [numTasks]) |
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) |
与sortByKey相似,可是更灵活 |
join(otherDataset, [numTasks]) |
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的全部元素对在一块儿的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) |
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
cartesian(otherDataset) |
笛卡尔积 |
pipe(command, [envVars]) |
将一些shell命令用于Spark中生成新的RDD |
coalesce(numPartitions) |
从新分区 |
repartition(numPartitions) |
从新分区 |
repartitionAndSortWithinPartitions(partitioner) |
从新分区和排序 |
Action算子
在RDD上运行计算,并返回结果给Driver或写入文件系统
动做 |
含义 |
reduce(func) |
经过func函数汇集RDD中的全部元素,这个功能必须是可交换且可并联的 |
collect() |
在驱动程序中,以数组的形式返回数据集的全部元素 |
count() |
返回RDD的元素个数 |
first() |
返回RDD的第一个元素(相似于take(1)) |
take(n) |
返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) |
返回一个数组,该数组由从数据集中随机采样的num个元素组成,能够选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) |
takeOrdered和top相似,只不过以和top相反的顺序返回元素 |
saveAsTextFile(path) |
将数据集的元素以textfile的形式保存到HDFS文件系统或者其余支持的文件系统,对于每一个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) |
将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可使HDFS或者其余Hadoop支持的文件系统。 |
saveAsObjectFile(path) |
|
countByKey() |
针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每个key对应的元素个数。 |
foreach(func) |
在数据集的每个元素上,运行函数func进行更新。 |
RDD支持两种操做:转化操做和行动操做。RDD 的转化操做是返回一个新的 RDD的操做,好比 map()和 filter(),而行动操做则是向驱动器程序返回结果或把结果写入外部系统的操做。好比 count() 和 first()。
Spark采用惰性计算模式,RDD只有第一次在一个行动操做中用到时,才会真正计算。Spark能够优化整个计算过程。默认状况下,Spark 的 RDD 会在你每次对它们进行行动操做时从新计算。若是想在多个行动操做中重用同一个 RDD,可使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。
Transformation算子****
RDD中的全部转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动做。只有当发生一个要求返回结果给Driver的动做时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。
转换 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每个输入元素通过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由通过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 相似于map,可是每个输入元素能够被映射为0或多个输出元素(因此func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 相似于map,但独立地在RDD的每个分片上运行,所以在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 相似于mapPartitions,但func带有一个整数参数表示分片的索引值,所以在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,能够选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一块儿,与groupByKey相似,reduce任务的个数能够经过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 相同的Key值进行聚合操做,在聚合过程当中一样使用了一个中立的初始值zeroValue:中立值,定义返回value的类型,并参与运算seqOp:用来在同一个partition中合并值combOp:用来在不一样partiton中合并值 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey相似,可是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的全部元素对在一块儿的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | 将一些shell命令用于Spark中生成新的RDD |
coalesce(numPartitions) | 从新分区 |
repartition(numPartitions) | 从新分区 |
repartitionAndSortWithinPartitions(partitioner) | 从新分区和排序 |
** Action算子**
在RDD上运行计算,并返回结果给Driver或写入文件系统
动做 | 含义 |
---|---|
reduce(func) | 经过func函数汇集RDD中的全部元素,这个功能必须是可交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的全部元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(相似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,能够选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | takeOrdered和top相似,只不过以和top相反的顺序返回元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其余支持的文件系统,对于每一个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可使HDFS或者其余Hadoop支持的文件系统。 |
saveAsObjectFile(path) | |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每个key对应的元素个数。 |
foreach(func) | 在数据集的每个元素上,运行函数func进行更新。 |