spark 中的RDD编程 -如下基于Java api

1.RDD介绍:
    RDD,弹性分布式数据集,即分布式的元素集合。在spark中,对全部数据的操做不外乎是建立RDD、转化已有的RDD以及调用RDD操做进行求值。在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操做并行化。
    Spark中的RDD就是一个不可变的分布式对象集合。每一个RDD都被分为多个分区,这些分区运行在集群中的不一样节点上。RDD能够包含Python,Java,Scala中任意类型的对象,甚至能够包含用户自定义的对象。
    用户可使用两种方法建立RDD: 读取一个外部数据集,或在驱动器程序中分发驱动器程序中的对象集合,好比list或者set。
    RDD的转化操做都是惰性求值的,这意味着咱们对RDD调用转化操做,操做不会当即执行。相反,Spark会在内部记录下所要求执行的操做的相关信息。咱们不该该把RDD看作存放着特定数据的数据集,而最好把每一个RDD当作咱们经过转化操做构建出来的、记录如何计算数据的指令列表。数据读取到RDD中的操做也是惰性的,数据只会在必要时读取。转化操做和读取操做都有可能屡次执行。
2.建立RDD数据集
    (1)读取一个外部数据集
JavaRDD<String> lines=sc.textFile(inputFile);
    (2)分发对象集合,这里以list为例
List<String> list=new ArrayList<String>();
list.add("a");
list.add("b");
list.add("c");
JavaRDD<String> temp=sc.parallelize(list);
//上述方式等价于
JavaRDD<String> temp2=sc.parallelize(Arrays.asList("a","b","c"));
3.RDD操做
(1)转化操做
    用java实现过滤器转化操做:
List<String> list=new ArrayList<String>();
//创建列表,列表中包含如下自定义表项
list.add("error:a");
list.add("error:b");
list.add("error:c");
list.add("warning:d");
list.add("hadppy ending!");
//将列表转换为RDD对象
JavaRDD<String> lines = sc.parallelize(list);
//RDD对象lines中有error的表项过滤出来,放在RDD对象errorLines
JavaRDD<String> errorLines = lines.filter(
new Function<String, Boolean>() {
public Boolean call(String v1) throws Exception {
return v1.contains("error");
}
}
);
//遍历过滤出来的列表项
List<String> errorList = errorLines.collect();
for (String line : errorList)
System.out.println(line);
       
输出:
error:a
error:b
error:c
可见,列表list中包含词语error的表项都被正确的过滤出来了。
(2)合并操做
将两个RDD数据集合并为一个RDD数据集
接上述程序示例:
   
   
   
   
JavaRDD<String> warningLines=lines.filter( new Function<String, Boolean>() { public Boolean call(String v1) throws Exception { return v1.contains("warning"); } });JavaRDD<String> unionLines=errorLines.union(warningLines);for(String line :unionLines.collect()) System.out.println(line);
输出:
error:a
error:b
error:c
warning:d
可见,将原始列表项中的全部error项和warning项都过滤出来了。
(3)获取RDD数据集中的部分或者所有元素
①获取RDD数据集中的部分元素   .take(int num)  返回值List<T>   
获取RDD数据集中的前num项。
/**
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*/
def take(num: Int): JList[T]
程序示例:接上
JavaRDD<String> unionLines=errorLines.union(warningLines);

for(String line :unionLines.
take(2))
System.
out.println(line);
输出:
error:a
error:b
可见,输出了RDD数据集unionLines的前2项
②获取RDD数据集中的所有元素 .collect() 返回值 List<T>
程序示例:
List<String> unions=unionLines.collect();
for(String line :unions)
System.
out.println(line);
遍历输出RDD数据集unions的每一项
4.向spark传递函数
函数名
实现的方法
用途
Function<T,R>
R call(T)
接收一个输入值并返回一个输出值,用于相似map()和filter()的操做中
Function<T1,T2,R>
R call(T1,T2)
接收两个输入值并返回一个输出值,用于相似aggregate()和fold()等操做中
FlatMapFunction<T,R>
Iterable <R> call(T)
接收一个输入值并返回任意个输出,用于相似flatMap()这样的操做中
 ① Function<T,R>
JavaRDD<String> errorLines=lines.filter(
new Function<String, Boolean>() {
public Boolean call(String v1)throws Exception {
return v1.contains("error");
}
}
);
过滤RDD数据集中包含error的表项,新建RDD数据集errorLines
FlatMapFunction<T,R>  
List<String> strLine=new ArrayList<String>();
strLine.add("how are you");
strLine.add("I am ok");
strLine.add("do you love me")
JavaRDD<String> input=sc.parallelize(strLine);
JavaRDD<String> words=input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
将文本行的单词过滤出来,并将全部的单词保存在RDD数据集words中。
 ③  Function<T1,T2,R>
List<String> strLine=new ArrayList<String>();
strLine.add("how are you");
strLine.add("I am ok");
strLine.add("do you love me");
JavaRDD<String> input=sc.parallelize(strLine);
JavaRDD<String> words=input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
JavaPairRDD<String,Integer> counts=words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2(s, 1);
}
}
);
JavaPairRDD <String,Integer> results=counts.reduceByKey(
new org.apache.spark.api.java.function.Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}
) ;
上述程序是spark中的wordcount实现方式,其中的reduceByKey操做的Function2函数定义了遇到相同的key时,value是如何reduce的->直接将二者的value相加。
*注意:
能够将咱们的函数类定义为使用匿名内部类,就像上述程序实现的那样,也能够建立一个具名类,就像这样:
class ContainError implements Function<String,Boolean>{
public Boolean call(String v1) throws Exception {
return v1.contains("error");
}
}
JavaRDD<String> errorLines=lines.filter(new ContainError());
for(String line :errorLines.collect())
System.out.println(line);
具名类也能够有参数,就像上述过滤出含有”error“的表项,咱们能够自定义到底含有哪一个词语,就像这样,程序就更有普适性了。

5.针对每一个元素的转化操做:
    转化操做map()接收一个函数,把这个函数用于RDD中的每一个元素,将函数的返回结果做为结果RDD中对应的元素。关键词: 转化
    转化操做filter()接受一个函数,并将RDD中知足该函数的元素放入新的RDD中返回。关键词: 过滤
示例图以下所示:
map()
计算RDD中各值的平方
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4));
JavaRDD<Integer> result=rdd.map(
new Function<Integer, Integer>() {
public Integer call(Integer v1) throwsException {
return v1*v1;
}
}
);
System.out.println( StringUtils.join(result.collect(),","));
输出:
1,4,9,16
filter()
② 去除RDD集合中值为1的元素:
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4));
JavaRDD<Integer> results=rdd.filter(
new Function<Integer, Boolean>() {
public Boolean call(Integer v1) throws Exception {
return v1!=1;
}
}
);
System.out.println(StringUtils.join(results.collect(),","));
结果:
2,3,4
③ 有时候,咱们但愿对每一个输入元素生成多个输出元素。实现该功能的操做叫作flatMap()。和map()相似,咱们提供给flatMap()的函数被分别应用到了输入的RDD的每一个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器。输出的RDD倒不是由迭代器组成的。咱们获得的是一个包含各个迭代器能够访问的全部元素的RDD。flatMap()的一个简单用途是将输入的字符串切分红单词,以下所示: 
JavaRDD<String> rdd =sc.parallelize(Arrays.asList("hello world","hello you","world i love you"));
JavaRDD<String> words=rdd.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
}
);
System.out.println(StringUtils.join(words.collect(),'\n'));
输出:
hello
world
hello
you
world
i
love
you
6.集合操做

RDD中的集合操做
函数
用途
RDD1.distinct()
生成一个只包含不一样元素的新RDD。须要数据混洗。
RDD1.union(RDD2)
返回一个包含两个RDD中全部元素的RDD
RDD1.intersection(RDD2)
只返回两个RDD中都有的元素
RDD1.substr(RDD2)
返回一个只存在于第一个RDD而不存在于第二个RDD中的全部元素组成的RDD。须要数据混洗。
集合操做对笛卡尔集的处理:

RDD1.cartesian(RDD2)
返回两个RDD数据集的笛卡尔集
程序示例:生成RDD集合{1,2} 和{1,2}的笛卡尔集
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2));
JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1,2));
JavaPairRDD<Integer ,Integer> rdd=rdd1.cartesian(rdd2);
for(Tuple2<Integer,Integer> tuple:rdd.collect())
System.out.println(tuple._1()+"->"+tuple._2());
输出:
1->1
1->2
2->1
2->2
7.行动操做
(1)reduce操做
    reduce()接收一个函数做为参数,这个函数要操做两个RDD的元素类型的数据并返回一个一样类型的新元素。一个简单的例子就是函数+,能够用它来对咱们的RDD进行累加。使用reduce(),能够很方便地计算出RDD中全部元素的总和,元素的个数,以及其余类型的聚合操做。
    如下是求RDD数据集全部元素和的程序示例:
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
Integer sum =rdd.reduce(
new Function2<Integer, Integer, Integer>() {
public Integercall(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
}
);
System.out.println(sum.intValue());
输出:55
(2)fold()操做
    接收一个与reduce()接收的函数签名相同的函数,再加上一个初始值来做为每一个分区第一次调用时的结果。你所提供的初始值应当是你提供的操做的单位元素,也就是说,使用你的函数对这个初始值进行屡次计算不会改变结果(例如+对应的0,*对应的1,或者拼接操做对应的空列表)。
    程序实例:
①计算RDD数据集中全部元素的和:
zeroValue=0;//求和时,初始值为0。
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
Integer sum =rdd.fold(0,
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
}
);
System.out.println(sum);
②计算RDD数据集中全部元素的积:
zeroValue=1;//求积时,初始值为1。
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
Integer result =rdd.fold(1,
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1*v2;
}
}
);
System.out.println(result);
(3)aggregate()操做
    aggregate()函数返回值类型 没必要与所操做的RDD类型相同
    与fold()相似,使用aggregate()时,须要提供咱们期待返回的类型的初始值。而后经过一个函数把RDD中的元素合并起来放入累加器。考虑到每一个节点是在本地进行累加的,最终,还须要提供第二个函数来将累加器两两合并。
如下是程序实例:
public class AvgCount implements Serializable{
public int total;
public int num;
public AvgCount(int total,int num){
this.total=total;
this.num=num;
}
public double avg(){
return total/(double)num;
}
static Function2<AvgCount,Integer,AvgCount> addAndCount=
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) throws Exception {
a.total+=x;
a.num+=1;
return a;
}
};
static Function2<AvgCount,AvgCount,AvgCount> combine=
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) throws Exception {
a.total+=b.total;
a.num+=b.num;
return a;
}
};
public static void main(String args[]){

SparkConf conf = new SparkConf().setMaster("local").setAppName("my app");
JavaSparkContext sc = new JavaSparkContext(conf);

AvgCount intial =new AvgCount(0,0);
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
AvgCount result=rdd.aggregate(intial,addAndCount,combine);
System.out.println(result.avg());

}

}
这个程序示例能够实现求出RDD对象集的平均数的功能。其中addAndCount将RDD对象集中的元素合并起来放入AvgCount对象之中,combine提供两个AvgCount对象的合并的实现。咱们初始化AvgCount(0,0),表示有0个对象,对象的和为0,最终返回的result对象中total中储存了全部元素的和,num储存了元素的个数,这样调用result对象的函数avg()就可以返回最终所需的平均数,即avg=tatal/(double)num。
8.持久化缓存
    由于Spark RDD是惰性求值的,而有时咱们但愿能屡次使用同一个RDD。 若是简单地对RDD调用行动操做,Spark每次都会重算RDD以及它的全部依赖。这在迭代算法中消耗格外大,由于迭代算法经常会屡次使用同一组数据。
    为了不屡次计算同一个RDD,可让Spark对数据进行持久化。当咱们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。
    出于不一样的目的,咱们能够为RDD选择不一样的持久化级别。默认状况下persist()会把数据以序列化的形式缓存在JVM的堆空间中
                                                        不一样关键字对应的存储级别表
级别
使用的空间
cpu时间
是否在内存
是否在磁盘
备注
MEMORY_ONLY
直接储存在内存
MEMORY_ONLY_SER
序列化后储存在内存里
MEMORY_AND_DISK
中等
部分
部分
若是数据在内存中放不下,溢写在磁盘上
MEMORY_AND_DISK_SER
部分
部分
数据在内存中放不下,溢写在磁盘中。内存中存放序列化的数据。
DISK_ONLY
直接储存在硬盘里面
程序示例: 将RDD数据集持久化在内存中。
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5));
rdd.persist(StorageLevel.MEMORY_ONLY());
System.out.println(rdd.count());
System.out.println(StringUtils.join(rdd.collect(),','));
RDD还有unpersist()方法,调用该方法能够手动把持久化的RDD从缓存中移除。
9.不一样的RDD类型
    Java中有两个专门的类JavaDoubleRDD和JavaPairRDD,来处理特殊类型的RDD,这两个类还针对这些类型提供了额外的函数,折让你能够更加了解所发生的一切,可是也显得有些累赘。
    要构建这些特殊类型的RDD,须要使用特殊版本的类来替代通常使用的Function类。若是要从T类型的RDD建立出一个DoubleRDD,咱们就应当在映射操做中使用DoubleFunction<T>来替代Function<T,Double>。
程序实例:如下是一个求RDD每一个对象的平方值的程序实例,将普通的RDD对象转化为DoubleRDD对象,最后调用DoubleRDD对象的max()方法,返回生成的平方值中的最大值。
JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5));
JavaDoubleRDD result=rdd.mapToDouble(
new DoubleFunction<Integer>() {
public double call(Integer integer) throws Exception {
return (double) integer*integer;
}
}
);
System.out.println(result.max());





相关文章
相关标签/搜索