spark aggregate

package com.xp.cn

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by xupan on 2017/12/15.
 * Spark操做—aggregate:action
 * 将每一个分区里面的元素进行聚合,而后用combine函数将每一个分区的结果和初始值(zeroValue)进行combine操做。
 * 这个函数最终返回的类型不须要和RDD中元素类型一致。
 */
object AggerateDemo {

  def main(args: Array[String]) {


    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    //SparkContext:spark执行入口
    val sc: SparkContext = new SparkContext(
      new SparkConf()
        .setAppName("WordCount")
        .setMaster("local[4]")
    )

    /**
     * 先在每一个分区中迭代执行 (x : Int,y : Int) => x + y 而且使用zeroValue的值1
     * 即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16
     * part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41
     * 再将两个分区的结果合并(a : Int,b : Int) => a + b ,而且使用zeroValue的值1
     * 即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58
     **/
    val kk = sc.parallelize(Array(5, 4, 3, 2, 1, 10, 9, 8, 7, 6), 2)
    val oo = kk.aggregate(1)({ (x: Int, y: Int) => x + y }, { (a: Int, b: Int) => a + b })
    println(oo + "oo") //58


    /**
     * ##此次zeroValue=2
      ##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
      ##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
      ##最后:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428
     */
    val res = kk.aggregate(2)({ (x: Int, y: Int) => x + y }, { (a: Int, b: Int) => a * b })
    println("res :" + res)

    /**
    1.  0+1,  0+1
    2.  1+2,  1+1
    3.  3+3,  2+1
    4.  6+4,  3+1
    5.  10+5,  4+1
      ......
    实际Spark执行中是分布式计算,可能会把List分红多个分区,
    假如3个,p1(1,2,3,4),p2(5,6,7,8),p3(9),
    通过计算各分区的的结果(10,4),(26,4),(9,1),
    这样,执行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2)
    就是(10+26+9,4+4+1)即(45,9),再求平均值就简单了。
      */
    val ll = kk.aggregate((0, 0))(
      (acc, number) => (acc._1 + number, acc._2 + 1),
      (par1, par2) => (par1._1 + par2._1, par1._2 + par2._2)
    )
    println("ll : " + (ll._1.toDouble / ll._2))


    //聚合求,指定2个分区,每一个分区对应一个task
    val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
    //aggregate:聚合,有两个括号的方法,柯立化方法,初始值为0,传递了两个函数
    //第一个:每一个分区上作聚合 + 1
    //第二个:将每一个分区的结果再次聚合 + 1
    val aggRDD = rdd1.aggregate(1)(_ + _, _ + _)
    println(" aggRDD :  " + aggRDD)


    //求每一个分区的最大值,再将结果聚合
    val result = rdd1.aggregate(1)(math.max(_, _), _ + _)
    println(" result : " + result) //14

    val rdd2 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
    val result2 = rdd2.aggregate(0)(math.max(_, _), _ + _)
    println(" result2 : " + result2) //18


    //第一个参数2也要参与计算,只是最后参加一次
    val result3 = rdd2.aggregate(2)(math.max(_, _), _ + _)
    println("result3 : " + result3) //result3 : 20


    println("===============================")


    val rddS = sc.parallelize(Array("a", "b", "c", "d", "e"), 2)
    val resultS = rddS.aggregate("")(_ + _, _ + _)
    //结果在变化 resultS : cdeab  resultS : abcde
    //变化的缘由是由于并不肯定哪一个分区先执行完成
    println(" resultS : " + resultS)



    println("================================")

    //计算每一个分区最大的字符串长度,并转换为字符串相加
    //rdds2Result : 65   rdds2Result : 56
    //结果在变化
    val rdds2 = sc.parallelize(Array("sda", "bsdfs", "c", "d", "esdfsd"), 2);
    val rdds2Result = rdds2.aggregate("")((x, y) => math.max(x.length, y.length).toString, (x, y) => x + y)
    println(" rdds2Result : " + rdds2Result)


    println("================================")
    val rddw = sc.parallelize(List("4444", "11113"), 2)
    val rddws = rddw.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)
    println(" rddws :" + rddws)

    //关闭资源
    sc.stop()
  }

}
相关文章
相关标签/搜索