[Spark] Spark 对RDD编程

本篇博客中的操做都在 ./bin/pyspark 中执行。python

RDD,即弹性分布式数据集(Resilient Distributed Dataset),是Spark对数据的核心抽象。RDD是分布式元素的集合,对手的全部操做均可以归纳为:shell

  • 建立RDD
  • 转化已有RDD
  • 调用RDD操做进行求值

在这些操做中,Spark会自动将RDD中的数据分发的集群上,并将操做自动化执行。缓存

每一个RDD都被分为多个分区,这些分区运行在集群中的不一样节点上。分布式

Get Started

用户能够:函数

  • 读取一个外部数据集
  • 或者使用对象集合(好比 list 或者 set)

来建立 RDD。好比使用 SparkContext.textFile() 来建立一个字符串RDD:优化

lines = sc.textFile("README.md")

RDD建立以后,支持:url

  • 转化操做(transformation):会由一个RDD生成一个新的RDD。
  • 行动操做(action):会计算出一个结果,并把结果返回到驱动器程序中,或存储在外部存储系统中。

Spark对RDD是惰性计算的,只有在行动操做(action)时,才会真正计算。spa

回到shell 中,再执行:.net

pythonLines = lines.filter(lambda line: "Python" in line)

行动操做 first() 之中,Spark才进行真正的计算,而这时候只须要计算结果中真正须要的数据:在这里,Spark只须要扫面文件知道找到第一个匹配的行(包含"Python"的行)就中止了。3d

默认状况下,Spark的RDD会在每次进行行动(Action)操做的时候从新计算,若是想在多个行动操做中使用同一个RDD,可使用.persist()方法来让Spark把这个RDD缓存下来,这个操做叫作:持久化。

持久化方便在之后的操做中重用数据。

总的来讲,Spark会这样工做:

  1. 建立出RDD
  2. 使用转化操做(好比filter)对RDD进行转化,建立出新RDD
  3. 告诉Spark咱们要重用哪些中间结果,对这些RDD进行持久化操做
  4. 使用行动(Action)操做,来触发一次计算,Spark会对计算进行优化后再执行。

另:cache()persist() 使用的默认存储级别是同样的。

建立一个RDD

使用外部数据集的方式比较常见,这里咱们就看一个文本文档的例子:

lines = sc.textFile("README.md")

为了下面的演示不麻烦,咱们这里主要看经过将程序中的集合转化为RDD的方法,快速建立一个RDD:

lines = sc.parallelize(["Hello world", "News about Senate Hacking Hearing","US official says Russia undoubtedly meddled in US election"])

// 注意上面的RDD中出现了两个"US",后面有用。

对进行RDD操做

RDD的转化操做是返回一个新的RDD的操做,好比.filter()操做就是转化操做。

RDD的行动操做是向驱动器程序返回结果,或者把结果写入外部驱动器,行动操做会触发实际的计算,好比.count()或者 .first()方法。

转化操做

场景:找了个本身之前程序的log文件,咱们使用Spark找出其中的错误(ERROR)信息,文件link

下面是使用.filter()实现转化操做:

>>> inputRDD   =  sc.textFile("url_Requests.log")
>>> errorsRDD  =  inputRDD.filter(lambda x : "ERROR"   in x)
>>> cautionRDD =  inputRDD.filter(lambda x : "CAUTION" in x)

注意 .filter() 方法不会改变已有的 inputRDD 中的数据,该操做会返回一个全新的RDD,inputRDD还在后面的程序中还能够继续使用。

而后再来一个.union() 操做:

>>> badlineRDD = errorsRDD.union(cautionRDD)

.union() 操做就是取并集,这个还比较好理解。

经过转化操做,能够从已有的RDD中派生出新的RDD。

行动(action)操做

好比说.count()操做,就是一个行动操做:

count

另一个常见的操做是.collect():

collect-operation

对于.collect()操做来讲,能够用来获取整个RDD中的数据。只有当整个RDD的数据能在单台机器的内存中放得下时,才能使用该方法。

当咱们每次调用一个新的行动操做时,整个RDD都会从头开始计算,若是要避免这种行为,用户可让中间结果持久化,这个在后面会提到。

关于惰性求值

RDD的转化都是惰性求值的,就是说在被调用行动曹组偶以前,Spark不会开始计算。

惰性求值觉得这咱们对RDD调用转化操做是,操做不会当即执行,Spark会在内部记录下全部要执行的操做信息。咱们能够把RDD当成咱们经过转化操做构建出来的特定数据集。

上面操做过的把文本数据读到RDD的操做一样也是惰性的,当咱们调用 sc.textFile()时,数据并无读取进来,而是在必要时才会读取。 和转化操做相同的是,读取数据的操做也有可能被屡次执行。

常见RDD的转化操做和行动操做

对各个元素的转化操做

其中的一个例子是 .map()方法,map能够对RDD中的每一个数据进行操做:

>>> nums = sc.parallelize([1,2,3,4])
>>> squared = nums.map(lambda x : x ** 2)
>>> squared.collect()
[1, 4, 9, 16]

再好比咱们刚才的日志文件:

>>> numberOfLines = errorsRDD.map(lambda line: len(line))
>>> numberOfLines.collect()

这里,咱们计算了每行错误日志的字符数,结果为:

另外一个是flatMap,看一个例子就懂了,还记得咱们刚才建立的lines吗:

lines.collect()
words = lines.flatMap(lambda line: line.split(" "))
words.collect()

其输出结果为:

.map() 有什么区别呢,这是 map 的输出结果,很容易懂:

difference-between-map-and-flatmap

使用 .distinct() 操做进行去重:

distinct

常见行动操做

.reduce() 是最经常使用的行动操做:

reduce 接受一个函数做为参数,这个函数要操做2个相同元素类型的RDD数据,并返回一个一样类型的新元素。

此外,还有toptake 等常见操做:

行动操做

相关文章
相关标签/搜索