最近在作项目的时候遇到了Spark RDD里面的一个aggregate函数,以为它的用法挺有意思的,在此记录一下。分布式
Spark 文档中对 aggregate的函数定义以下:函数
def aggregate[U](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): U
注释:this
Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U's, as in Scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
aggregate函数首先对每一个分区里面的元素进行聚合,而后用combine函数将每一个分区的结果和初始值(zeroValue)进行combine操做。这个操做返回的类型不须要和RDD中元素类型一致,因此在使用 aggregate()时,须要提供咱们期待的返回类型的初始值,而后经过一个函数把RDD中的元素累加起来??放入累加器?。考虑到每一个节点是在本地进行累加的,最终还须要提供第二个函数来将累加器两两合并。code
其中seqOp操做会聚合各分区中的元素,而后combOp操做会把全部分区的聚合结果再次聚合,两个操做的初始值都是zeroValue. seqOp的操做是遍历分区中的全部元素(T),第一个T跟zeroValue作操做,结果再做为与第二个T作操做的zeroValue,直到遍历完整个分区。combOp操做是把各分区聚合的结果,再聚合。aggregate函数返回一个跟RDD不一样类型的值。所以,须要一个操做seqOp来把分区中的元素T合并成一个U,另一个操做combOp把全部U聚合。ci
下面举一个利用aggreated求平均数的例子:element
val rdd = List(1,2,3,4) val input = sc.parallelize(rdd) val result = input.aggregate((0,0))( (acc,value) => (acc._1 + value, acc._2 + 1), (acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ) result: (Int, Int) = (10, 4) val avg = result._1 / result._2 avg: Int = 2.5
程序的详细过程大概以下:文档
首先定义一个初始值 (0, 0),即咱们期待的返回类型的初始值。input
(acc,value) => (acc._1 + value, acc._2 + 1), value是函数定义里面的T,这里是List里面的元素。因此acc._1 + value, acc._2 + 1的过程以下:it
0+1, 0+1io
1+2, 1+1
3+3, 2+1
6+4, 3+1
结果为 (10,4)。在实际Spark执行中是分布式计算,可能会把List分红多个分区,假如3个,p1(1,2), p2(3), p3(4),通过计算各分区的的结果 (3,2), (3,1), (4,1),这样,执行 (acc1,acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2) 就是 (3+3+4,2+1+1) 即 (10,4),而后再计算平均值。