Spark's main abstraction is a distributed collection of items, called an RDD (Resilient Distributed Dataset).
An RDD can be spread across the nodes of a cluster and operated on in parallel. RDDs can be created from Hadoop InputFormats (such as files on HDFS) or by transforming other RDDs.
RDD operations come in two kinds: transformations and actions (the latter are loosely comparable to aggregate functions in SQL).
Transformation
Action
There are two ways to create an RDD:
From a collection, mainly via the parallelize and makeRDD functions:
sc.parallelize(List(1,2,3))
sc.makeRDD(List(1,2,3))
sc.parallelize(1 to 10)
From external storage, such as a file on HDFS or on the local file system:
sc.textFile("hdfs://127.0.0.1:9000/tmp/in.txt")
sc.textFile("file:///G:/tmp/in.txt")
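For reference, a minimal Java sketch of the same two creation paths, assuming an existing JavaSparkContext named sc (as set up later in this article); the file paths are simply the ones used elsewhere in the examples:

import org.apache.spark.api.java.JavaRDD;
import java.util.Arrays;

// From an in-memory collection; the second argument is the number of partitions.
JavaRDD<Integer> fromCollection = sc.parallelize(Arrays.asList(1, 2, 3), 2);

// From external storage (HDFS or the local file system).
JavaRDD<String> fromHdfs = sc.textFile("hdfs://127.0.0.1:9000/tmp/in.txt");
JavaRDD<String> fromLocalFile = sc.textFile("file:///G:/tmp/in.txt");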
The material so far is a quick review. Now we introduce another very important concept in Spark: the partition (also called a split or slice), which appears in many distributed applications.
In essence, partitioning splits a large dataset into smaller pieces. For example, if we create an RDD from an HDFS directory whose data runs to hundreds of gigabytes, keeping it all in one piece is clearly bad for processing.
Divide and conquer is a key idea in algorithm design, and Spark's partitions apply exactly that idea: an RDD is split across different partitions so the computation can use multiple machines and multiple cores to process the data in parallel.
Unless otherwise noted, sc below refers to:
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("create JavaSparkContext");
sparkConf.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
HashPartitioner:
sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3)
  .partitionBy(new HashPartitioner(3))
How HashPartitioner picks a partition: partition = key.hashCode() % numPartitions (negative hash codes are adjusted to a non-negative result, so every key maps to a valid partition).
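A small Java sketch of the same partitioning, assuming the sc defined above; parallelizePairs builds a JavaPairRDD, which is what partitionBy operates on:

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
import java.util.Arrays;

JavaPairRDD<Integer, String> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>(1, "a"), new Tuple2<>(1, "aa"),
        new Tuple2<>(2, "b"), new Tuple2<>(2, "bb"),
        new Tuple2<>(3, "c")), 3);

// Keys with the same hash value modulo 3 land in the same partition.
JavaPairRDD<Integer, String> hashed = pairs.partitionBy(new HashPartitioner(3));
System.out.println(hashed.getNumPartitions()); // 3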
RangePartitioner:
sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3)
  .partitionBy(new RangePartitioner(3, counts))
RangePartitioner sorts the keys and then splits them into 3 contiguous key ranges; its constructor takes an RDD (counts above), which it samples to estimate the range boundaries.
Custom partitioner: extend org.apache.spark.Partitioner and override the getPartition and numPartitions methods.
sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3)
  .partitionBy(new CustomPartitioner(3))
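A minimal sketch of what the CustomPartitioner referenced above could look like; the routing rule here (non-negative hash modulo the partition count) is only an illustration, not the author's actual implementation:

import org.apache.spark.Partitioner;

public class CustomPartitioner extends Partitioner {

    private final int numParts;

    public CustomPartitioner(int numParts) {
        this.numParts = numParts;
    }

    @Override
    public int numPartitions() {
        return numParts;
    }

    @Override
    public int getPartition(Object key) {
        // Illustrative rule: non-negative hash of the key modulo the partition count.
        int mod = key.hashCode() % numParts;
        return mod < 0 ? mod + numParts : mod;
    }
}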
sc.defaultParallelism
sc.defaultMinPartitions
The value of sc.defaultParallelism can be set in the spark-defaults.conf configuration file:
spark.default.parallelism=20
It is also affected by settings made in code:
sparkConf.setMaster("local[*]");
With local[*], the default parallelism is the number of CPU cores on the machine; with local[n] it is n (plain local runs a single thread, so it is 1).
When submitting to a cluster it also depends on these two parameters: --num-executors and --executor-cores.
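A quick way to see the values that actually apply in your environment (assuming the sc defined earlier):

System.out.println("defaultParallelism = " + sc.defaultParallelism());
System.out.println("defaultMinPartitions = " + sc.defaultMinPartitions());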
When reading files from HDFS, the default number of partitions equals the number of HDFS blocks. For example, if a directory holds 100 GB of data and the HDFS block size is the default 128 MB, the number of blocks is 100 GB / 128 MB = 800, so reading that directory with SparkContext.textFile() produces 800 partitions by default.
Too many partitions means too many tasks, and scheduling each task takes time, so an excessive partition count increases the overall run time.
Too few partitions can leave some nodes with no task assigned; each partition then also carries more data, which raises the memory required on every node; and an ill-chosen partition count can lead to data skew.
Depending on whether the job is IO-bound or CPU-bound, set the partition count to roughly 2 to 3 times executor-cores * num-executors (see the sketch below).
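A hedged sketch of applying that rule of thumb; numExecutors and executorCores below are placeholder values standing in for whatever you pass as --num-executors and --executor-cores:

int numExecutors = 10;      // placeholder for --num-executors
int executorCores = 4;      // placeholder for --executor-cores
int targetPartitions = numExecutors * executorCores * 3;  // 2-3x the total core count

JavaRDD<String> lines = sc.textFile("hdfs://127.0.0.1:9000/tmp/in.txt");
JavaRDD<String> tuned = lines.repartition(targetPartitions);
System.out.println(tuned.getNumPartitions());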
First, let's walk through how Spark executes. A Spark application written in Java has a class containing the main method; this is generally called the driver program.
When the application runs on a cluster, the application and the jars it depends on are copied to all of the nodes; to avoid that copying you can instead keep the dependency jars on a shared network location.
Spark is fundamentally about processing data: when Spark reads data it splits it into n partitions, wraps the work as different tasks, and sends them to different machines to execute.
That is enough for many operations, such as map and foreach, but others need extra handling. Take reduceByKey: records with the same key may sit in different partitions, even on different machine nodes, so the data has to be redistributed so that all values for a key end up on the same node before the cross-partition computation can produce the final result. This process is called a shuffle.
Operations that can cause a shuffle:
repartition, coalesce, repartitionAndSortWithinPartitions, groupByKey, reduceByKey, sortByKey, aggregateByKey, cogroup, join
The JUnit examples below exercise the byKey operators reduceByKey, aggregateByKey, and countByKey:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class ByKeyAccumulatorTest implements Serializable {

    private transient JavaSparkContext sc;

    @Before
    public void setUp() {
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        sc = new JavaSparkContext(conf);
    }

    @Test
    public void reduceByKey() {
        List<Tuple2<String, Integer>> datas = Arrays.asList(
                new Tuple2<>("A", 10),
                new Tuple2<>("B", 20),
                new Tuple2<>("A", 30),
                new Tuple2<>("A", 60),
                new Tuple2<>("C", 30),
                new Tuple2<>("B", 40)
        );
        JavaRDD<Tuple2<String, Integer>> RDDOne = sc.parallelize(datas);
        JavaPairRDD<String, Integer> pairRDDOne = JavaPairRDD.fromJavaRDD(RDDOne);
        // Sum the values of each key; the function is applied pairwise to values with the same key.
        JavaPairRDD<String, Integer> rdd = pairRDDOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                System.out.println("v1:" + v1);
                System.out.println("v2:" + v2);
                return v1 + v2;
            }
        });
        Map<String, Integer> stringIntegerMap = rdd.collectAsMap();
        System.out.println(stringIntegerMap);
        rdd.saveAsTextFile("G:\\tmp\\spark");
    }

    @Test
    public void aggregateByKey() {
        List<Tuple2<String, Integer>> datas = Arrays.asList(
                new Tuple2<>("A", 10),
                new Tuple2<>("B", 20),
                new Tuple2<>("A", 30),
                new Tuple2<>("A", 60),
                new Tuple2<>("C", 30),
                new Tuple2<>("B", 40)
        );
        JavaRDD<Tuple2<String, Integer>> RDDOne = sc.parallelize(datas);
        JavaPairRDD<String, Integer> pairRDDOne = JavaPairRDD.fromJavaRDD(RDDOne);
        // Zero value 0; take the max per key within each partition, then sum the per-partition results.
        JavaPairRDD<String, Integer> rdd = pairRDDOne.aggregateByKey(0,
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 > v2 ? v1 : v2;
                    }
                },
                new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });
        Map<String, Integer> stringIntegerMap = rdd.collectAsMap();
        System.out.println(stringIntegerMap);
    }

    @Test
    public void countByKey() {
        List<Tuple2<String, Integer>> datas = Arrays.asList(
                new Tuple2<>("A", 10),
                new Tuple2<>("B", 20),
                new Tuple2<>("A", 30),
                new Tuple2<>("A", 60),
                new Tuple2<>("C", 30),
                new Tuple2<>("B", 40)
        );
        JavaRDD<Tuple2<String, Integer>> RDDOne = sc.parallelize(datas);
        JavaPairRDD<String, Integer> pairRDDOne = JavaPairRDD.fromJavaRDD(RDDOne);
        // countByKey is an action: it returns the number of elements per key to the driver.
        Map<String, Long> stringLongMap = pairRDDOne.countByKey();
        System.out.println(stringLongMap);
    }
}
The examples below show foreach and filter:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.junit.Before;
import org.junit.Test;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class ForeachAccumulatorTest implements Serializable {

    private transient JavaSparkContext sc;
    private transient List<String> datas;

    @Before
    public void setUp() {
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        sc = new JavaSparkContext(conf);
        datas = Arrays.asList("hello", "spark", "hadoop", "瑞雪兆丰年", "Java");
    }

    @Test
    public void foreach() {
        JavaRDD<String> dataRDD = sc.parallelize(datas);
        // foreach is an action with no return value; here it just prints each element's length.
        dataRDD.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s.length());
            }
        });
    }

    @Test
    public void filter() {
        JavaRDD<String> dataRDD = sc.parallelize(datas);
        // filter keeps only the elements for which the function returns true.
        JavaRDD<String> resultRDD = dataRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v1) throws Exception {
                return v1.contains("h");
            }
        });
        List<String> collect = resultRDD.collect();
        collect.forEach(System.out::println);
    }
}
The examples below show map, mapPartitions, mapPartitionsToPair, mapPartitionsToDouble, and flatMap:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.DoubleFlatMapFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class MapAccumulatorTest implements Serializable {

    private transient JavaSparkContext sc;
    private transient List<String> datas;

    @Before
    public void setUp() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("create JavaSparkContext");
        sparkConf.setMaster("local[*]");
        sc = new JavaSparkContext(sparkConf);
        //datas = sc.textFile("file:///G:/tmp/in.txt");
        datas = Arrays.asList("hello", "spark", "hadoop", "瑞雪兆丰年", "Java");
    }

    @Test
    public void map() {
        JavaRDD<String> toDealRDD = sc.parallelize(datas, 3);
        JavaRDD<Integer> map = toDealRDD.map(s -> s.length());
        // Collect all the data back to the driver.
        List<Integer> results = map.collect();
        results.forEach(System.out::println);
    }

    @Test
    public void mapPartitions() {
        JavaRDD<String> toDealRDD = sc.parallelize(datas, 3);
        toDealRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {
            @Override
            public Iterator<Integer> call(Iterator<String> stringIterator) throws Exception {
                LinkedList<Integer> lengths = new LinkedList<>();
                // Expensive shared work, such as setting up external resources, goes here (once per partition).
                while (stringIterator.hasNext()) {
                    String content = stringIterator.next();
                    lengths.add(content.length());
                }
                // The return value is an iterator.
                return lengths.iterator();
            }
        });
    }

    @Test
    public void mapPartitionsToPairs() {
        JavaRDD<String> toDealRDD = sc.parallelize(datas, 3);
        toDealRDD.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, Integer>() {
            @Override
            public Iterator<Tuple2<String, Integer>> call(Iterator<String> stringIterator) throws Exception {
                List<Tuple2<String, Integer>> result = new LinkedList<>();
                // Expensive shared work, such as setting up external resources, goes here (once per partition).
                while (stringIterator.hasNext()) {
                    String content = stringIterator.next();
                    Tuple2<String, Integer> tuple = new Tuple2<>(content, content.length());
                    result.add(tuple);
                }
                return result.iterator();
            }
        });
    }

    @Test
    public void mapPartitionsToDouble() {
        JavaRDD<String> toDealRDD = sc.parallelize(datas, 3);
        toDealRDD.mapPartitionsToDouble(new DoubleFlatMapFunction<Iterator<String>>() {
            @Override
            public Iterator<Double> call(Iterator<String> stringIterator) throws Exception {
                LinkedList<Double> result = new LinkedList<>();
                while (stringIterator.hasNext()) {
                    String content = stringIterator.next();
                    result.add(Double.valueOf(content.length()));
                }
                return result.iterator();
            }
        });
    }

    @Test
    public void flatMap() {
        List<String> listOne = Arrays.asList("hello", "spark", "hadoop", "瑞雪兆丰年", "Java");
        List<String> listTwo = Arrays.asList("语言", "C", "C++", "Java", "Python");
        List<List<String>> datas = Arrays.asList(listOne, listTwo);
        // JavaRDD<List<String>> toDealRDD = sc.parallelize(datas, 3);
        JavaRDD<List<String>> toDealRDD = sc.parallelize(datas);
        // map produces one output element per input element: here, the size of each list.
        JavaRDD<Integer> mapResult = toDealRDD.map(new Function<List<String>, Integer>() {
            @Override
            public Integer call(List<String> content) throws Exception {
                return content.size();
            }
        });
        List<Integer> mapList = mapResult.collect();
        mapList.forEach(System.out::println);
        System.out.println("------------------");
        // flatMap can produce any number of output elements per input element: here, one length per string.
        JavaRDD<Integer> flatMapRDD = toDealRDD.flatMap(new FlatMapFunction<List<String>, Integer>() {
            @Override
            public Iterator<Integer> call(List<String> strings) throws Exception {
                LinkedList<Integer> result = new LinkedList<>();
                strings.forEach(content -> result.add(content.length()));
                return result.iterator();
            }
        });
        List<Integer> flatMapResult = flatMapRDD.collect();
        flatMapResult.forEach(System.out::println);
    }
}
The examples below show the save actions saveAsTextFile, saveAsHadoopFile, and saveAsObjectFile:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Before;
import org.junit.Test;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SaveAccumulatorTest implements Serializable {

    private transient JavaSparkContext sc;
    private transient List<Tuple2<String, Integer>> datas;

    @Before
    public void setUp() {
        SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
        sc = new JavaSparkContext(conf);
        datas = new ArrayList<>();
        datas.add(new Tuple2<>("A", 10));
        datas.add(new Tuple2<>("B", 1));
        datas.add(new Tuple2<>("A", 6));
        datas.add(new Tuple2<>("C", 5));
        datas.add(new Tuple2<>("B", 3));
    }

    @Test
    public void saveAsTextFile() {
        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(datas);
        String path = "G:\\tmp\\spark\\saveAsTextFile\\" + System.currentTimeMillis();
        dataRDD.saveAsTextFile(path);
    }

    @Test
    public void saveAsSequenceFile() {
        List<Tuple2<String, Integer>> datas = new ArrayList<>();
        datas.add(new Tuple2<>("A", 10));
        datas.add(new Tuple2<>("B", 1));
        datas.add(new Tuple2<>("A", 6));
        datas.add(new Tuple2<>("C", 5));
        datas.add(new Tuple2<>("B", 3));
        JavaPairRDD<String, Integer> dataRDD = sc.parallelizePairs(datas);
        String path = "G:\\tmp\\spark\\saveAsHadoopFile\\" + System.currentTimeMillis();
        // Writes through the Hadoop OutputFormat API (here TextOutputFormat from the old mapred package).
        dataRDD.saveAsHadoopFile(path, String.class, IntWritable.class, TextOutputFormat.class);
    }

    @Test
    public void saveAsObjectFile() {
        JavaRDD<Tuple2<String, Integer>> dataRDD = sc.parallelize(datas);
        String path = "G:\\tmp\\spark\\saveAsObjectFile\\" + System.currentTimeMillis();
        dataRDD.saveAsObjectFile(path);
    }
}
map, filter, and foreach all iterate over the elements, but there are still differences among them.
map transforms each element of the dataset into another element, possibly of a different type.
The function passed to filter returns a boolean; it is used to filter elements and reduce the amount of data in the dataset.
foreach has no return value; it is executed only for its side effects.
coalesce, repartition, and repartitionAndSortWithinPartitions all repartition the data, but there are clear differences among them.
coalesce is mainly used to reduce the number of partitions; after a filter has dropped most of the data, reducing the partition count can improve efficiency.
repartition can increase or decrease the number of partitions, and it always shuffles all of the data.
If the data needs to be sorted after repartitioning, use repartitionAndSortWithinPartitions instead of repartition followed by sortBy (see the sketch below).
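A sketch comparing the three, assuming the sc from earlier; sortByKey is used as the pair-RDD counterpart of the sortBy mentioned above, and the sample data and filter condition are made up for illustration:

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;
import java.util.Arrays;

// coalesce: shrink the partition count after a selective filter, without a full shuffle.
JavaRDD<String> filtered = sc.textFile("hdfs://127.0.0.1:9000/tmp/in.txt")
        .filter(line -> line.contains("ERROR"));
JavaRDD<String> compacted = filtered.coalesce(4);

JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("B", 1), new Tuple2<>("A", 2), new Tuple2<>("C", 3)), 3);

// One pass: sorting happens inside the shuffle, so keys come out sorted within each partition.
JavaPairRDD<String, Integer> sortedWithinPartitions =
        pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2));

// Two steps: repartition shuffles once, then the sort shuffles again.
JavaPairRDD<String, Integer> twoSteps = pairs.repartition(2).sortByKey();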
What is the difference between the element-wise operators and their partition-wise counterparts (mapPartitions, foreachPartition, and so on)?
If the processing function creates and closes resources such as database connections or file handles, the element-wise operators create a connection or handle once per element, which hurts performance.
So when the processing function needs to access external resources, prefer the partition-wise operators: the function receives an iterator over an entire partition's data, handles one partition at a time, and the time spent on the external resource is greatly reduced (see the sketch below).
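A hedged foreachPartition sketch of that pattern: one JDBC connection per partition instead of one per record. The JDBC URL, credentials, and INSERT statement are assumptions for illustration only:

import org.apache.spark.api.java.JavaRDD;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

JavaRDD<String> records = sc.textFile("hdfs://127.0.0.1:9000/tmp/in.txt");

records.foreachPartition(iter -> {
    // Runs once per partition: every record in the partition reuses the same connection.
    try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://127.0.0.1:3306/test", "user", "password");
         PreparedStatement stmt = conn.prepareStatement("INSERT INTO words(word) VALUES (?)")) {
        while (iter.hasNext()) {
            stmt.setString(1, iter.next());
            stmt.executeUpdate();
        }
    }
});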