如下是我的理解,一切以官网文档为准。html
http://spark.apache.org/docs/latest/api/python/pyspark.htmljava
在开始以前,我先介绍一下,RDD是什么?python
RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD能够简单当作是一个数组。和普通数组的区别是,RDD中的数据是分区存储的,这样不一样分区的数据就能够分布在不一样的机器上,同时能够被并行处理。所以,Spark应用程序所作的无非是把须要处理的数据转换为RDD,而后对RDD进行一系列的变换和操做从而获得结果。apache
建立RDD:编程
>>> sc.parallelize([1,2,3,4,5], 3) #意思是将数组中的元素转换为RDD,而且存储在3个分区上[1]、[2,3]、[4,5]。若是是4个分区:[1]、[2]、[3]、[4,5]
上面这种是数组建立,也能够从文件系统或者HDFS中的文件建立出来,后面会讲到。api
只要搞懂了spark的函数们,你就成功了一大半。数组
spark的函数主要分两类,Transformations和Actions。Transformations为一些数据转换类函数,actions为一些行动类函数:数据结构
转换:转换的返回值是一个新的RDD集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个RDD做为参数,而后返回一个新的RDD。app
行动:行动操做计算并返回一个新的值。当在一个RDD对象上调用行动函数时,会在这一时刻计算所有的数据处理查询并返回结果值。分布式
下面介绍spark经常使用的Transformations, Actions函数:
Transformations
map(func [, preservesPartitioning=False]) --- 返回一个新的分布式数据集,这个数据集中的每一个元素都是通过func函数处理过的。
>>> data = [1,2,3,4,5] >>> distData = sc.parallelize(data).map(lambda x: x+1).collect() #结果:[2,3,4,5,6]
filter(func) --- 返回一个新的数据集,这个数据集中的元素是经过func函数筛选后返回为true的元素(简单的说就是,对数据集中的每一个元素进行筛选,若是符合条件则返回true,不符合返回false,最后将返回为true的元素组成新的数据集返回)。
>>> rdd = sc.parallelize(data).filter(lambda x:x%2==0).collect() #结果:[2, 4]
flatMap(func [, preservesPartitioning=False]) --- 相似于map(func), 可是不一样的是map对每一个元素处理完后返回与原数据集相同元素数量的数据集,而flatMap返回的元素数不必定和原数据集相同。each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item)
#### for flatMap() >>> rdd = sc.parallelize([2,3,4]) >>> sorted(rdd.flatMap(lambda x: range(1,x)).collect()) #结果:[1, 1, 1, 2, 2, 3] >>> sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect()) #结果:[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] #### for map() >>> rdd = sc.parallelize([2,3,4]) >>> sorted(rdd.flatMap(lambda x: range(1,x)).collect()) #结果:[[1], [1, 2], [1, 2, 3]] >>> sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect()) #结果:[[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]
mapPartitions(func [, preservesPartitioning=False]) ---mapPartitions是map的一个变种。map的输入函数是应用于RDD中每一个元素,而mapPartitions的输入函数是应用于每一个分区,也就是把每一个分区中的内容做为总体来处理的。
>>> rdd = sc.parallelize([1,2,3,4,5], 3) >>> def f(iterator): yield sum(iterator) >>> rdd.mapPartitions(f).collect() #结果:[1,5,9]
mapPartitionsWithIndex(func [, preservesPartitioning=False]) ---Similar to mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes.
>>> rdd = sc.parallelize([1,2,3,4,5], 3) >>> def f(splitIndex, iterator): yield splitIndex >>> rdd.mapPartitionsWithIndex(f).collect() #结果:[0,1,2] #三个分区的索引
reduceByKey(func [, numPartitions=None, partitionFunc=<function portable_hash at 0x7fa664f3cb90>]) --- reduceByKey就是对元素为kv对的RDD中Key相同的元素的value进行reduce,所以,key相同的多个元素的值被reduce为一个值,而后与原RDD中的key组成一个新的kv对。
>>> from operator import add >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) >>> sorted(rdd.reduceByKey(add).collect()) >>> #或者 sorted(rdd.reduceByKey(lambda a,b:a+b).collect()) #结果:[('a', 2), ('b', 1)]
aggregateByKey(zeroValue)(seqOp, combOp [, numPartitions=None]) ---
sortByKey([ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7fa665048c80>]) --- 返回排序后的数据集。该函数就是队kv对的RDD数据进行排序,keyfunc是对key进行处理的函数,如非须要,不用管。
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('D', 4)] >>> sc.parallelize(tmp).sortByKey(True, 1).collect() #结果: [('1', 3), ('D', 4), ('a', 1), ('b', 2)] >>> sc.parallelize(tmp).sortByKey(True, 2, keyfunc=lambda k:k.lower()).collect() #结果:[('1', 3), ('a', 1), ('b', 2), ('D', 4)] #注意,比较两个结果可看出,keyfunc对键的处理只是在数据处理的过程当中起做用,不能真正的去改变键名
join(otherDataset [, numPartitions=None]) --- join就是对元素为kv对的RDD中key相同的value收集到一块儿组成(v1,v2),而后与原RDD中的key组合成一个新的kv对,返回。
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2), ("a", 3)]) >>> sorted(x.join(y).collect()) #结果:[('a', (1, 2)), ('a', (1, 3))]
cartesian(otherDataset) --- 返回一个笛卡尔积的数据集,这个数据集是经过计算两个RDDs获得的。
>>> x = sc.parallelize([1,2,3]) >>> y = sc.parallelize([4,5]) >>> x.cartesian(y).collect() #结果:[(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)]
Action (这里只讲支持python的,java和scala的后面用到了在作详解,固然支持python就必定支持java和scala)
reduce(func) --- reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
>>> from operator import add >>> sc.parallelize([1,2,3,4,5]).reduce(add) # 结果:15
collect() --- 返回RDD中的数据,以list形式。
>>> sc.parallelize([1,2,3,4,5]).collect() #结果:[1,2,3,4,5]
count() --- 返回RDD中的元素个数。
>>> sc.parallelize([1,2,3,4,5]).count #结果:5
first() --- 返回RDD中的第一个元素。
>>> sc.parallelize([1,2,3,4,5]).first() #结果:1
take(n) --- 返回RDD中前n个元素。
>>> sc.parallelize([1,2,3,4,5]).take(2) #结果:[1,2]
takeOrdered(n [, key=None]) --- 返回RDD中前n个元素,可是是升序(默认)排列后的前n个元素,或者是经过key函数指定后的RDD(这个key我也没理解透,后面在作详解)
>>> sc.parallelize([9,7,3,2,6,4]).takeOrdered(3) #结果:[2,3,4] >>> sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda x:-x) #结果:[9,7,6]
saveAsTextFile(path [, compressionCodecClass=None]) --- 该函数将RDD保存到文件系统里面,而且将其转换为文本行的文件中的每一个元素调用 tostring 方法。
parameters: path - 保存于文件系统的路径
compressionCodecClass - (None by default) string i.e. “org.apache.hadoop.io.compress.GzipCodec”
>>> tempFile = NamedTemporaryFile(delete=True) >>> tempFile.close() >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) >>> from fileinput import input >>> from glob import glob >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) '0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n'
Empty lines are tolerated when saving to text files:
>>> tempFile2 = NamedTemporaryFile(delete=True) >>> tempFile2.close() >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) '\n\n\nbar\nfoo\n'
Using compressionCodecClass:
>>> tempFile3 = NamedTemporaryFile(delete=True) >>> tempFile3.close() >>> codec = "org.apache.hadoop.io.compress.GzipCodec" >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec) >>> from fileinput import input, hook_compressed >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed)) >>> b''.join(result).decode('utf-8') u'bar\nfoo\n'
countByKey() --- 返回一个字典(key,count),该函数操做数据集为kv形式的数据,用于统计RDD中拥有相同key的元素个数。
>>> defdict = sc.parallelize([("a",1), ("b",1), ("a", 1)]).countByKey() >>> defdict #结果:defaultdict(<type 'int'>, {'a': 2, 'b': 1}) >>> defdict.items() #结果:[('a', 2), ('b', 1)]
countByValue() --- 返回一个字典(value,count),该函数操做一个list数据集,用于统计RDD中拥有相同value的元素个数。
>>> sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items() #结果:[(1, 2), (2, 4), (3, 3), (5, 1)]
foreach(func) --- 运行函数func来处理RDD中的每一个元素,这个函数常被用来updating an Accumulator或者与外部存储系统的交互。
>>> def f(x): print(x) >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) #note: 打印是随机的,并非必定按1,2,3,4,5的顺序打印