【原】Learning Spark (Python版) 学习笔记(一)----RDD 基本概念与命令

       《Learning Spark》这本书算是Spark入门的必读书了,中文版是《Spark快速大数据分析》,不过豆瓣书评颇有意思的是,英文原版评分7.4,评论都说入门而已深刻不足,中文译版评分8.4,评论一片好评,有点意思。我倒以为这本书能够做为官方文档的一个补充,刷完后基本上对Spark的一些基本概念、码简单的程序是没有问题的了。这本书有一个好处是它是用三门语言写的,Python/Java/Scala,因此适用性很广,个人观点是,先精通一门语言,再去学其余语言。因为我工做中比较经常使用的是Python,因此就用把Python相关的命令总结一下。下一阶段再深刻学习Java和Scala。这一篇总结第一章-第三章的重点内容。shell

 
  说到Spark,就不得不提到RDD,RDD,字面意思是弹性分布式数据集,其实就是分布式的元素集合。Python的基本内置的数据类型有整型、字符串、元祖、列表、字典,布尔类型等,而Spark的数据类型只有RDD这一种,在Spark里,对数据的全部操做,基本上就是围绕RDD来的,譬如建立、转换、求值等等。全部RDD的转换都是lazy(惰性求值)的,RDD的转换操做会生成新的RDD,新的RDD的数据依赖于原来的RDD的数据,每一个RDD又包含多个分区。那么一段程序实际上就构造了一个由相互依赖的多个RDD组成的有向无环图(DAG)。并经过在RDD上执行动做将这个有向无环图做为一个Job提交给Spark执行。理解RDD后能够避免之后走不少弯路。关于RDD的特色,能够搜到不少资料,其实咱们只须要理解两点就能够了:
  1.不可变
      2.分布式
 
     有人会以为很奇怪,若是RDD不可变,那么在进行数据操做的时候,怎么改变它的值,怎么进行计算呢?其实RDD支持两种操做
     1.Tansformation(转化操做):返回值仍是一个RDD
     2.Action(行动操做):返回值不是一个RDD
 
     第一种Transformation是返回一个新的RDD,如map(),filter()等。这种操做是lazy(惰性)的,即从一个RDD转换生成另外一个RDD的操做不是立刻执行,只是记录下来,只有等到有Action操做是才会真正启动计算,将生成的新RDD写到内存或hdfs里,不会对原有的RDD的值进行改变。而Action操做才会实际触发Spark计算,对RDD计算出一个结果,并把结果返回到内存或hdfs中,如count(),first()等。
 
     通俗点理解的话,就是假设你写了一堆程序,里面对数据进行了屡次转换,这个时候实际上没有计算,就只是放着这里。在最后出结果的时候会用到Action操做,这个时候Action会执行与之相关的转换操做,运算速度会很是快(一是Action不必定须要调用全部的transformation操做,二是只有在最后一步才会计算相关的transformation操做)。若是Transformation没有lazy性质的话,每转换一次就要计算一次,最后Action操做的时候还要计算一次,会很是耗内存,也会极大下降计算速度。
 
     还有一种状况,若是咱们想屡次使用同一个RDD,每次都对RDD进行Action操做的话,会极大的消耗Spark的内存,这种状况下,咱们可使用RDD.persist()把这个RDD缓存下来,在内存不足时,能够存储到磁盘(disk)里。在Python中,储存的对象永远是经过Pickle库序列化过的,因此社不设置序列化级别不会产生影响。
 
     RDD的性质和操做方式讲完了,如今来讲说怎么建立RDD,有两种方式
     1.读取一个外部数据集
     2.在内存中对一个集合进行并行化(parallelize)
 
     第二种方式相对来讲更简单,你能够直接在shell里快速建立RDD,举个例子:
1 A = [1,2,3,4,5] 2 lines = sc.parallelize(A) 3 #另外一种方式
4 lines = sc.parallelize([1,2,3,4,5])
  
  可是这种方式并非很好,由于你须要把你的整个数据集放在内存里,若是数据量比较大,会很占内存。因此,能够在测试的时候用这种方式,简单快速。
     
  读取外部数据及时须要用到SparkContext.textFile()
 
 1 lines = sc.textFile("README.md") 
 
  RDD的操做命令不少,包括map(),filter()等Transformation操做以及reduce(),fold(),aggregate()等Action操做。
  • 常见的Transformation操做:

   map( )和flatMap( )的联系和区别 缓存

map( ):接收一个函数,应用到RDD中的每一个元素,而后为每一条输入返回一个对象。

filter( ):接收一个函数,将函数的元素放入新的RDD中返回。

flatMap( ):接收一个函数,应用到RDD中的每一个元素,返回一个包含可迭代的类型(如list等)的RDD,能够理解为先Map(),后flat().

 

  
  用一个图能够很清楚的理解:
  
 
 
  伪集合操做:
 
1 distinct( )、union( )、intersection( )、subtract( )
2 distinct( ):去重
3 union( ):两个RDD的并集
4 intersection( ):两个RDD的交集
5 subtract( ):两个RDD的补集
6 cartesian( ):两个RDD的笛卡尔积(能够应用于计算类似度中,如计算各用户对各类产品的预期兴趣程度)

注:
 
    
1.intersection( )的性能比union( )差不少,由于它须要数据混洗来发现共同数据
 
    
2.substract( )也须要数据混洗

 

  • 常见的Action操做:

  

1 reduce( ):接收一个函数做为参数,这个函数要操做两个相同元素类型的RDD,也返回一个一样类型的RDD,能够计算RDD中元素的和、个数、以及其余聚合类型的操做。
2 
3 fold( ):和reduce同样,但须要提供初始值。
4 
5 aggregate( ):和fold相似,但一般返回不一样类型的函数。
6 
7 注:
关于fold()和aggregate(),再说点题外话。fold()只能作同构聚合操做,就是说,若是你有一个RDD[X],经过fold,你只能构造出一个X。可是若是你想经过RDD[X]构造一个Y呢?那就得用到aggregate()了,使用aggregate时,须要提供初始值(初始值的类型与最终返回的类型相同),而后经过一个函数把一RDD的元素合并起来放到累加器里,再提供一个函数将累加器两两相加。由此能够看出,fold()须要保证灭个partition可以独立进行运算,而aggregate()对于不一样partition(分区)提交的最终结果专门定义了一个函数来进行处理。

 

 

  RDD还有不少其余的操做命令,譬如collect(),count(),take(),top(),countByValue(),foreach()等,限于篇幅,就不一一表述了。分布式

 

  最后来说讲如何Spark传递函数
  两种方式:
  1.简单的函数:lambda表达式
     适合比较短的函数,不支持多语句函数和无返回值的语句。
  2.def函数
     会将整个对象传递过去,可是最好不要传递一个带字段引用的函数。若是你传递的对象是某个对象的成员,或者在某个函数中引用了一个整个字段,会报错。举个例子:
1 class MyClass(object): 2     def __init__(self): 3         self.field = “Hello” 4 
5     def doStuff(self, rdd): 6         #报错:由于在self.field中引用了整个self
7         return rdd.map(lambda s: self.field + x)

 

 解决方法:直接把你须要的字段拿出来放到一个局部变量里,而后传递这个局部变量就能够了。函数

 

1 class MyClass(object): 2     def __init__(self): 3         self.field = “Hello” 4 
5     def doStuff(self, rdd): 6         #将须要的字段提取到局部变量中便可
7         field = self.field 8         return rdd.map(lambda s: field + x)

  

  前面三章讲了Spark的基本概念和RDD的特性以及一些简单的命令,比较简单。后面三章主要讲了键值对操做、数据的读取和保存以及累加器、广播变量等,下周再更新。性能

相关文章
相关标签/搜索