spark一个最重要的特性就是对数据集在各个节点的分区进行控制。控制数据分布能够减小网络开销,极大地提高总体性能。java
只有Pair RDD才有分区,非Pair RDD分区的值是None。若是RDD只被扫描一次,不必预先分区处理;若是RDD屡次在诸如链接这种基于键的操做中使用时,分区才有做用。apache
分区器决定了RDD的分区个数及每条数据最终属于哪一个分区。编程
spark提供了两个分区器:HashPartitioner和RangePartitioner,它们都继承于org.apache.spark.Partitioner类并实现三个方法。数组
HashPartitioner分区执行原理:对于给定的key,计算其hashCode,再除以分区数取余,最后的值就是这个key所属的分区ID。实现以下:网络
class HashPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") def numPartitions: Int = partitions def getPartition(key: Any): Int = key match { case null => 0 case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) } override def equals(other: Any): Boolean = other match { case h: HashPartitioner => h.numPartitions == numPartitions case _ => false } override def hashCode: Int = numPartitions }
HashPartitioner分区可能致使每一个分区中数据量的不均匀。而RangePartitioner分区则尽可能保证每一个分区中数据量的均匀,将必定范围内的数映射到某一个分区内。分区与分区之间数据是有序的,但分区内的元素是不能保证顺序的。app
RangePartitioner分区执行原理:less
class RangePartitioner[K: Ordering : ClassTag, V]( partitions: Int, rdd: RDD[_ <: Product2[K, V]], private var ascending: Boolean = true) extends Partitioner { // We allow partitions = 0, which happens when sorting an empty RDD under the default settings. require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.") // 获取RDD中K类型数据的排序器 private var ordering = implicitly[Ordering[K]] // An array of upper bounds for the first (partitions - 1) partitions private var rangeBounds: Array[K] = { if (partitions <= 1) { // 若是给定的分区数小于等于1的状况下,直接返回一个空的集合,表示数据不进行分区 Array.empty } else { // This is the sample size we need to have roughly balanced output partitions, capped at 1M. // 给定总的数据抽样大小,最多1M的数据量(10^6),最少20倍的RDD分区数量,也就是每一个RDD分区至少抽取20条数据 val sampleSize = math.min(20.0 * partitions, 1e6) // Assume the input partitions are roughly balanced and over-sample a little bit. // RDD各分区中的数据量可能会出现倾斜的状况,乘于3的目的就是保证数据量小的分区可以采样到足够的数据,而对于数据量大的分区会进行第二次采样 val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt // 从rdd中抽取数据,返回值:(总rdd数据量, Array[分区id,当前分区的数据量,当前分区抽取的数据]) val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) if (numItems == 0L) { // 若是总的数据量为0(RDD为空),那么直接返回一个空的数组 Array.empty } else { // If a partition contains much more than the average number of items, we re-sample from it // to ensure that enough items are collected from that partition. // 计算总样本数量和总记录数的占比,占比最大为1.0 val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0) // 保存样本数据的集合buffer val candidates = ArrayBuffer.empty[(K, Float)] // 保存数据分布不均衡的分区id(数据量超过fraction比率的分区) val imbalancedPartitions = mutable.Set.empty[Int] // 计算抽取出来的样本数据 sketched.foreach { case (idx, n, sample) => if (fraction * n > sampleSizePerPartition) { // 若是fraction乘以当前分区中的数据量大于以前计算的每一个分区的抽象数据大小,那么表示当前分区抽取的数据太少了,该分区数据分布不均衡,须要从新抽取 imbalancedPartitions += idx } else { // 当前分区不属于数据分布不均衡的分区,计算占比权重,并添加到candidates集合中 // The weight is 1 over the sampling probability. val weight = (n.toDouble / sample.size).toFloat for (key <- sample) { candidates += ((key, weight)) } } } // 对于数据分布不均衡的RDD分区,从新进行数据抽样 if (imbalancedPartitions.nonEmpty) { // Re-sample imbalanced partitions with the desired sampling probability. // 获取数据分布不均衡的RDD分区,并构成RDD val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains) // 随机种子 val seed = byteswap32(-rdd.id - 1) // 利用rdd的sample抽样函数API进行数据抽样 val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect() val weight = (1.0 / fraction).toFloat candidates ++= reSampled.map(x => (x, weight)) } // 将最终的抽样数据计算出rangeBounds出来 RangePartitioner.determineBounds(candidates, partitions) } } } // 下一个RDD的分区数量是rangeBounds数组中元素数量+ 1个 def numPartitions: Int = rangeBounds.length + 1 // 二分查找器,内部使用java中的Arrays类提供的二分查找方法 private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K] // 根据RDD的key值返回对应的分区id。从0开始 def getPartition(key: Any): Int = { // 强制转换key类型为RDD中本来的数据类型 val k = key.asInstanceOf[K] var partition = 0 if (rangeBounds.length <= 128) { // If we have less than 128 partitions naive search // 若是分区数据小于等于128个,那么直接本地循环寻找当前k所属的分区下标 while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { partition += 1 } } else { // Determine which binary search method to use only once. // 若是分区数量大于128个,那么使用二分查找方法寻找对应k所属的下标; // 可是若是k在rangeBounds中没有出现,实质上返回的是一个负数(范围)或者是一个超过rangeBounds大小的数(最后一个分区,比全部数据都大) partition = binarySearch(rangeBounds, k) // binarySearch either returns the match location or -[insertion point]-1 if (partition < 0) { partition = -partition - 1 } if (partition > rangeBounds.length) { partition = rangeBounds.length } } // 根据数据排序是升序仍是降序进行数据的排列,默认为升序 if (ascending) { partition } else { rangeBounds.length - partition } }
影响分区的算子操做有:cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、repartition()、coalesce()、sort()、mapValues()(若是父RDD有分区方式)、flatMapValues()(若是父RDD有分区方式)。dom
对于执行两个RDD的算子操做,输出数据的分区方式取决于父RDD的分区方式。默认状况下,结果会采用哈希分区,分区的数量和操做的并行度同样。不过,若是其中一个父RDD设置过度区方式,结果就采用那种分区方式;若是两个父RDD都设置过度区方式,结果RDD采用第一个父RDD的分区方式。ide
repartition 和 partitionBy 都是对数据进行从新分区,默认都是使用 HashPartitioner。可是两者之间的区别有:函数
其实partitionBy的结果才是咱们所预期的。repartition 其实使用了一个随机生成的数来看成 key,而不是使用原来的key。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) } def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) : RDD[T] = withScope { if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ val distributePartition = (index: Int, items: Iterator[T]) => { var position = (new Random(index)).nextInt(numPartitions) items.map { t => // Note that the hash code of the key will just be the key itself. The HashPartitioner // will mod it with the number of total partitions. position = position + 1 (position, t) } } : Iterator[(Int, T)] // include a shuffle step so that our upstream tasks are still distributed new CoalescedRDD( new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition), new HashPartitioner(numPartitions)), numPartitions).values } else { new CoalescedRDD(this, numPartitions) } }
两个算子都是对RDD的分区进行从新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,须要从新划分红M个分区)
统计用户访问其未订阅主题页面的状况。
val sc = new SparkContext() val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...").persist def processNewLogs(logFileName:String){ val events = sc.sequenceFile[UserID, LinkInfo](logFileName) //RDD of (UserID,(UserInfo,LinkInfo)) pairs val joined = usersData.join(events) val offTopicVisits = joined.filter { // Expand the tuple into its components case (userId, (userInfo, linkInfo)) => !userInfo.topics.contains(linkInfo.topic) }.count() println("Number of visits to non-subscribed opics: " + offTopicVisits) }
链接操做会将两个数据集中的全部键的哈希值都求出来,将哈希值相同的记录经过网络传到同一台机器上,而后再对全部键相同的记录进行链接操做。userData表数据量很大,因此这样进行哈希计算和跨节点数据混洗很是耗时。
val userData = sc.sequenceFile[UserID,LinkInfo]("hdfs://...") .partionBy(new HashPartiotioner(100)) .persist()
userData表进行了从新分区,将键相同的数据都放在一个分区中。而后调用persist持久化结果数据,不用每次都计算哈希和跨节点混洗。程序运行速度显著提高。
忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。