本篇博客中的操做都在 ./bin/pyspark
中执行。python
下面会对 Pair RDD 的一些转化操做进行解释。先假设咱们有下面这些RDD(在pyspark中操做):spa
nums = sc.parallelize( [ (1,2) ,(3,4) ,(3,6) ] ) x = sc.parallelize( [ (1,[2,4,5]) ,(4,[7,8,0]) ,(4,[6,7,5])] )
概述:合并具备相同键值的值。code
例子:blog
>>> nums.reduceByKey(lambda x, y : x + y).collect() [(1, 2), (3, 10)] >>> >>> x.reduceByKey(lambda x, y: x + y).collect() [(1, [2, 4, 5]), (4, [7, 8, 0, 6, 7, 5])]
这个方法操做的是值(Values),对上面的两个RDD的操做,第一个是对值作加法,第二个是对列表合并;这两个操做均可以使用lambda x, y : x + y
来完成。排序
再来一个例子,求平均值,(下面的这个RDD的键值中,第一个值是总和,第二个值是数量):博客
>>> test = sc.parallelize([('panda', (1,2)), ('pink',(7,2)), ('pirate',(3,1))]) >>> test.mapValues(lambda (x,y): x / (y* 1.0)).collect() [('panda', 0.5), ('pink', 3.5), ('pirate', 3.0)]
groupByKey
方法的目的是对具备相同键值的数据进行分组,好比说:it
>>> l = nums.groupByKey().collect()[1][1] >>> l <pyspark.resultiterable.ResultIterable object at 0x109320f10> >>> for i in l: ... print i ... 4 6
直观地来讲,对nums
这个RDD的groupByKey
操做能够表示为:spark
[(1,2),(3,4),(3,6)] -> [ (1,[2]), (3, [4,6] )]
而后是对于x
这个RDD的:class
>>> x = sc.parallelize( [ (1,[2,4,5]) ,(4,[7,8,0]) ,(4,[6,7,5])] ) >>> l = x.groupByKey().collect() >>> l [(1, <pyspark.resultiterable.ResultIterable object at 0x109310690>), (4, <pyspark.resultiterable.ResultIterable object at 0x109310050>)] >>> l2 = l[1][1] >>> l2 <pyspark.resultiterable.ResultIterable object at 0x109310050> >>> for i in l2: ... print i ... [7, 8, 0] [6, 7, 5]
直观的来讲:test
[ (1,[2,4,5]), (4,[7,8,0]) ,(4,[6,7,5] ) ] + | +-------------+ | | RDD.join | | +-------------+ v [ (1,[2,4,5]), (4, [ [6,7,5], [7,8,0] ] ) ]
这个比较好理解,对每一个键值进行操做:
>>> nums.mapValues(lambda x : x+ 3).collect() [(1, 5), (3, 7), (3, 9)]
这个方法的做用是对pair RDD 的每一个值(values)生成一个与原键(key)对应的键值对记录。
x = sc.parallelize( [ (1,[2,4,5]) ,(4,[7,8,0]) ,(4,[6,7,5])] ) >>> def f(x): ... return x ... >>> x.flatMapValues(f).collect() [(1, 2), (1, 4), (1, 5), (4, 7), (4, 8), (4, 0), (4, 6), (4, 7), (4, 5)]
这个能够用"flat"这个英文单词的意思来大体理解一下,flat有使变平,拍扁的意思。
对于 nums
这种RDD是进行不了这个方法的。
返回全部的键值:
>>> nums.keys().collect() [1, 3, 3] >>> x.keys().collect() [1, 4, 4]
返回全部的值:
>>> nums.values().collect() [2, 4, 6] >>> x.values().collect() [[2, 4, 5], [7, 8, 0], [6, 7, 5]]
按照键值排序,这个比较好理解:
>>> notSorted = sc.parallelize( [ (7,[2,4,5]) ,(9,[7,8,0]) ,(4,[6,7,5])] ) >>> notSorted.sortByKey().collect() [(4, [6, 7, 5]), (7, [2, 4, 5]), (9, [7, 8, 0])]
这里咱们有:
other = sc.parallelize([(3,9)]) nums = sc.parallelize([(1,2),(3,4),(3,6)])
返回一个减去两个RDD中同样的Key的RDD,能够理解为除去下图中重合的部分:
一个例子:
>>> nums.subtractByKey(other).collect() [(1, 2)]
这个操的做描为:返回一个RDD,返回的RDD只包含输入的两个RDD都包含的键值,每一个键值对形如(k, (v1, v2))
,其中v1
被本身包含,v2
被另外一个RDD包含。一个例子:
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2), ("a", 3)]) >>> x.join(y).collect() [('a', (1, 2)), ('a', (1, 3))]
能够理解为:
同理换作咱们开始提到的两个RDD:
>>> nums.join(other).collect() [(3, (4, 9)), (3, (6, 9))]
这两个方法的Join的原理和上面的join同样,关于left 和 right 的说明是:
一个例子:
>>> nums.rightOuterJoin(other).collect() [(3, (4, 9)), (3, (6, 9))] >>> nums.leftOuterJoin(other).collect() [(1, (2, None)), (3, (4, 9)), (3, (6, 9))]