In a distributed program, communication is very expensive, so laying data out to minimize network traffic can greatly improve overall performance. Just as a single-node program needs to choose the right data structure for a collection of records, a Spark program can reduce communication cost by controlling how its RDDs are partitioned.
Partitioning will not help every application, though: if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.
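As a minimal sketch of this trade-off (the class name and data below are made up for illustration, and the local configuration mirrors the example programs later in this post): an RDD that will be joined repeatedly is partitioned once with partitionBy() and persisted, so the shuffle cost of partitioning is paid a single time.

import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

public class PrePartitionSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("PrePartitionSketch");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // An RDD that is reused in several key-based operations (hypothetical data).
        JavaPairRDD<String, Integer> prices = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("Cake", 2),
                new Tuple2<String, Integer>("Bread", 3),
                new Tuple2<String, Integer>("Egg", 6)));

        // Partition once and persist; without persist() the partitioning would be
        // recomputed (and reshuffled) every time the RDD is used.
        JavaPairRDD<String, Integer> partitioned =
                prices.partitionBy(new HashPartitioner(4)).persist(StorageLevel.MEMORY_ONLY());

        // Both joins below reuse the existing partitioning of `partitioned`
        // instead of shuffling it again.
        JavaPairRDD<String, Integer> order1 = ctx.parallelizePairs(
                Arrays.asList(new Tuple2<String, Integer>("Bread", 1)));
        JavaPairRDD<String, Integer> order2 = ctx.parallelizePairs(
                Arrays.asList(new Tuple2<String, Integer>("Egg", 2)));
        System.out.println(partitioned.join(order1).collect());
        System.out.println(partitioned.join(order2).collect());

        ctx.stop();
    }
}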
For example, sortByKey() and groupByKey() produce a range-partitioned RDD and a hash-partitioned RDD, respectively. On the other hand, operations such as map() cause the new RDD to forget the parent's partitioning information, because such operations could, in principle, change the key of each record.
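A quick way to see which partitioner (if any) an RDD ends up with is to inspect partitioner() on the underlying RDD via rdd(). The sketch below (hypothetical class name, same local setup as the later examples) shows sortByKey() producing a range partitioner, groupByKey() a hash partitioner, and mapToPair() discarding the parent's partitioner.

import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class PartitionerCheckSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("PartitionerCheckSketch");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> pairs = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("Bread", 3),
                new Tuple2<String, Integer>("Egg", 6),
                new Tuple2<String, Integer>("Bread", 2)), 2);

        // sortByKey() range-partitions the result, groupByKey() hash-partitions it.
        System.out.println(pairs.sortByKey().rdd().partitioner());   // Some(RangePartitioner)
        System.out.println(pairs.groupByKey().rdd().partitioner());  // Some(HashPartitioner)

        // mapToPair() could change the keys, so the partitioner set by partitionBy() is dropped.
        JavaPairRDD<String, Integer> hashed = pairs.partitionBy(new HashPartitioner(2));
        System.out.println(hashed.rdd().partitioner());                    // Some(HashPartitioner)
        System.out.println(hashed.mapToPair(t -> t).rdd().partitioner());  // None

        ctx.stop();
    }
}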
从分区中获益的操做缓存
Spark 的许多操做都引入了将数据根据键跨节点进行混洗的过程。全部这些操做都会从数据分区中获益。就 Spark 1.0 而言,可以从数据分区中获益的操做有 cogroup() 、groupWith() 、 join() 、 leftOuterJoin() 、 rightOuterJoin() 、 groupByKey() 、 reduceByKey() 、combineByKey() 以及 lookup() 。网络
For binary operations such as cogroup() and join(), pre-partitioning ensures that at least one of the RDDs (the one with the known partitioner) is not shuffled. If both RDDs have the same partitioner and are cached on the same machines (for example, one was created from the other with mapValues(), so the two share the same keys and partitioning), or if one of them has not yet been computed, then no data is shuffled across the network at all.
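This can be eyeballed with toDebugString(): in the sketch below (hypothetical class name and data), the join output adopts the existing HashPartitioner of the pre-partitioned, persisted RDD, so only the small, unpartitioned RDD needs to be hashed into those partitions.

import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

public class PrePartitionedJoinSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("PrePartitionedJoinSketch");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Pre-partitioned and persisted RDD: its partitioner is known to Spark.
        JavaPairRDD<String, Integer> big = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("Cake", 2),
                new Tuple2<String, Integer>("Bread", 3),
                new Tuple2<String, Integer>("Egg", 6)), 2)
                .partitionBy(new HashPartitioner(2))
                .persist(StorageLevel.MEMORY_ONLY());

        // Small RDD with no partitioner.
        JavaPairRDD<String, Integer> small = ctx.parallelizePairs(
                Arrays.asList(new Tuple2<String, Integer>("Bread", 1)), 2);

        JavaPairRDD<String, Tuple2<Integer, Integer>> joined = big.join(small);

        // The result reuses big's partitioner, so big itself is not reshuffled;
        // only small's records are moved into big's partitions.
        System.out.println(joined.rdd().partitioner());  // Some(HashPartitioner), 2 partitions
        System.out.println(joined.toDebugString());      // lineage for manual inspection
        System.out.println(joined.collect());

        ctx.stop();
    }
}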
影响分区方式的操做分布式
These are all the operations that set a partitioner on the resulting RDD: cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if the parent has a partitioner), and filter() (if the parent has a partitioner). All other operations produce a result with no particular partitioner.
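For the value-only transformations at the end of that list, the behavior can be checked the same way as above; a short sketch (hypothetical class name, assuming the same local setup):

import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class PreservePartitionerSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("PreservePartitionerSketch");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> hashed = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("Cake", 2),
                new Tuple2<String, Integer>("Bread", 3),
                new Tuple2<String, Integer>("Egg", 6)), 2)
                .partitionBy(new HashPartitioner(2));

        // mapValues() and filter() cannot change the keys, so the parent's
        // HashPartitioner is carried over to the result.
        System.out.println(hashed.mapValues(v -> v + 1).rdd().partitioner());   // Some(HashPartitioner)
        System.out.println(hashed.filter(t -> t._2() > 2).rdd().partitioner()); // Some(HashPartitioner)

        ctx.stop();
    }
}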
最后,对于二元操做,输出数据的分区方式取决于父 RDD 的分区方式。默认状况下,结果会采用哈希分区,分区的数量和操做的并行度同样。不过,若是其中的一个父 RDD 已性能
经设置过度区方式,那么结果就会采用那种分区方式;若是两个父 RDD 都设置过度区方式,结果 RDD 会采用第一个父 RDD 的分区方式。this
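A rough way to observe the default (hypothetical class name; it assumes spark.default.parallelism is set to 4, as in the example programs below): joining two RDDs that have no partitioner yields a hash-partitioned result with 4 partitions, while joining against a pre-partitioned RDD adopts that RDD's partitioner instead.

import java.util.Arrays;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class JoinOutputPartitioningSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("JoinOutputPartitioningSketch");
        conf.set("spark.testing.memory", "2147480000");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> left = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("Bread", 3),
                new Tuple2<String, Integer>("Egg", 6)), 2);
        JavaPairRDD<String, Integer> right = ctx.parallelizePairs(Arrays.asList(
                new Tuple2<String, Integer>("Bread", 1)), 2);

        // Neither parent has a partitioner: the result is hash partitioned
        // with spark.default.parallelism (4) partitions.
        JavaPairRDD<String, Tuple2<Integer, Integer>> plain = left.join(right);
        System.out.println(plain.rdd().partitioner());
        System.out.println(plain.getNumPartitions()); // 4

        // One parent already has a partitioner: the result adopts it (here, 3 partitions).
        JavaPairRDD<String, Integer> prePartitioned = left.partitionBy(new HashPartitioner(3));
        JavaPairRDD<String, Tuple2<Integer, Integer>> partitionedJoin = prePartitioned.join(right);
        System.out.println(partitionedJoin.rdd().partitioner());
        System.out.println(partitionedJoin.getNumPartitions()); // 3

        ctx.stop();
    }
}

The longer example programs that follow exercise combineByKey(), cogroup() and the join family, and sortByKey() on pair RDDs.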
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

// Implementing a groupByKey-style grouping with combineByKey().
public class CombineByKeyTest3 {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.testing.memory", "2147480000");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Create an RDD: 1) from external storage (cluster mode), or 2) from an in-memory collection.
        List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        data.add(new Tuple2<>("Cake", 2));
        data.add(new Tuple2<>("Bread", 3));
        data.add(new Tuple2<>("Cheese", 4));
        data.add(new Tuple2<>("Milk", 1));
        data.add(new Tuple2<>("Toast", 2));
        data.add(new Tuple2<>("Bread", 2));
        data.add(new Tuple2<>("Egg", 6));
        JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);

        // Hash-partition the pair RDD and cache it so the partitioning is reused downstream.
        JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                return t;
            }
        }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());

        // Map each value to a (value, count) pair.
        JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(),
                        new Tuple2<Integer, Integer>(t._2(), 1));
            }
        });
        mapRdd1.foreach(x -> System.out.println(x));

        // Either List or Iterable works as the combiner type; groupByKey() itself returns Iterable values:
        // JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> results = mapRdd1.groupByKey();
        JavaPairRDD<String, List<Tuple2<Integer, Integer>>> results = mapRdd1.combineByKey(
                // createCombiner: wrap the first value seen for a key in a list.
                new Function<Tuple2<Integer, Integer>, List<Tuple2<Integer, Integer>>>() {
                    @Override
                    public List<Tuple2<Integer, Integer>> call(Tuple2<Integer, Integer> value) throws Exception {
                        List<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>();
                        list.add(value);
                        return list;
                    }
                },
                // mergeValue: append another value for the same key within a partition.
                new Function2<List<Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>, List<Tuple2<Integer, Integer>>>() {
                    @Override
                    public List<Tuple2<Integer, Integer>> call(List<Tuple2<Integer, Integer>> it,
                            Tuple2<Integer, Integer> value) throws Exception {
                        it.add(value);
                        return it;
                    }
                },
                // mergeCombiners: merge the per-partition lists.
                new Function2<List<Tuple2<Integer, Integer>>, List<Tuple2<Integer, Integer>>, List<Tuple2<Integer, Integer>>>() {
                    @Override
                    public List<Tuple2<Integer, Integer>> call(List<Tuple2<Integer, Integer>> it1,
                            List<Tuple2<Integer, Integer>> it2) throws Exception {
                        it1.addAll(it2);
                        return it1;
                    }
                });
        results.foreach(x -> System.out.println(x));

        // Note: distinct() is implemented on top of reduceByKey().
        ctx.stop();
    }
}
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

// Variants of combineByKey(): number of partitions, custom partitioner, map-side combine.
public class CombineByKeyTest2 {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        conf.setAppName("WordCounter");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Create an RDD: 1) from external storage (cluster mode), or 2) from an in-memory collection.
        List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        data.add(new Tuple2<>("Cake", 2));
        data.add(new Tuple2<>("Bread", 3));
        data.add(new Tuple2<>("Cheese", 4));
        data.add(new Tuple2<>("Milk", 1));
        data.add(new Tuple2<>("Toast", 2));
        data.add(new Tuple2<>("Bread", 2));
        data.add(new Tuple2<>("Egg", 6));

        // Which of the 4 hash partitions would the key "Code" fall into?
        int index = "Code".hashCode() % 4;

        JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
        JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                return t;
            }
        }).partitionBy(new HashPartitioner(4)).persist(StorageLevel.MEMORY_ONLY());

        // (Sum, count)-building variants, kept for reference:
        // JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.combineByKey(
        //         (value) -> new Tuple2<Integer, Integer>(value, 1),
        //         (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
        //         (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()),
        //         new HashPartitioner(2), false, null);
        // JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.aggregateByKey(
        //         new Tuple2<Integer, Integer>(0, 0),
        //         (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
        //         (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()));
        // The same aggregation via reduceByKey()/foldByKey() after mapping values to (value, 1):
        // JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(
        //         t -> new Tuple2<>(t._1(), new Tuple2<>(t._2(), 1)));
        // JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd1.reduceByKey(
        //         (acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2()));
        // results = mapRdd1.foldByKey(new Tuple2<>(0, 0),
        //         (acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2()));
        // results.foreach(System.out::println);

        // Exercise: implement groupByKey() using combineByKey().
        // mapRdd.groupByKey().foreach(System.out::println);
        Function<Integer, List<Integer>> createCombiner = new Function<Integer, List<Integer>>() {
            @Override
            public List<Integer> call(Integer arg0) throws Exception {
                List<Integer> list = new ArrayList<Integer>();
                list.add(arg0);
                return list;
            }
        };
        Function2<List<Integer>, Integer, List<Integer>> mergeValue =
                new Function2<List<Integer>, Integer, List<Integer>>() {
            @Override
            public List<Integer> call(List<Integer> list, Integer value) throws Exception {
                list.add(value);
                return list;
            }
        };
        Function2<List<Integer>, List<Integer>, List<Integer>> mergeCombiners =
                new Function2<List<Integer>, List<Integer>, List<Integer>>() {
            @Override
            public List<Integer> call(List<Integer> list1, List<Integer> list2) throws Exception {
                List<Integer> list = new ArrayList<Integer>();
                list.addAll(list1);
                list.addAll(list2);
                return list;
            }
        };

        JavaPairRDD<String, List<Integer>> results =
                mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners);
        results.foreach(x -> System.out.println(x));

        JavaPairRDD<String, Integer> re = mapRdd.partitionBy(new HashPartitioner(2));
        System.out.println(re.glom().collect());

        // The fourth argument is the number of partitions; glom() shows each partition's contents.
        JavaPairRDD<String, List<Integer>> results2 =
                mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners, 2);
        System.out.println(results2.glom().collect());
        System.out.println(results2.getNumPartitions());

        // The fourth argument can instead be a custom partitioner.
        JavaPairRDD<String, List<Integer>> results3 =
                mapRdd.combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(3));
        System.out.println(results3.glom().collect());
        System.out.println(results3.getNumPartitions());

        // Fourth argument: custom partitioner; fifth: whether to combine on the map side;
        // sixth: serializer (null means the default serializer).
        JavaPairRDD<String, List<Integer>> results4 = mapRdd.combineByKey(
                createCombiner, mergeValue, mergeCombiners, new HashPartitioner(3), true, null);
        System.out.println(results4.glom().collect());
        System.out.println(results4.getNumPartitions());

        // Note: distinct() is implemented on top of reduceByKey().
        ctx.stop();
    }
}
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// Computing (sum, count) per key with combineByKey(), aggregateByKey() and reduceByKey().
// A plain per-key sum would simply be: JavaPairRDD<String, Integer> results = mapRdd.reduceByKey((x, y) -> x + y);
public class CombineByKeyTest {
    @SuppressWarnings("serial")
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.testing.memory", "5435657567560");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Create an RDD: 1) from external storage (cluster mode), or 2) from an in-memory collection.
        List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        data.add(new Tuple2<>("Cake", 2));
        data.add(new Tuple2<>("Bread", 3));   // becomes <"Bread", <3,1>> once mapped to (sum, count)
        data.add(new Tuple2<>("Cheese", 4));
        data.add(new Tuple2<>("Milk", 1));
        data.add(new Tuple2<>("Toast", 2));
        data.add(new Tuple2<>("Bread", 2));
        data.add(new Tuple2<>("Egg", 6));
        JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
        JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                return t;
            }
        });
        // Equivalent shortcut: JavaPairRDD<String, Integer> mapRdd = ctx.parallelizePairs(data, 2);

        mapRdd.groupByKey().foreach(x -> System.out.println(x));

        // Anonymous-class version of the same combineByKey(), kept for reference:
        // JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd.combineByKey(
        //         new Function<Integer, Tuple2<Integer, Integer>>() {
        //             @Override
        //             public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
        //                 return new Tuple2<Integer, Integer>(v1, 1);
        //             }
        //         }, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
        //             @Override
        //             public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Integer v2) throws Exception {
        //                 return new Tuple2<Integer, Integer>(v1._1() + v2, v1._2() + 1);
        //             }
        //         }, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
        //             @Override
        //             public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
        //                 return new Tuple2<Integer, Integer>(v1._1() + v2._1(), v1._2() + v2._2());
        //             }
        //         });

        // Lambda version: createCombiner, mergeValue, mergeCombiners, partitioner,
        // mapSideCombine, serializer (null = default).
        JavaPairRDD<String, Tuple2<Integer, Integer>> result2s = mapRdd.combineByKey(
                (Integer value) -> new Tuple2<Integer, Integer>(value, 1),
                (Tuple2<Integer, Integer> acc, Integer value) ->
                        new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
                (Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) ->
                        new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()),
                new HashPartitioner(3),
                true,
                null);
        result2s.foreach(x -> System.out.println(x));

        // aggregateByKey(): same aggregation with an explicit zero value.
        JavaPairRDD<String, Tuple2<Integer, Integer>> results3 = mapRdd.aggregateByKey(
                new Tuple2<Integer, Integer>(0, 0),
                (acc, value) -> new Tuple2<Integer, Integer>(acc._1() + value, acc._2() + 1),
                (acc1, acc2) -> new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2()));
        results3.foreach(x -> System.out.println(x));

        // reduceByKey(): same aggregation after mapping each value to (value, 1).
        JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd1 = mapRdd.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<String, Tuple2<Integer, Integer>>(t._1(),
                        new Tuple2<Integer, Integer>(t._2(), 1));
            }
        });
        JavaPairRDD<String, Tuple2<Integer, Integer>> results = mapRdd1.reduceByKey(
                new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1,
                    Tuple2<Integer, Integer> acc2) throws Exception {
                return new Tuple2<Integer, Integer>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
            }
        });
        results.foreach(x -> System.out.println(x));

        ctx.stop();
    }
}
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

// Binary key-based operations on two pre-partitioned RDDs:
// cogroup/groupWith, the join family, union, subtractByKey, intersection.
public class CogroupApiTest {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        conf.setAppName("WordCounter");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Create an RDD: 1) from external storage (cluster mode), or 2) from an in-memory collection.
        List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
        data1.add(new Tuple2<>("Cake", 2));
        data1.add(new Tuple2<>("Bread", 3));
        data1.add(new Tuple2<>("Cheese", 4));
        data1.add(new Tuple2<>("Milk", 1));
        data1.add(new Tuple2<>("Toast", 2));
        data1.add(new Tuple2<>("Bread", 2));
        data1.add(new Tuple2<>("Egg", 6));
        // Equivalent shortcut: JavaPairRDD<String, Integer> mapRdd1 = ctx.parallelizePairs(data1);
        JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data1, 2);
        JavaPairRDD<String, Integer> mapRdd1 = rdd1.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                return t;
            }
        }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());

        List<Tuple2<String, Integer>> data2 = new ArrayList<Tuple2<String, Integer>>();
        data2.add(new Tuple2<>("Cake", 2));
        data2.add(new Tuple2<>("Bread", 3));
        data2.add(new Tuple2<>("Cheese", 4));
        data2.add(new Tuple2<>("Milk", 1));
        data2.add(new Tuple2<>("Toast", 2));
        JavaRDD<Tuple2<String, Integer>> rdd2 = ctx.parallelize(data2, 2);
        JavaPairRDD<String, Integer> mapRdd2 = rdd2.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                return t;
            }
        }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());

        // cogroup() (same result as groupWith()): (Bread,([3, 2],[3]))
        JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> mapRdd3 = mapRdd1.cogroup(mapRdd2);
        mapRdd3.foreach(x -> System.out.println(x));

        // join(): (Bread,(3,3)), (Bread,(2,3)), (Cake,(2,2))
        // JavaPairRDD<String, Tuple2<Integer, Integer>> mapRdd4 = mapRdd1.join(mapRdd2);
        // mapRdd4.foreach(x -> System.out.println(x));

        // rightOuterJoin(): (Bread,(Optional[3],3)), (Bread,(Optional[3],2)), (Cake,(Optional[2],2));
        // the left side may be Optional.empty.
        // JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>> mapRdd5 = mapRdd2.rightOuterJoin(mapRdd1);
        // mapRdd5.foreach(x -> System.out.println(x));

        // leftOuterJoin(): (Cheese,(4,Optional[4])), (Toast,(2,Optional[2])), (Egg,(6,Optional.empty))
        // JavaPairRDD<String, Tuple2<Integer, Optional<Integer>>> mapRdd6 = mapRdd1.leftOuterJoin(mapRdd2);
        // mapRdd6.foreach(x -> System.out.println(x));

        // fullOuterJoin(): either side may be Optional.empty.
        // JavaPairRDD<String, Tuple2<Optional<Integer>, Optional<Integer>>> mapRdd7 = mapRdd1.fullOuterJoin(mapRdd2);
        // mapRdd7.foreach(x -> System.out.println(x));

        // groupWith(): same result as cogroup(): (Bread,([3, 2],[3]))
        // JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> mapRdd8 = mapRdd1.groupWith(mapRdd2);
        // mapRdd8.foreach(x -> System.out.println(x));

        // union(): concatenates the two RDDs; duplicates are kept.
        // JavaPairRDD<String, Integer> mapRdd9 = mapRdd1.union(mapRdd2);
        // mapRdd9.foreach(x -> System.out.println(x));

        // subtractByKey(): drops elements of mapRdd1 whose key also appears in mapRdd2.
        // JavaPairRDD<String, Integer> mapRdd10 = mapRdd1.subtractByKey(mapRdd2);
        // mapRdd10.foreach(x -> System.out.println(x));

        // intersection(): keeps only tuples whose key and value appear in both RDDs.
        // JavaPairRDD<String, Integer> mapRdd11 = mapRdd1.intersection(mapRdd2);
        // mapRdd11.foreach(x -> System.out.println(x));

        ctx.stop();
    }
}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

// sortByKey(), mapValues() and flatMapValues() on a hash-partitioned pair RDD.
public class SortByKeyApi {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.testing.memory", "2147480000");
        conf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Create an RDD: 1) from external storage (cluster mode), or 2) from an in-memory collection.
        List<Tuple2<String, Integer>> data = new ArrayList<Tuple2<String, Integer>>();
        data.add(new Tuple2<>("Cake", 2));
        data.add(new Tuple2<>("Bread", 3));
        data.add(new Tuple2<>("Cheese", 4));
        data.add(new Tuple2<>("Milk", 1));
        data.add(new Tuple2<>("Toast", 2));
        data.add(new Tuple2<>("Bread", 2));
        data.add(new Tuple2<>("Egg", 6));
        JavaRDD<Tuple2<String, Integer>> rdd1 = ctx.parallelize(data, 2);
        JavaPairRDD<String, Integer> mapRdd = rdd1.mapToPair(
                new PairFunction<Tuple2<String, Integer>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                return t;
            }
        }).partitionBy(new HashPartitioner(2)).persist(StorageLevel.MEMORY_ONLY());

        // Ascending sort: mapRdd.sortByKey().foreach(System.out::println);
        // Descending sort:
        mapRdd.sortByKey(false).foreach(x -> System.out.println(x));

        // sortByKey() can also take a Comparator over the keys (it must be serializable), e.g.:
        // mapRdd.sortByKey(String.CASE_INSENSITIVE_ORDER);

        // mapValues() leaves the keys (and the partitioner) untouched:
        // mapRdd.mapValues(x -> x + 1).foreach(x -> System.out.println(x));
        // flatMapValues() expands each value into several values under the same key:
        // mapRdd.flatMapValues(x -> Arrays.asList(1, 1, 1));

        ctx.stop();
    }
}
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

// Sorting by a custom key type: Person implements Comparable, or a Comparator can be supplied.
public class SortByKeyApiTest {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.default.parallelism", "4");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Create an RDD: 1) from external storage (cluster mode), or 2) from an in-memory collection.
        List<Person> data1 = new ArrayList<Person>();
        data1.add(new Person("Cake", 32));
        data1.add(new Person("Bread", 21));
        data1.add(new Person("Smith", 32));
        data1.add(new Person("Hourse", 21));
        data1.add(new Person("Mary", 32));
        data1.add(new Person("Greey", 21));
        data1.add(new Person("Greey", 21));
        data1.add(new Person("Tom", 32));
        data1.add(new Person("Gao", 21));
        // distinct() relies on hashCode()/equals(), so the duplicate "Greey" is counted once.
        System.out.println(ctx.parallelize(data1).distinct().count());
        // ctx.parallelize(data1).distinct().sortBy(x -> x, true, 2).foreach(x -> System.out.println(x));

        List<Tuple2<Person, Integer>> data = new ArrayList<Tuple2<Person, Integer>>();
        data.add(new Tuple2<Person, Integer>(new Person("Cake", 32), 2));
        data.add(new Tuple2<Person, Integer>(new Person("Bread", 21), 3));
        data.add(new Tuple2<Person, Integer>(new Person("Smith", 32), 2));
        data.add(new Tuple2<Person, Integer>(new Person("Hourse", 21), 3));
        data.add(new Tuple2<Person, Integer>(new Person("Mary", 32), 2));
        data.add(new Tuple2<Person, Integer>(new Person("Greey", 21), 3));
        data.add(new Tuple2<Person, Integer>(new Person("Greey", 11), 3));
        data.add(new Tuple2<Person, Integer>(new Person("Tom", 32), 2));
        data.add(new Tuple2<Person, Integer>(new Person("Gao", 21), 3));
        JavaPairRDD<Person, Integer> dataRdd = ctx.parallelizePairs(data);

        // Uses Person.compareTo() because Person implements Comparable.
        dataRdd.sortByKey().foreach(x -> System.out.println(x));

        // A custom Comparator can be supplied instead. Note that it is shipped to the
        // executors, so before running an action on the result it should be serializable
        // (e.g. a named class implementing Comparator<Person> and Serializable).
        dataRdd.sortByKey(new Comparator<Person>() {
            @Override
            public int compare(Person o1, Person o2) {
                int res = o1.name.compareTo(o2.name);
                if (res == 0) {
                    res = o1.age - o2.age;
                }
                return res;
            }
        });

        ctx.stop();
    }
}

class Person implements Serializable, Comparable<Person> {
    private static final long serialVersionUID = 1L;

    String name;
    int age;

    public Person(String name, int age) {
        super();
        this.name = name;
        this.age = age;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + age;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Person other = (Person) obj;
        if (age != other.age)
            return false;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equals(other.name))
            return false;
        return true;
    }

    @Override
    public int compareTo(Person p) {
        int res = this.name.compareTo(p.name);
        if (res == 0) {
            res = this.age - p.age;
        }
        return res;
    }

    @Override
    public String toString() {
        return "Person [name=" + name + ", age=" + age + "]";
    }
}