Spark (Python版) 零基础学习笔记(二)—— Spark Transformations总结及举例

1. map(func) 
func函数做用到数据集的每一个元素,生成一个新的分布式的数据集并返回python

1 >>> a = sc.parallelize(('a', 'b', 'c'))
2 >>> a.map(lambda x: x+'1').collect()
3 ['a1', 'b1', 'c1']

 

2. filter(func) 
选出全部func返回值为true的元素,做为一个新的数据集返回正则表达式

1 >>> a = sc.parallelize(range(10))
2 >>> a.filter(lambda x: x%2==0).collect()  # 选出0-9的偶数
3 [0, 2, 4, 6, 8]

 

3. flatMap(func) 
与map类似,可是每一个输入的item可以被map到0个或者更多的items输出,也就是说func的返回值应当是一个Sequence,而不是一个单独的itemshell

1 >>> l = ['I am Tom', 'She is Jenny', 'He is Ben']
2 >>> a = sc.parallelize(l,3)
3 >>> a.flatMap(lambda line: line.split()).collect()  # 将每一个字符串中的单词划分出来
4 ['I', 'am', 'Tom', 'She', 'is', 'Jenny', 'He', 'is', 'Ben']

 

4. mapPartitions(func) 
与map类似,可是mapPartitions的输入函数单独做用于RDD的每一个分区(block)上,所以func的输入和返回值都必须是迭代器iterator。 
例如:假设RDD有十个元素0~9,分红三个区,使用mapPartitions返回每一个元素的平方。若是使用map方法,map中的输入函数会被调用10次,而使用mapPartitions方法,输入函数只会被调用3次,每一个分区被调用1次。apache

1 >>> def squareFunc(a):
2 . . .     for i in a:
3 . . .         yield i*i
4 . . .
5 >>> a = sc.parallelize(range(10), 3)
6 PythonRDD[1] at RDD at PythonRDD.scala:48
7 >>> a.mapPartitions(squareFunc).collect()
8 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

5. mapPartitionsWithIndex(func) 
与mapPartitions类似,可是输入函数func提供了一个正式的参数,能够用来表示分区的编号。bash

 1 >>> def func(index, iterator):  # 返回每一个分区的编号和数值
 2 . . .     yield (‘index ‘ + str(index) + ’ is: ‘ + str(list(iterator)))
 3 . . .
 4 >>> a = sc.parallelize(range(10),3)
 5 >>> a.mapPartitionsWithIndex(func).collect()
 6 ['index 0 is: [0, 1, 2]', 'index 1 is: [3, 4, 5]', 'index 2 is: [6, 7, 8, 9]']
 7 >>> def squareIndex(index, iterator):  # 返回每一个数值所属分区的编号和数值的平方
 8 ...     for i in iterator:
 9 ...         yield ("The index is: " + str(index) + ", and the square is: " + str(i*i))
10 ... 
11 >>> a.mapPartitionsWithIndex(squareIndex).collect()
12 ['The index is: 0, and the square is: 0', 
'The index is: 0, and the square is: 1',
'The index is: 1, and the square is: 4',
'The index is: 1, and the square is: 9',
'The index is: 1, and the square is: 16',
'The index is: 2, and the square is: 25',
'The index is: 2, and the square is: 36',
'The index is: 3, and the square is: 49',
'The index is: 3, and the square is: 64',
'The index is: 3, and the square is: 81']

 

6. sample(withReplacementfractionseed) 
从数据中抽样,withReplacement表示是否有放回,withReplacement=true表示有放回抽样,fraction为抽样的几率(0<=fraction<=1),seed为随机种子。 
例如:从1-100之间抽取样本,被抽取为样本的几率为0.2网络

1 >>> data = sc.parallelize(range(1,101),2)
2 >>> sample = data.sample(True, 0.2)
3 >>> sampleData.count()
4 19
5 >>> sampleData.collect()
6 [16, 19, 24, 29, 32, 33, 44, 45, 55, 56, 56, 57, 65, 65, 73, 83, 84, 92, 96]

!!!注意,Spark中的sample抽样,当withReplacement=True时,至关于采用的是泊松抽样;当withReplacement=False时,至关于采用伯努利抽样,fraction并非表示抽样获得的样本占原来数据总量的百分比,而是一个元素被抽取为样本的几率。fraction=0.2并非说明要抽出100个数字中20%的数据做为样本,而是每一个数字被抽取为样本的几率为0.2,这些数字被认为来自同一整体,样本的大小并非固定的,而是服从二项分布。并发

 

7. union(otherDataset) 
并集操做,将源数据集与union中的输入数据集取并集,默认保留重复元素(若是不保留重复元素,能够利用distinct操做去除,下边介绍distinct时会介绍)。分布式

1 >>> data1 = sc.parallelize(range(10))
2 >>> data2 = sc.parallelize(range(6,15))
3 >>> data1.union(data2).collect()
4 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]

 

8. intersection(otherDataset) 
交集操做,将源数据集与union中的输入数据集取交集,并返回新的数据集。ide

1 >>> data1 = sc.parallelize(range(10))
2 >>> data2 = sc.parallelize(range(6,15))
3 >>> data1.intersection(data2).collect()
4 [8, 9, 6, 7]

 

9. distinct([numTasks]) 
去除数据集中的重复元素。函数

1 >>> data1 = sc.parallelize(range(10))
2 >>> data2 = sc.parallelize(range(6,15))
3 >>> data1.union(data2).distinct().collect()
4 [0, 8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]
 

下边的一系列transactions会用的键(Key)这一律念,在进行下列有关Key操做时使用的数据集为记录伦敦各个片区(英文称为ward)中学校和学生人数相关信息的表格,下载地址: 
https://data.london.gov.uk/dataset/london-schools-atlas/resource/64f771ee-38b1-4eff-8cd2-e9ba31b90685# 
下载后将其中命名为WardtoSecSchool_LDS_2015的sheet里边的数据保存为csv格式,删除第一行的表头,并从新命名为school.csv 
数据格式为: 
(Ward_CODE, Ward_NAME, TotalWardPupils, Ward2Sec_Flow_No., Secondary_School_URN, Secondary_School_Name, Pupil_count) 
首先对数据进行一些预处理:

1 >>> school = sc.textFile("file:///home/yang/下载/school.csv")  
2 Data = sc.textFile("file:///home/yang/下载/school.csv") 
3 >>> school.count()  # 共有16796行数据
4 16796
5 >>> import re  # 引入python的正则表达式包
6 >>> rows = school.map(lambda line: re.subn(',[\s]+',': ', line))

注意:1. 从本地读取数据时,代码中要经过 “file://” 前缀指定读取本地文件。Spark shell 默认是读取 HDFS 中的文件,须要先上传文件到 HDFS 中,不然会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/school.csv”的错误。 
2. 对数据集进行了一下预处理,利用正则匹配替换字符串,因为一些学校的名字的字符串中自己含有逗号,好比“The City Academy, Hackney”, 此时若是利用csv的分隔符’,’进行分割,并不能将名字分割为“The City Academy”和“Hackney”。咱们注意到csv的分隔符逗号后边是没有空格的,而名字里边的逗号后边都会有空格(英语书写习惯),所以,先利用re.subn语句对逗号后边含有至少一个空格(正则表达式为’,[\s]+’)的子字符串进行替换,替换为’: ’,而后再进行后续操做。以上即为对这一数据集的预处理过程。

 

10. groupByKey([numTasks]) 
做用于由键值对(K, V)组成的数据集上,将Key相同的数据放在一块儿,返回一个由键值对(K, Iterable)组成的数据集。 
注意:1. 若是这一操做是为了后续在每一个键上进行汇集(aggregation),好比sum或者average,此时使用reduceByKey或者aggregateByKey的效率更高。2. 默认状况下,输出的并行程度取决于RDD分区的数量,但也能够经过给可选参数numTasks赋值来调整并发任务的数量。

1 >>> newRows = rows.map(lambda r: r[0].split(','))  
2 >>> ward_schoolname = newRows .map(lambda r: (r[1], r[5])).groupByKey()  # r[1]为ward的名字,r[5]为学校的名字
3 >>> ward_schoolname.map(lambda x: {x[0]: list(x[1])}).collect()  # 列出每一个ward区域内全部的学校的名字
4 [{'Stifford Clays': ['William Edwards School', 'Brentwood County High School', "The Coopers' Company and Coborn School", 'Becket Keys Church of England Free School', ...] 
# 输出结果为在Stifford Clays这个ward里的学校有William Edwards School,Brentwood County High School,The Coopers' Company and Coborn School等等...

 

11. reduceByKey(func, [numTasks]) 
做用于键值对(K, V)上,按Key分组,而后将Key相同的键值对的Value都执行func操做,获得一个值,注意func的类型必须知足

1 >>> pupils = newRows.map(lambda r: (r[1], int(r[6])))  # r[1]为ward的名字,r[6]为每一个学校的学生数
2 >>> ward_pupils = pupils.reduceByKey(lambda x, y: x+y)   # 计算各个ward中的学生数
3 >>> ward_pupils.collect()  # 输出各个ward中的学生数
4 [('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526), 
('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835),
('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585),
('Camberwell Green', 1374), ('Glyndon', 4633),...]

 

12. aggregateByKey(zeroValueseqOpcomOp, [numTasks]) 
在于键值对(K, V)的RDD中,按key将value进行分组合并,合并时,将每一个value和初始值做为seqOp函数的参数,进行计算,返回的结果做为一个新的键值对(K, V),而后再将结果按照key进行合并,最后将每一个分组的value传递给comOp函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给comOp函数,以此类推),将key与计算结果做为一个新的键值对(K, V)输出。 
例子: 上述统计ward内学生人数的操做也能够经过aggregateByKey实现,此时,seqOpcomOp都是进行加法操做,代码以下:

1 >>> ward_pupils = pupils.aggregateByKey(0, lambda x, y: x+y, lambda x, y: x+y)
2 >>> ward_pupils.collect()  
3 [('Stifford Clays', 1566), ('Shenley', 1625), ('Southbury', 3526), 
('Rainham and Wennington', 769), ('Bromley Town', 574), ('Waltham Abbey Honey Lane', 835),
('Telegraph Hill', 1238), ('Chigwell Village', 1506), ('Gooshays', 2097), ('Edgware', 2585),
('Camberwell Green', 1374), ('Glyndon', 4633),...]

 

13. sortByKey([ascending=True], [numTasks]) 
按照Key进行排序,ascending的值默认为True,True/False表示升序仍是降序 
例如:将上述ward按照ward名字降序排列,打印出前十个

1 >>> ward_pupils.sortByKey(False, 4).take(10)
2 [('Yiewsley', 2560), ('Wormholt and White City', 1455), ('Woodside', 1204), 
('Woodhouse', 2930), ('Woodcote', 1214), ('Winchmore Hill', 1116), ('Wilmington', 2243),
('Willesden Green', 1896), ('Whitefoot', 676), ('Whalebone', 2294)]

 

14. join(otherDataset, [numTasks]) 
相似于SQL中的链接操做,即做用于键值对(K, V)和(K, W)上,返回元组 (K, (V, W)),spark也支持外链接,包括leftOuterJoin,rightOuterJoin和fullOuterJoin。例子:

 1 >>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
 2 >>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
 3 >>> class1.join(class2).collect()
 4 [('Tom', ('attended', 'attended'))]
 5 >>> class1.leftOuterJoin(class2).collect()
 6 [('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None))]
 7 >>> class1.rightOuterJoin(class2).collect()
 8 [('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]
 9 >>> class1.fullOuterJoin(class2).collect()
10 [('John', (None, 'attended')), ('Tom', ('attended', 'attended')), ('Jenny', ('attended', None)), ('Bob', ('attended', None)), ('Amy', (None, 'attended')), ('Alice', (None, 'attended'))]

 

15. cogroup(otherDataset, [numTasks]) 
做用于键值对(K, V)和(K, W)上,返回元组 (K, (Iterable, Iterable))。这一操做可叫作groupWith。

1 >>> class1 = sc.parallelize(('Tom', 'Jenny', 'Bob')).map(lambda a: (a, 'attended'))
2 >>> class2 = sc.parallelize(('Tom', 'Amy', 'Alice', 'John')).map(lambda a: (a, 'attended'))
3 >>> group = class1.cogroup(class2)
4 >>> group.collect()
5 [('John', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808afd0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a1d0>)), 
('Tom', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a7f0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a048>)),
('Jenny', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a9b0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a208>)),
('Bob', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808ae80>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b448d0>)),
('Amy', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44c88>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44588>)),
('Alice', (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44748>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44f98>))] 6 >>> group.map(lambda x: {x[0]: [list(x[1][0]), list(x[1][1])]}).collect() 7 [{'John': [[], ['attended']]}, {'Tom': [['attended'], ['attended']]}, {'Jenny': [['attended'], []]}, {'Bob': [['attended'], []]}, {'Amy': [[], ['attended']]}, {'Alice': [[], ['attended']]}]

 

16. cartesian(otherDataset) 
笛卡尔乘积,做用于数据集T和U上,返回(T, U),即数据集中每一个元素的两两组合

1 >>> a = sc.parallelize(('a', 'b', 'c'))
2 >>> b = sc.parallelize(('d', 'e', 'f'))
3 >>> a.cartesian(b).collect()
4 [('a', 'd'), ('a', 'e'), ('a', 'f'), ('b', 'd'), ('b', 'e'), ('b', 'f'), ('c', 'd'), ('c', 'e'), ('c', 'f')]

 

17. pipe(command, [envVars]) 
将驱动程序中的RDD交给shell处理(外部进程),例如Perl或bash脚本。RDD元素做为标准输入传给脚本,脚本处理以后的标准输出会做为新的RDD返回给驱动程序。

 

18. coalesce(numPartitions) 
将RDD的分区数减少到numPartitions个。当数据集经过过滤规模减少时,使用这个操做能够提高性能。

 

19. repartition(numPartitions) 
重组数据,数据被从新随机分区为numPartitions个,numPartitions能够比原来大,也能够比原来小,平衡各个分区。这一操做会将整个数据集在网络中从新洗牌。

 

20. repartitionAndSortWithinPartitions(partitioner) 
根据给定的partitioner函数从新将RDD分区,并在分区内排序。这比先repartition而后在分区内sort高效,缘由是这样迫使排序操做被移到了shuffle阶段。

相关文章
相关标签/搜索