键值对RDD(pair RDD)是spark中许多操做所须要的常见数据类型,一般用来进行聚合计算。java
spark有多种方式能够建立pair RDD。好比:不少存储键值对的数据格式在读取时直接返回pair RDD;经过map()算子将普通的RDD转为pair RDD。python
# 使用第一个单词做为键建立一个pair RDD val pairs = lines.map(x => (x.split(" ")(0), x))
# 使用第一个单词做为键建立一个pair RDD # jdk1.8后也支持lambda表达式方式 PairFunction<String, String, String> keyData = new PairFunction<String, String, String>() { public Tuple2<String, String> call(String x) { return new Tuple2(x.split(" ")[0], x); } }; JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
# 使用第一个单词做为键建立一个pair RDD pairs = lines.map(lambda x: (x.split(" ")[0], x))
从一个内存中的数据集建立pair RDD时,scala和python只须要对这个二元组集合调用SparkContext的parallelize()方法便可;而java须要使用SparkContext.parallelizePairs()方法。编程
函数名 | 做用 | 示例 |
---|---|---|
reduceByKey(func) | 合并具备相同键的值 | rdd.reduceByKey((x, y) => x + y) |
groupByKey() | 对具备相同键的值进行分组 | rdd.groupByKey() |
combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) | 使用不一样的返回类型合并具备相同键的值 | rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) |
mapValues(func) | 对pair RDD中的每一个值应用一个函数而不改变键 | rdd.mapValues(x => x + 1) |
flatMapValues(func) | 对pair RDD中的每一个值应用一个返回迭代器的函数,生成对应原键的键值对记录 | rdd.flatMapValues(x => (x to 5)) |
keys() | 返回一个仅包含键的RDD | rdd.keys |
values() | 返回一个仅包含值得RDD | rdd.values |
sortByKey() | 返回一个根据键排序的RDD | rdd.sortByKey() |
函数名 | 做用 | 示例 |
---|---|---|
subtractByKey | 删除RDD中键与other RDD中键相同的元素 | rdd.subtractByKey(other) |
join | 对两个RDD进行内链接 | rdd.join(other) |
leftOuterJoin | 对两个RDD进行链接操做,确保第二个RDD的键必须存在(左外链接) | rdd.leftOuterJoin(other) |
rightOuterJoin | 对两个RDD进行链接操做,确保第一个RDD的键必须存在(右外链接) | rdd.rightOuterJoin(other) |
cogroup | 将两个RDD中拥有相同键的数据分组在一块儿 | rdd.cogroup(other) |
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
val input = sc.textFile("s3://...") val words = input.flatMap(x => x.split(" ")) val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
JavaRDD<String> input = sc.textFile("s3://..."); JavaRDD<String> words = input.flatMap(new FlatMapFunction<String, String>() { public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); } }); JavaPairRDD<String, Integer> result = words.mapToPair(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); } }).reduceByKey( new Function2<Integer, Integer, Integer>() { public Integer call(Integer a, Integer b) { return a + b; } } )
rdd = sc.textFile("s3://...") words = rdd.flatMap(lambda x: x.split(" ")) result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
执行原理 1.combineByKey()做用于rdd的每一个分区。 2.若是访问的元素在分区中第一次出现,就使用createCombiner()方法建立那个键对应累加器的初始值。 3.若是访问的元素在当前分区已经出现过,就使用mergeValue()方法将该键的累加器对应的当前值和新值合并。 4.若是有两个或多个分区都有对应同一个键的累加器时,就使用mergeCombiners()方法将各个分区的结果进行合并。
val result = rdd.combineByKey(v => (v, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)).map{case (key, value) => (key, value._1 / value._2.toFloat)}
public static class AvgCount implements Serializable { public int total_; public int num_; public AvgCount(int total, int num) { total_ = total; num_ = num; } public float avg() { return total_/(float)num_; } } Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() { public AvgCount call(Integer x) { return new AvgCount(x, 1); } }; Function2<AvgCount, Integer, AvgCount> addAndCount = new Function2<AvgCount, Integer, AvgCount>() { public AvgCount call(AvgCount a, Integer x) { a.total_ += x; a.num_ += 1; return a; } }; Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() { public AvgCount call(AvgCount a, AvgCount b) { a.total_ += b.total_; a.num_ += b.num_; return a; } }; AvgCount initial = new AvgCount(0, 0); JavaPairRDD<String, AvgCount> avgCounts = input.combineByKey(createAcc, addAndCount, combine); Map<String, AvgCount> countMap = avgCounts.collectAsMap(); for (Entry<String, AvgCount> entry : countMap.entrySet()) { System.out.println(entry.getKey() + ":" + entry.getValue().avg()); }
sumCount = input.combineByKey((lambda x: (x, 1)), (lambda x, y: (x[0] + y, x[1] + 1)), (lambda x, y: (x[0] + y[0], x[1] + y[1]))) sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()
对于单个RDD数据进行分组时,使用groupByKey()。若是先使用groupByKey(),再使用reduce()或fold()时,可能使用一种根据键进行聚合的函数更高效。好比,rdd.reduceByKey(func)与rdd.groupByKey().mapValues(value => value.reduce(func))等价,但前者更高效,由于避免了为每一个键存放值列表的步骤。ide
对多个共享同一个键的RDD进行分组时,使用cogroup()。cogroup方法会获得结果RDD类型为[(K, (Iterable[V], Iterable[W]))]。函数
将一组有键的数据与另外一组有键的数据链接使用是对键值对数据执行的经常使用操做。链接方式主要有:内链接、左外链接、右外链接。大数据
val storeAddress = sc.parallelize(Seq((Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"), (Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle"))) val storeRating = sc.parallelize(Seq(Store("Ritual"), 4.9), (Store("Philz"), 4.8))) # 内链接 storeAddress.join(storeRating) #左外链接 storeAddress.leftOuterJoin(storeRating) #右外链接 storeAddress.rightOuterJoin(storeRating)
将数据排序输出是很常见的场景。sortByKey()函数接收一个叫作ascending的参数,表示是否让结果升序排序(默认true)。有时,也能够提供自定义比较函数。好比,以字符串顺序对整数进行自定义排序。spa
implicit val sortIntegersByString = new Ordering[Int] { override def compare(a: Int, b: Int) = a.toString.compare(b.toString) } rdd.sortByKey()
class IntegerComparator implements Comparator<Integer> { public int compare(Integer a, Integer b) { return String.valueOf(a).compareTo(String.valueOf(b)) } } rdd.sortByKey(new IntegerComparator());
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))
和转化操做同样,全部基础RDD支持的行动操做也都在pair RDD上可用。另外,Pair RDD提供了一些额外的行动操做。scala
函数 | 做用 | 示例 |
---|---|---|
countByKey | 对每一个键对应的元素分别计数 | rdd.countByKey() |
collectAsMap | 将结果以映射表的形式返回 | rdd.collectAsMap() |
lookup(key) | 返回指定键对应的全部值 | rdd.lookup(3) |
忠于技术,热爱分享。欢迎关注公众号:java大数据编程,了解更多技术内容。code