表示并行计算的一个计算单元html
RDD 内部的数据集合在逻辑上和物理上被划分红多个小子集合,这样的每个子集合咱们将其称为分区,分区的个数会决定并行计算的粒度,而每个分区数值的计算都是在一个单独的任务中进行,所以并行任务的个数,也是由 RDD(其实是一个阶段的末 RDD,调度章节会介绍)分区的个数决定的git
RDD的分区数量可经过rdd.getPartitions获取。 getPartitions方法是在RDD类中定义的,由不一样的子类进行具体的实现算法
获取分区的定义数组
在RDD类中定义了getPartition方法,返回一个Partition列表,Partition对象只含有一个编码index字段,不一样RDD的partition会继承Partition类,例如JdbcPartition、KafkaRDDPartition,HadoopPartition等。ide
class RDD{ // 获取分区定义 def getPartitions:Array[Partition] } // Partition类定义 trait Partition { def index:Int }
transformation类型的RDD分区数量和父RDD的保持一致,Action类型的RDD分区数量,不一样的数据源默认值不同,获取的方式也不一样函数
kafkaRDD的partition数量等于compute方法中生成OffsetRange的数量。oop
// DirectKafkaInputDStream类在接收到消息后经过compute方法计算获得OffsetRange class OffsetRange( val topic:String, // Kafka topic name val partition:Int, // Kafka partition id val fromOffset:Long, val untilOffset:Long ){...} class KafkaRDD( val offsetRages:Array[OffsetRange] ) extends RDD{ // 遍历OffsetRange数组,获得数组下标和偏移量等信息生成KafkaRDDPartition offsetRanges.zipWithIndex.map{ case(o,i)=> new KafkaRDDPartition( i,o.topic,o.partition, o.fromOffset,o.untilOffset ) }.toArray }
HadoopRDD的分区是基于hadoop的splits方法进行的。每一个partition的大小默认等于hdfs的block的大小this
例如:一个txt文件12800M,则 val rdd1=sc.textFile("/data.txt");
rdd1默认会有12800/128=10个分区。编码
class HadoopRDD(){ // 生成一个RDD的惟一ID val id=Int=sc.newRddId() def getPartitions:Array[Partition]={ // 调用hadoop的splits方法进行切割 val inputSplits=inputFormat.getSplits( jobConf,minPartitions ) // 组成spark的partition val array=new Array[Partition](inputSplits.size) for(i <- 0 until inputSplits.size){ array(i)=new HadoopPartition(id,i, inputSplits(i)) } } }
hadoop的FileInputFormat类: texfile的分区大小时指定的分区数和block树中取较大值,因此当指定numPartitions,小于block数时无效,大于则生效spa
JDBC的partition划分是指定开始行和结束行,而后将查询到的结果分为3个(默认值)partition。
class JdbcRDD(numPartitions:Int){ def getPartitions:Array[Partition]={ (0 until numPartitions).map{ new JdbcPartition(i,start,end) }.toArray } }
转换类的RDD分区数量是由其父类的分区数决定的
// 获取父RDD列表的第一个RDD class RDD{ def firstParent:RDD={ dependencies.head.rdd.asInstanceOf[RDD] } } class MapPartitionsRDD(){ // 获取父RDD的partitions数量 def getPartitions: Array[Partition] = firstParent[T].partitions }
分区数量的原则:尽量的选择大的分区值
Spark API | partition数量 |
---|---|
sc.parallelize(…) | sc.defaultParallelism |
sc.textFile(…) | max(传参, block数) |
sc.newAPIHadoopRDD(…) | max(传参, block数) |
new JdbcRDD(…) | 传参 |
reduceByKey(),foldByKey(),combineByKey(), groupByKey(),sortByKey(),mapValues(),flatMapValues() 和父RDD相同
cogroup(), join(), ,leftOuterJoin(), rightOuterJoin(): 全部父RDD按照其partition数降序排列,从partition数最大的RDD开始查找是否存在partitioner,存在则partition数由此partitioner肯定,不然,全部RDD不存在partitioner,由spark.default.parallelism肯定,若还没设置,最后partition数为全部RDD中partition数的最大值
注意:只有Key-Value类型的RDD才有分区的,非Key-Value类型的RDD分区的值是None的
abstract class Partitioner extends Serializable { def numPartitions: Int // 分区数量 def getPartition(key: Any): Int // 分区编号 }
partitioner分区器做用:
分区器的选择:
object Partitioner{ def defaultPartitioner(rdd):Partitioner={ val hasPartitioner= rdds.filter( _.partitioner.exists(_numPartitions>0)) } // 若是RDD已经有分区则选取其分区数最多的 if(hasPartitioner.nonEmpty){ hasPartitioner.maxBy(_.partitions.length). partitioner.get }else{ if(rdd.context.conf.contains( "spark.default.parallelism" )){ // 若是在conf中配置了分区数则用之 new HashPartitioner( rdd.context.defaultParallelism ) }else{ // 若是没有配置parallelism则和父RDD中最大的保持一致 new HashPartitioner(rdds.map( _.partitions.length ).max) } } }
HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,若是余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID
class HashPartitioner(partitions:Int) { def getPartition(key:Any):Int=key match{ case null=> 0 case _=> nonNegativeMod(key.hashCode, numPartitions) } def nonNegativeMod(x: Int, mod: Int): Int = { val rawMod = x % mod rawMod + (if (rawMod < 0) mod else 0) } // 判断两个RDD分区方式是否同样 def equals(other:Any):Boolean= other match{ case h:HashPartitioner => h.numPartitions==numPartitions case _ => false } }
HashPartitioner分区可能致使每一个分区中数据量的不均匀。而RangePartitioner分区则尽可能保证每一个分区中数据量的均匀,将必定范围内的数映射到某一个分区内。分区与分区之间数据是有序的,但分区内的元素是不能保证顺序的。
RangePartitioner分区执行原理:
一句话归纳:就是遍历每一个paritiion,对里面的数据进行抽样,把抽样的数据进行排序,并按照对应的权重肯定边界
给定总的数据抽样大小,最多1M的数据量(10^6),最少20倍的RDD分区数量,也就是每一个RDD分区至少抽取20条数据
class RangePartitioner(partitions,rdd) { // 1. 计算样本大小 val sampleSize =math.min(20.0 * partitions, 1e6) }
RDD各分区中的数据量可能会出现倾斜的状况,乘于3的目的就是保证数据量小的分区可以采样到足够的数据,而对于数据量大的分区会进行第二次采样
class RangePartitioner(partitions,rdd) { // 1. 计算样本大小 val sampleSize = math.min(20.0 * partitions, 1e6) // 2. 计算样本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt }
根据以上两个值进行水塘抽样,返回RDD的总数据量,分区ID和每一个分区的采样数据。其中总数据量经过遍历RDD全部partition的key累加获得的,不是经过rdd.count计算获得的
class RangePartitioner(partitions,rdd) { // 1. 计算样本大小 val sampleSize =math.min(20.0 * partitions, 1e6) // 2. 计算样本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt } // 3. 进行抽样,返回总数据量,分区ID和样本数据 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition)
若是有较大RDD存在,则按照平均值去采样的话数据量太少,容易形成数据倾斜,因此须要进行二次采样
判断是否须要从新采样方法: 样本数量占比乘以当前RDD的总行数大于预设的每一个RDD最大抽取数量,说明这个RDD的数据量比较大,须要采样更多的数据:eg: 0.2100=20<60;0.220000=2000>60
class RangePartitioner(partitions,rdd) { // 1. 计算样本大小 val sampleSize =math.min(20.0 * partitions, 1e6) // 2. 计算样本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt } // 3. 进行抽样,返回总数据量,分区ID和样本数据 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition) // 4. 是否须要二次采样 val imbalancedPartitions = mutable.Set.empty[Int] if (fraction * n > sampleSizePerPartition) { // 记录须要从新采样的RDD的ID imbalancedPartitions += idx }
计算每一个采样数据的权重占比,根据采样数据的ID和权重生成出RDD分区边界数组
权重计算方法:总数据量/当前RDD的采样数据量
class RangePartitioner(partitions,rdd) { // 1. 计算样本大小 val sampleSize = math.min(20.0 * partitions, 1e6) // 2. 计算样本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt } // 3. 进行抽样,返回总数据量,分区ID和样本数据 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition) // 4. 是否须要二次采样 val imbalancedPartitions = mutable.Set.empty[Int] // 5. 保存样本数据的集合buffer:包含数据和权重 val candidates = ArrayBuffer.empty[(K, Float)] if (fraction * n > sampleSizePerPartition) { // 记录须要从新采样的RDD的ID imbalancedPartitions += idx }else{ // 5. 计算样本权重 val weight = ( // 采样数据的占比 n.toDouble / sample.length).toFloat for (key <- sample) { // 记录采样数据key和权重 candidates += ((key, weight)) } } }
对于数据分布不均衡的RDD分区,从新进行二次抽样。 二次抽样采用的是RDD的采样方法:RDD.sample
class RangePartitioner(partitions,rdd) { // 1. 计算样本大小 val sampleSize = math.min(20.0 * partitions, 1e6) // 2. 计算样本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt } // 3. 进行抽样,返回总数据量,分区ID和样本数据 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition) // 4. 是否须要二次采样 val imbalancedPartitions = mutable.Set.empty[Int] // 5. 保存样本数据的集合buffer:包含数据和权重 val candidates = ArrayBuffer.empty[(K, Float)] if (fraction * n > sampleSizePerPartition) { // 记录须要从新采样的RDD的ID imbalancedPartitions += idx }else{ // 5. 计算样本权重 val weight = ( // 采样数据的占比 n.toDouble / sample.length).toFloat for (key <- sample) { // 记录采样数据key和权重 candidates += ((key, weight)) } } // 6. 对于数据分布不均衡的RDD分区,从新数据抽样 if (imbalancedPartitions.nonEmpty) { // 利用rdd的sample抽样函数API进行数据抽样 val reSampled = imbalanced.sample( withReplacement = false, fraction, seed).collect() } }
将最终的抽样数据计算出分区边界数组返回,边界数组里面存放的是RDD里面数据的key值, 好比最终返回的数组是:array[0,10,20,30..] 其中0,10,20,30是采样数据中的key值,对于每一条数据都会判断其在此数组的那个区间中间,例若有一条数据key值是3则其在0到10之间,属于第一个分区,同理Key值为15的数据在第二个分区
class RangePartitioner(partitions,rdd) { // 1. 计算样本大小 val sampleSize = math.min(20.0 * partitions, 1e6) // 2. 计算样本最大值 val sampleSizePerPartition = math.ceil( 3.0 * sampleSize / rdd.partitions.length ).toInt } // 3. 进行抽样,返回总数据量,分区ID和样本数据 val (numItems, sketched) = RangePartitioner.sketch( rdd.map(_._1), sampleSizePerPartition) // 4. 是否须要二次采样 val imbalancedPartitions = mutable.Set.empty[Int] // 5. 保存样本数据的集合buffer:包含数据和权重 val candidates = ArrayBuffer.empty[(K, Float)] if (fraction * n > sampleSizePerPartition) { // 记录须要从新采样的RDD的ID imbalancedPartitions += idx }else{ // 5. 计算样本权重 val weight = ( // 采样数据的占比 n.toDouble / sample.length).toFloat for (key <- sample) { // 记录采样数据key和权重 candidates += ((key, weight)) } } // 6. 对于数据分布不均衡的RDD分区,从新数据抽样 if (imbalancedPartitions.nonEmpty) { // 利用rdd的sample抽样函数API进行数据抽样 val reSampled = imbalanced.sample( withReplacement = false, fraction, seed).collect() } // 7. 生成边界数组 RangePartitioner.determineBounds( candidates, partitions) }
水塘抽样概念: 它是一系列的随机算法,其目的在于从包含n个项目的集合S中选取k个样本,使得每条数据抽中的几率是k/n。其中n为一很大或未知的数量,尤为适用于不能把全部n个项目都存放到主内存的状况
咱们能够:定义取出的行号为choice,第一次直接以第一行做为取出行 choice ,然后第二次以二分之一律率决定是否用第二行替换 choice ,第三次以三分之一的几率决定是否以第三行替换 choice ……,以此类推。由上面的分析咱们能够得出结论,在取第n个数据的时候,咱们生成一个0到1的随机数p,若是p小于1/n,保留第n个数。大于1/n,继续保留前面的数。直到数据流结束,返回此数,算法结束。
详见: https://www.iteblog.com/archives/1525.html https://my.oschina.net/freelili/blog/2987667
实现:
难点:
// 计算出须要替换的数组下标 // 选取第n个数的几率是:n/l; 若是随机替换数组值的几率是p=rand.nextDouble, // 则若是p<k/l;则替换池中任意一个数,即: p*l < k 则进行替换,用p*l做为随机替换的下标 val replacementIndex = (rand.nextDouble() * l).toLong if (replacementIndex < k) { // 替换reservoir[随机抽取的下标]的值为input[l]的值item reservoir(replacementIndex.toInt) = item }
若是分区边界数组的大小小于或等于128的时候直接变量数组,不然采用二分查找法肯定key属于某个分区。
遍历数组,判断当前key值是否属于当前区间
// 根据RDD的key值返回对应的分区id。从0开始 def getPartition(key: Any): Int = { // 强制转换key类型为RDD中本来的数据类型 val k = key.asInstanceOf[K] var partition = 0 if (rangeBounds.length <= 128) { // 若是分区数据小于等于128个,那么直接本地循环寻找当前k所属的分区下标 // ordering.gt(x,y):若是x>y,则返回true while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { partition += 1 }
对于分区数大于128的状况,采样二分法查找
// 根据RDD的key值返回对应的分区id。从0开始 def getPartition(key: Any): Int = { // 若是分区数量大于128个,那么使用二分查找方法寻找对应k所属的下标; // 可是若是k在rangeBounds中没有出现,实质上返回的是一个负数(范围)或者是一个超过rangeBounds大小的数(最后一个分区,比全部数据都大) // Determine which binary search method to use only once. partition = binarySearch(rangeBounds, k) // binarySearch either returns the match location or -[insertion point]-1 if (partition < 0) { partition = -partition-1 } if (partition > rangeBounds.length) { partition = rangeBounds.length }
自定义:
public class MyPartioner extends Partitioner { @Override public int numPartitions() { return 1000; } @Override public int getPartition(Object key) { String k = (String) key; int code = k.hashCode() % 1000; System.out.println(k+":"+code); return code < 0?code+1000:code; } @Override public boolean equals(Object obj) { if(obj instanceof MyPartioner){ if(this.numPartitions()==((MyPartioner) obj).numPartitions()){ return true; } return false; } return super.equals(obj); } }
调用:pairRdd.groupbykey(new MyPartitioner())
参考连接:https://ihainan.gitbooks.io/spark-source-code/content/section1/rddPartitions.html
sparkCore源码解析系列: