combineByKey的强大之处,在于提供了三个函数操做来操做一个函数。第一个函数,是对元数据处理,从而得到一个键值对。第二个函数,是对键值键值对进行一对一的操做,即一个键值对对应一个输出,且这里是根据key进行整合。第三个函数是对key相同的键值对进行操做,有点像reduceByKey,但真正实现又有着很大的不一样。java
在Spark入门(五)--Spark的reduce和reduceByKey中,咱们用reduce进行求平均值。用combineByKey咱们则能够求比平均值更为丰富的事情。如今有一个数据集,每一行数据包括一个a-z字母和一个整数,其中字母和整数之间以空格分隔。如今要求得每一个字母的平均数。这个场景有点像多个学生,每一个学生多门成绩,求得学生的平均分。但这里将问题简化,其中数据集放在grades中。数据集以及下面的代码均可以在github上下载。python
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}
object SparkCombineByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")
val sc = new SparkContext(conf)
sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(
value => (value,1),
(x:(Int,Int),y)=>(x._1+y,x._2+1),
(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)
}
}
复制代码
(d,338451)
(e,335306)
(a,336184)
(i,346279)
(b,333069)
(h,334343)
(f,341380)
(j,320145)
(g,334042)
(c,325022)
复制代码
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.sources.In;
import scala.Tuple2;
public class SparkCombineByKeyJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKeyJava");
JavaSparkContext sc = new JavaSparkContext(conf);
combineByKeyJava(sc);
combineByKeyJava8(sc);
}
public static void combineByKeyJava(JavaSparkContext sc){
JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
String[] splits = s.split(" ");
return new Tuple2<>(splits[0],Integer.parseInt(splits[1]));
}
});
splitData.combineByKey(new Function<Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<>(integer, 1);
}
}, new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Integer integer) throws Exception {
return new Tuple2<>(integerIntegerTuple2._1 + integer, integerIntegerTuple2._2 + 1);
}
}, new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> integerIntegerTuple2, Tuple2<Integer, Integer> integerIntegerTuple22) throws Exception {
return new Tuple2<>(integerIntegerTuple2._1+integerIntegerTuple22._1,integerIntegerTuple2._2+integerIntegerTuple22._2);
}
}).map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Tuple2<String,Double>>() {
@Override
public Tuple2<String,Double> call(Tuple2<String, Tuple2<Integer, Integer>> stringTuple2Tuple2) throws Exception {
return new Tuple2<>(stringTuple2Tuple2._1,stringTuple2Tuple2._2._1*1.0/stringTuple2Tuple2._2._2);
}
}).foreach(new VoidFunction<Tuple2<String, Double>>() {
@Override
public void call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
System.out.println(stringDoubleTuple2._1+" "+stringDoubleTuple2._2);
}
});
}
public static void combineByKeyJava8(JavaSparkContext sc){
JavaPairRDD<String,Integer> splitData = sc.textFile("./grades").mapToPair(line -> {
String[] splits = line.split(" ");
return new Tuple2<>(splits[0],Integer.parseInt(splits[1]));
});
splitData.combineByKey(
x->new Tuple2<>(x,1),
(x,y)->new Tuple2<>(x._1+y,x._2+1),
(x,y)->new Tuple2<>(x._1+y._1,x._2+y._2)
).map(x->new Tuple2(x._1,x._2._1*1.0/x._2._2)).foreach(x->System.out.println(x._1+" "+x._2));
}
}
复制代码
d 338451.6
e 335306.7480769231
a 336184.95321637427
i 346279.497029703
b 333069.8589473684
h 334343.75
f 341380.94444444444
j 320145.7618069815
g 334042.37605042016
c 325022.4183673469
复制代码
在开始python以前,咱们先观察java和scala两个程序。咱们发现java7的代码很是冗余,而java8和scala则相比起来很是干净利落。固然,咱们难说好坏,可是这也表现出当代语言开始从繁就简的一个转变。到了python这一特色就体现的更加淋漓尽致。git
但咱们不光说语言,咱们分析这个求平均的实现方式,因为java中对数值作了一个处理,所以有保留小数,而scala则没有,但至少能够判断二者的结果是一致的。固然,这不是重点,重点是,这个combinByKey很是复杂,有三个函数。咱们很难观察到每一个过程作了什么。所以咱们在这里,对scala程序进行进一步的输出,从而观察combineByKey到底作了什么。github
import org.apache.spark.{SparkConf, SparkContext}
object SparkCombineByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")
val sc = new SparkContext(conf)
sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(
value => {
println("这是第一个函数")
println("将全部的值遍历,并放在元组中,标记1")
println(value)
(value,1)
},
(x:(Int,Int),y)=>{
println("这是第二个函数")
println("将x中的第一个值进行累加求和,第二个值加一,求得元素总个数")
println("x:"+x.toString())
println("y:"+y)
(x._1+y,x._2+1)
},
(x:(Int,Int),y:(Int,Int))=>{
(x._1+y._1,x._2+y._2)
}
).map(x=>(x._1,x._2._1/x._2._2)).foreach(println)
}
}
复制代码
这是第一个函数
将全部的值遍历,并放在元组中,标记1
222783
这是第一个函数
将全部的值遍历,并放在元组中,标记1
48364
这是第一个函数
将全部的值遍历,并放在元组中,标记1
204950
这是第一个函数
将全部的值遍历,并放在元组中,标记1
261777
...
...
...
这是第二个函数
将x中的第一个值进行累加求和,第二个值加一,求得元素总个数
x:(554875,2)
y:357748
这是第二个函数
将x中的第一个值进行累加求和,第二个值加一,求得元素总个数
x:(912623,3)
y:202407
这是第一个函数
将全部的值遍历,并放在元组中,标记1
48608
这是第二个函数
将x中的第一个值进行累加求和,第二个值加一,求得元素总个数
x:(1115030,4)
y:69003
这是第一个函数
将全部的值遍历,并放在元组中,标记1
476893
...
...
...
(d,338451)
(e,335306)
(a,336184)
(i,346279)
(b,333069)
(h,334343)
(f,341380)
(j,320145)
(g,334042)
(c,325022)
复制代码
这里咱们发现了,函数的顺序并不先所有执行完第一个函数,再执行第二个函数。而是分区并行,即第一个分区执行完第一个函数,并不等待其余分区执行完第一个函数,而是紧接着执行第二个函数,最后在第三个函数进行处理。在本地单机下,该并行特色并不能充分发挥,但在集群环境中,各个分区在不一样节点计算,而后处理完结果汇总处理。这样,当数据量十分庞大时,集群节点数越多,该优点就表现地越明显。sql
此外还有一个很是值得关注的特色,当咱们把foreach(println)这句话去掉时apache
foreach(println)
复制代码
咱们运行程序,发现程序没有任何输出。这是因为spark的懒加载特色,spark只用在对数据执行具体操做时,如输出、保存等才会执行计算。这看起来有点不合理,但实际上这样作在不少场景下能大幅度提高效率,但若是没有处理好,可能会致使spark每次执行操做都会从头开始计算该过程。所以当一个操做结果须要被频繁或者屡次调用的时候,咱们应该将结果存下来。api
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")
sc = SparkContext(conf=conf)
sc.textFile("./grades")\
.map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
.combineByKey(
lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[0],x[1][0]/x[1][1])).foreach(print)
复制代码
('b', 333069.8589473684)
('f', 341380.94444444444)
('j', 320145.7618069815)
('h', 334343.75)
('a', 336184.95321637427)
('g', 334042.37605042016)
('d', 338451.6)
('e', 335306.7480769231)
('c', 325022.4183673469)
复制代码
sortByKey很是简单,也很是经常使用。这里依然采用上述文本,将处理后的结果,进行排序,获得平均值最大的字母。在实际运用中咱们这里能够当作求得按照成绩排序,或者按照姓名排序。bash
import org.apache.spark.{SparkConf, SparkContext}
object SparkSortByKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("SparkCombineByKey")
val sc = new SparkContext(conf)
val result = sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._1,x._2._1/x._2._2))
//按照名字排序,顺序
result.sortByKey(true).foreach(println)
//按照名字排序,倒序
result.sortByKey(false).foreach(println)
val result1 = sc.textFile("./grades").map(line=>{
val splits = line.split(" ")
(splits(0),splits(1).toInt)
}).combineByKey(value =>(value,1),(x:(Int,Int),y)=>(x._1+y,x._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2)
).map(x=>(x._2._1/x._2._2,x._1))
//按照成绩排序,顺序
result1.sortByKey(true).foreach(println)
//按照成绩排序,倒序
result1.sortByKey(false).foreach(println)
}
}
复制代码
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("SparkCombineByKey")
sc = SparkContext(conf=conf)
result = sc.textFile("./grades")\
.map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
.combineByKey(
lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[0],x[1][0]/x[1][1]))
result.sortByKey(True).foreach(print)
result.sortByKey(False).foreach(print)
result1 = sc.textFile("./grades")\
.map(lambda line : (line.split(" ")[0],int(line.split(" ")[1])))\
.combineByKey(
lambda num:(num,1),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])
).map(lambda x:(x[1][0]/x[1][1],x[0]))
result1.sortByKey(True).foreach(print)
result1.sortByKey(False).foreach(print)
复制代码
(a,336184)
(b,333069)
(c,325022)
(d,338451)
(e,335306)
(f,341380)
(g,334042)
(h,334343)
(i,346279)
(j,320145)
(j,320145)
(i,346279)
(h,334343)
(g,334042)
(f,341380)
(e,335306)
(d,338451)
(c,325022)
(b,333069)
(a,336184)
(320145,j)
(325022,c)
(333069,b)
(334042,g)
(334343,h)
(335306,e)
(336184,a)
(338451,d)
(341380,f)
(346279,i)
(346279,i)
(341380,f)
(338451,d)
(336184,a)
(335306,e)
(334343,h)
(334042,g)
(333069,b)
(325022,c)
(320145,j)
复制代码
数据集以及代码均可以在github上下载。ide