RDD编程详解

RDD即弹性分布式数据集(Resilient Distributed Dataset),其实就是分布式的元素集合。就像一个文件中的所有内容、一个数据库表中的所有记录,或者是一个列表中的全部元素、一个字典中的全部键值对,不一样于常规对象的是他们中单个对象的内容被分布式的保存在集群的不一样节点中,在计算时也能够在多个节点上运行。 在spark中,对数据的全部操做都不外乎建立RDD、转化已有RDD和调用RDD操做进行求值。python

建立RDD操做
用户可使用两种方法来建立RDD:读取一个外部数据集,或者在驱动器程序里分发驱动器程序中的对象集合。在Spark中进行的操做都是SparkContext的内部方法,经过SparkContext实例化对象实现,常见的建立操做有:数据库

>>> lines = sc.textFile("README.md") #读取文本文件做为一个字符串RDD,文件中每一行的内容为一个元素。
>>> lines = sc.parallelize(["pandas", "i like pandas"]) #建立RDD的最简单方式

转化RDD操做
转化操做会由一个RDD生成一个新的RDD
当咱们对RDD进行转化操做时不会当即执行,相反Spark会在内部记录下所要求执行的操做的相关信息。因此咱们不该该把RDD看作存放着特定数据的数据集,而最好把每一个RDD当作咱们经过转化操做构建出来的,记录如何计算数据的指令列表。分布式

>>> errorsRDD = inputRDD.filter(lambda line: "error" in line) #筛选包含error字符串的行并组成新RDD 
>>> warningsRDD = inputRDD.filter(lambda line: "warning" in line)
>>> badlinesRDD = errorsRDD.union(warningsRDD) #合并两个RDD中的元素
>>> tenLines = sc.parallelize(tenLines) #将list转化成RDD对象
>>> tenLinesList = tenLines.collect() #collect用来将RDD中对象转化成列表,注意普通的列表对象只能存储在本机,因此使用collect以前要确保本机内存够用

调用RDD行动操做
行动操做是向驱动器程序返回结果或把结果写入外部系统的操做,会触发实际的计算。函数

>>> pythonLines.persist() #将对象持久化到内存中,默认状况下每次进行spark的行动操时会从新计算变量,当须要屡次调用某一对象时能够这么作
>>> pythonLines.first() #提取第一个元素实际操做时只读取pythonLines的第一个元素,并非加载完pythonLines变量而后取第一个元素
>>> pythonLines.count() #统计元素个数
16
>>> nums = sc.parallelize([1,2,3,3,4])
>>> nums.countByValue()	#统计元素的出现频率
defaultdict(int, {1: 1, 2: 1, 3: 2, 4: 1})
>>> nums.take(2)	#take方法能提取RDD中的n个元素,其会尽可能访问少的节点,因此获得的集合会不均衡,该方法的返回值类型是普通集合
[1,2]
>>> nums.top(2)	#返回RDD中顺序最靠前的n个元素
[4,3]
>>> nums.takeSample(True, 3, 2)	#随机取样函数,第一个参数指定是否容许重复取样,第二个参数指定样本容量,第三个参数为取样的随机数种子
[1,4,4]

虽然你能够在任什么时候候定义新的RDD,可是spark只会惰性计算这些RDD。他们只有第一次在一个行动操做中用到时才会真正进行计算。在大数据领域中,对象的内容会十分庞大,这种作法能够有效减小加载到内存中的变量大小。 每当咱们调用一个新的行动操做的时候,整个RDD都会从头开始计算,要避免这种重复低效的行为须要咱们在合适的场景下将中间结果手动持久化。
Spark的一些转化和行动操做都须要依赖用户传递的函数来计算。在Python中有三种方式把函数传递给Spark:lambda表达式、传递顶层函数和调用定义的局部函数。大数据

>>> word = rdd.filter(lambda s: "error" in s)
>>> def containsError(s):
>>> return "error" in s
>>> word = rdd.filter(containsError)

>>> nums = sc.parallelize([1,2,3,4])
>>> squared = nums.map(lambda x: x*x).collect()	#相似于python中map函数的用法
>>> for num in squared:
>>>     print(num)

>>> lines = sc.parallelize(["hello world", "hi girl"])
>>> words = lines.flatMap(lambda line: line.split(" "))	#flatMap()的做用是将全部的输出结果聚合到一个迭代器能够访问的RDD中
>>> for word in words.collect():
>>>     print(word)

>>> rdd1 = ["coffee", "coffee", "tea", "juice", "cola"]
>>> rdd2 = ["tea", "juice", "water"]
>>> rdd1.distinct()	#RDD元素去重操做
["coffee", "tea", "juice", "cola"]
>>> rdd1.union(rdd2)	#两个RDD对象中元素的并集
["coffee", "coffee", "tea", "juice", "cola", "water"]
>>> rdd1.intersection(rdd2)	#计算两个RDD中元素的交集
["tea", "juice"]
>>> rdd1.substract(rdd2)	#计算两个RDD中元素的差集
["coffee", "cola"]

>>> nums = sc.parallelize([1,2,3,4,5])
>>> nums.reduce(lambda x, y: x+y)	#将第一个元素与第二个元素做为形参传入函数,将计算出来的结果做为第一个形参,第三个元素做为第二个形参继续进行迭代计算,以此类推计算出最终值,这里至关于计算1+2+3+4+5
15
>>> nums.aggregate((0, 0), lambda acc, value: (acc[0] + value, acc[1] + 1), lambda
 acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))	#aggregate的用法与reduce相似,不一样之处在于reduce的输出值只能和输入值保持同种类型,可是aggregate能够自定义结果类型,第一个参数指定默认值,第二个参数指定各个节点上数据的处理函数(同reduce函数参数),第三个参数指定各节点计算结果的合并方法
(15, 5)
>>> nums.fold(8, lambda x, y: [x, y])	#fold方法是简化版的aggregate,不可更改输出结果类型,计算原理相似于aggregate,将默认值做为第一个参数参与迭代计算,计算完毕后与其余节点上的默认值进行迭代合并
[8, [[[[[8, 1], 2], 3], 4], 5]]

传递函数时须要注意的是,Python会在你不经意间把函数所在的类对象也序列化传递出去。当你传递的对象是某个类对象的成员,或者包含了对某个类对象中一个字段的引用时,Spark就会把整个类对象发到工做节点上,这可能会传递更大的数据量。有时候传递的类里面包含Python不知道的对象,也会致使程序报错。ui

class SearchFunctions(object):
	def __init__(self, query):
		self.query = query
	def isMatch(self, s):
		return self.query in s
	def getMatchesFunctionReference(self, rdd):
		return rdd.filter(self.isMatch) #这里在“self.isMatch”中引用了整个self

替代的方案是只把须要的字段从对象中拿出来放到一个局部变量中,而后传递这个局部变量。spa

class WorldFunctions(object):
	def getMatchesNoReference(self, rdd):
		query = self.query
		return rdd.filter(lambda x: querty in x)
相关文章
相关标签/搜索