本文基于Spark 2.1.0版本node
新手首先要明白几个配置:shell
spark.default.parallelism:(默认的并发数)json
若是配置文件spark-default.conf中没有显示的配置,则按照以下规则取值:并发
本地模式(不会启动executor,由SparkSubmit进程生成指定数量的线程数来并发):分布式
spark-shell spark.default.parallelism = 1oop
spark-shell --master local[N] spark.default.parallelism = N (使用N个核)spa
spark-shell --master local spark.default.parallelism = 1线程
伪集群模式(x为本机上启动的executor数,y为每一个executor使用的core数,scala
z为每一个 executor使用的内存)orm
spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y
mesos 细粒度模式
Mesos fine grained mode spark.default.parallelism = 8
其余模式(这里主要指yarn模式,固然standalone也是如此)
Others: total number of cores on all executor nodes or 2, whichever is larger
spark.default.parallelism = max(全部executor使用的core总数, 2)
通过上面的规则,就能肯定了spark.default.parallelism的默认值(前提是配置文件spark-default.conf中没有显示的配置,若是配置了,则spark.default.parallelism = 配置的值)
还有一个配置比较重要,spark.files.maxPartitionBytes = 128 M(默认)
The maximum number of bytes to pack into a single partition when reading files.
表明着rdd的一个分区能存放数据的最大字节数,若是一个400m的文件,只分了两个区,则在action时会发生错误。
当一个spark应用程序执行时,生成spark.context,同时会生成两个参数,由上面获得的spark.default.parallelism推导出这两个参数的值
sc.defaultParallelism = spark.default.parallelism
sc.defaultMinPartitions = min(spark.default.parallelism,2)
当sc.defaultParallelism和sc.defaultMinPartitions最终确认后,就能够推算rdd的分区数了。
有两种产生rdd的方式:
1,经过scala 集合方式parallelize生成rdd,
如, val rdd = sc.parallelize(1 to 10)
这种方式下,若是在parallelize操做时没有指定分区数,则
rdd的分区数 = sc.defaultParallelism
2,经过textFile方式生成的rdd,
如, val rdd = sc.textFile(“path/file”)
有两种状况:
a,从本地文件file:///生成的rdd,操做时若是没有指定分区数,则默认分区数规则为:
(按照官网的描述,本地file的分片规则,应该按照hdfs的block大小划分,但实测的结果是固定按照32M来分片,多是bug,不过不影响使用,由于spark能用全部hadoop接口支持的存储系统,因此spark textFile使用hadoop接口访问本地文件时和访问hdfs仍是有区别的)
rdd的分区数 = max(本地file的分片数, sc.defaultMinPartitions)
b,从hdfs分布式文件系统hdfs://生成的rdd,操做时若是没有指定分区数,则默认分区数规则为:
rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)
补充:
1,若是使用以下方式,从HBase的数据表转换为RDD,则该RDD的分区数为该Table的region数。
String tableName ="pic_test2";
conf.set(TableInputFormat.INPUT_TABLE,tableName);
conf.set(TableInputFormat.SCAN,convertScanToString(scan));
JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf,
TableInputFormat.class,ImmutableBytesWritable.class,
Result.class);
Hbase Table:pic_test2的region为10,则hBaseRDD的分区数也为10。
2,若是使用以下方式,经过获取json(或者parquet等等)文件转换为DataFrame,则该DataFrame的分区数和该文件在文件系统中存放的Block数量对应。
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
people.json大小为300M,在HDFS中占用了2个blocks,则该DataFrame df分区数为2。
3,Spark Streaming获取Kafka消息对应的分区数,不在本文讨论。