Spark学习之JavaRdd

RDD 介绍

RDD,全称Resilient Distributed Datasets(弹性分布式数据集),是Spark最为核心的概念,是Spark对数据的抽象。RDD是分布式的元素集合,每一个RDD只支持读操做,且每一个RDD都被分为多个分区存储到集群的不一样节点上。除此以外,RDD还容许用户显示的指定数据存储到内存和磁盘中,掌握了RDD编程是SPARK开发的第一步。java

 

1:建立操做(creation operation):RDD的建立由SparkContext来负责。
2:转换操做(transformation operation):将一个RDD经过必定操做转换为另外一个RDD。
3:行动操做(action operation):Spark为惰性计算,对RDD的行动操做都会触发Spark做业的运行
4:控制操做(control operation):对RDD进行持久化等。

 

DEMO代码地址:https://github.com/zhp8341/sparkdemo/blob/master/src/main/java/com/demo/spark/rdddemo/OneRDD.javagit

一:建立操做

建立RDD有两种方式:
1 读取一个数据集(SparkContext.textFile()) :github

JavaDStreamlines=jssc.textFileStream("/Users/huipeizhu/Documents/sparkdata/input/"); 
JavaReceiverInputDStreamlines = jssc.socketTextStream("localhost", 9999);


2 读取一个集合(SparkContext.parallelize()) :redis

Listlist = Arrays.asList(5, 4, 3, 2, 1);
JavaRDDrdd = sc.parallelize(list);

二:转换操做

1:单个RDD转换操做
map() : 对每一个元素进行操做,返回一个新的RDD
System.out.println("RDD每一个元素乘10:" + rdd.map(v -> v * 10)
编程


filter() : 最每一个元素进行筛选,返回符合条件的元素组成的一个新RDD
System.out.println("RDD去掉1的元素:" + rdd.filter(v -> v != 1));缓存

flatMap() : 对每一个元素进行操做,将返回的迭代器的全部元素组成一个新的RDD返回
r.dd.flatMap(x -> x.to(3)).collect()
socket

distinct():去重操做
System.out.println("RDD去重操做:" + rdd.distinct());
分布式

rdd最大和最小值
ui

Integer max=  rdd.reduce((v1, v2) -> Math.max(v1, v2));spa

Integer min=  rdd.reduce((v1, v2) -> Math.min(v1, v2))


2:两个RDD的转化操做:


[1, 2, 3] [3, 4, 5] 两个个RDD简单相关操做

union() :合并,不去重
System.out.println("两个RDD集合:" + rdd1.union(rdd2).collect());

intersection() :交集
System.out.println("两个RDD集合共同元素:" + rdd1.intersection(rdd2).collect());

cartesian() :笛卡儿积
System.out.println("和另一个RDD集合的笛卡尔积:" + rdd1.cartesian(rdd2).collect());

subtract() : 移除相同的内容
rdd1.subtract(rdd2).collect()

 

三:行动操做


collect() :返回全部元素
System.out.println("原始数据:" + rdd.collect());

count() :返回元素个数
System.out.println("统计RDD的全部元素:" + rdd.count());

countByValue() : 各个元素出现的次数
System.out.println("每一个元素出现的次数:" + rdd.countByValue());

take(num) : 返回num个元素
System.out.println("取出rdd返回2个元素:" + rdd.take(2));

top(num) : 返回前num个元素
System.out.println("取出rdd返回最前2个元素:" + rdd.top(2));


reduce(func) :并行整合RDD中的全部数据(最经常使用的)
System.out.println("整合RDD中全部数据(sum):" + rdd.reduce((v1, v2) -> v1 + v2));

foreach(func):对每一个元素使用func
rdd.foreach(t -> System.out.print(t));


四:控制操做


cache():

persist():保留着RDD的依赖关系

checkpoint(level:StorageLevel):RDD[T]切断RDD依赖关系

所谓的控制操做就是持久化你能经过persist()或者cache()方法持久化一个rdd。首先,在action中计算获得rdd;而后,将其保存在每一个节点的内存中。Spark的缓存是一个容错的技术-若是RDD的任何一个分区丢失,它 能够经过原有的转换(transformations)操做自动的重复计算而且建立出这个分区。此外,咱们能够利用不一样的存储级别存储每个被持久化的RDD。Spark自动的监控每一个节点缓存的使用状况,利用最近最少使用原则删除老旧的数据。若是你想手动的删除RDD,可使用RDD.unpersist()方法。在实际操做当中咱们能够借助第三方进行数据持久化 如:redis

相关文章
相关标签/搜索