面试题引出:css
Stage:根据RDD之间的依赖关系的不一样将Job划分红不一样的Stage,遇到一个宽依赖则划分一个Stage。python
Task:Stage是一个TaskSet,将Stage根据分区数划分红一个个的Task。面试
1)map(func):返回一个新的RDD,该RDD由每个输入元素通过func函数转换后组成.shell
2)mapPartitions(func):相似于map,但独立地在RDD的每个分片上运行,所以在类型为T的RD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理全部分区。数组
3)reduceByKey(func,[numTask]):在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用定的reduce函数,将相同key的值聚合到一块儿,reduce任务的个数能够经过第二个可选的参数来设置。ruby
4)aggregateByKey (zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U: 在kv对的RDD中,,按key将value进行分组合并,合并时,将每一个value和初始值做为seq函数的参数,进行计算,返回的结果做为一个新的kv对,而后再将结果按照key进行合并,最后将每一个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果做为一个新的kv对输出。app
5)combineByKey(createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C, C) =>C):函数
对相同K,把V合并成一个集合。oop
1.createCombiner: combineByKey() 会遍历分区中的全部元素,所以每一个元素的键要么尚未遇到过,要么就和以前的某个元素的键相同。若是这是一个新的元素,combineByKey()会使用一个叫做createCombiner()的函数来建立那个键对应的累加器的初始值spa
2.mergeValue: 若是这是一个在处理当前分区以前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
3.mergeCombiners: 因为每一个分区都是独立处理的, 所以对于同一个键能够有多个累加器。若是有两个或者更多的分区都有对应同一个键的累加器, 就须要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
…
根据自身状况选择比较熟悉的算子加以介绍。
1)reduce:
2)collect:
3)first:
4)take:
5)aggregate:
6)countByKey:
7)foreach:
8)saveAsTextFile:
reduceBykey:
groupByKey:
…ByKey:
初识spark,须要对其API有熟悉的了解才能方便开发上层应用。本文用图形的方式直观表达相关API的工做特色,并提供了解新的API接口使用的方法。例子代码所有使用python实现。
准备输入文件:
$ cat /tmp/in apple bag bag cat cat cat
启动pyspark:
$ ./spark/bin/pyspark
使用textFile建立RDD:
>>> txt = sc.textFile("file:///tmp/in", 2)
查看RDD分区与数据:
>>> txt.glom().collect() [[u'apple', u'bag bag'], [u'cat cat cat']]
处理RDD的每一行,一对多映射。
代码示例:
>>> txt.flatMap(lambda line: line.split()).collect() [u'apple', u'bag', u'bag', u'cat', u'cat', u'cat']
示意图:
处理RDD的每一行,一对一映射。
代码示例:
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).collect() [(u'apple', 1), (u'bag', 1), (u'bag', 1), (u'cat', 1), (u'cat', 1), (u'cat', 1)]
示意图:
处理RDD的每一行,过滤掉不知足条件的行。
代码示例:
>>> txt.flatMap(lambda line: line.split()).filter(lambda word: word !='bag').collect() [u'apple', u'cat', u'cat', u'cat']
逐个处理每个partition,使用迭代器it访问每一个partition的行。
代码示例:
>>> txt.flatMap(lambda line: line.split()).mapPartitions(lambda it: [len(list(it))]).collect() [3, 3]
示意图:
逐个处理每个partition,使用迭代器it访问每一个partition的行,index保存partition的索引,等价于mapPartitionsWithSplit(过时函数)。
代码示例:
>>> txt.flatMap(lambda line: line.split()).mapPartitionsWithIndex(lambda index, it: [index]).collect() [0, 1]
示意图:
根据采样因子指定的比例,对数据进行采样,能够选择是否用随机数进行替换,seed用于指定随机数生成器种子。第一个参数表示是否放回抽样,第二个参数表示抽样比例,第三个参数表示随机数seed。
代码示例:
>>> txt.flatMap(lambda line: line.split()).sample(False, 0.5, 5).collect() [u'bag', u'bag', u'cat', u'cat']
示意图:
合并RDD,不去重。
代码示例:
>>> txt.union(txt).collect() [u'apple', u'bag bag', u'cat cat cat', u'apple', u'bag bag', u'cat cat cat']
示意图:
对RDD去重。
代码示例:
>>> txt.flatMap(lambda line: line.split()).distinct().collect() [u'bag', u'apple', u'cat']
示意图:
在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集。
代码示例:
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect() [(u'bag', <pyspark.resultiterable.ResultIterable object at 0x128a150>), (u'apple', <pyspark.resultiterable.ResultIterable object at 0x128a550>), (u'cat', <pyspark.resultiterable.ResultIterable object at 0x13234d0>)] >>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).groupByKey().collect()[0][1].data [1, 1]
示意图:
在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的reduce函数,将相同key的值聚合到一块儿。
代码示例:
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).collect() [(u'bag', 2), (u'apple', 1), (u'cat', 3)]
示意图:
自定义聚合函数,相似groupByKey。在一个(K,V)对的数据集上调用,不过能够返回一个(K,Seq[U])对的数据集。
代码示例(实现groupByKey的功能):
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).aggregateByKey([], lambda seq, elem: seq + [elem], lambda a, b: a + b).collect() [(u'bag', [1, 1]), (u'apple', [1]), (u'cat', [1, 1, 1])]
在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由ascending布尔参数决定。
代码示例:
>>> txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey().collect() [(u'apple', 1), (u'bag', 2), (u'cat', 3)]
示意图:
在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的全部元素对在一块儿的(K, (V, W))数据集。
代码示例:
>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey() >>> sorted_txt.join(sorted_txt).collect() [(u'bag', (2, 2)), (u'apple', (1, 1)), (u'cat', (3, 3))]
示意图:
在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, (Seq[V], Seq[W]))元组的数据集。这个操做也能够称之为groupwith。
代码示例:
>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey() >>> sorted_txt.cogroup(sorted_txt).collect() [(u'bag', (<pyspark.resultiterable.ResultIterable object at 0x1323790>, <pyspark.resultiterable.ResultIterable object at 0x1323310>)), (u'apple', (<pyspark.resultiterable.ResultIterable object at 0x1323990>, <pyspark.resultiterable.ResultIterable object at 0x1323ad0>)), (u'cat', (<pyspark.resultiterable.ResultIterable object at 0x1323110>, <pyspark.resultiterable.ResultIterable object at 0x13230d0>))] >>> sorted_txt.cogroup(sorted_txt).collect()[0][1][0].data [2]
示意图:
笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U)对数据集(两两的元素对)。
代码示例:
>>> sorted_txt = txt.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey() >>> sorted_txt.cogroup(sorted_txt).collect() [(u'bag', (<pyspark.resultiterable.ResultIterable object at 0x1323790>, <pyspark.resultiterable.ResultIterable object at 0x1323310>)), (u'apple', (<pyspark.resultiterable.ResultIterable object at 0x1323990>, <pyspark.resultiterable.ResultIterable object at 0x1323ad0>)), (u'cat', (<pyspark.resultiterable.ResultIterable object at 0x1323110>, <pyspark.resultiterable.ResultIterable object at 0x13230d0>))] >>> sorted_txt.cogroup(sorted_txt).collect()[0][1][0].data [2]
示意图:
处理RDD的每一行做为shell命令输入,shell命令结果为输出。
代码示例:
>>> txt.pipe("awk '{print $1}'").collect() [u'apple', u'bag', u'cat']
示意图:
减小RDD分区数。
代码示例:
>>> txt.coalesce(1).collect() [u'apple', u'bag bag', u'cat cat cat']
示意图:
对RDD从新分区,相似于coalesce。
代码示例:
>>> txt.repartition(1).collect() [u'apple', u'bag bag', u'cat cat cat']
合并两个RDD序列为元组,要求序列长度相等。
代码示例:
>>> txt.zip(txt).collect() [(u'apple', u'apple'), (u'bag bag', u'bag bag'), (u'cat cat cat', u'cat cat cat')]
示意图:
汇集数据集中的全部元素。
代码示例:
>>> txt.reduce(lambda a, b: a + " " + b) u'apple bag bag cat cat cat'
示意图:
以数组的形式,返回数据集的全部元素。
代码示例:
>>> txt.collect() [u'apple', u'bag bag', u'cat cat cat']
返回数据集的元素的个数。
代码示例:
>>> txt.count() 3
返回数据集第一个元素。
代码示例:
>>> txt.first() u'apple'
返回数据集前n个元素。
代码示例:
>>> txt.take(2) [u'apple', u'bag bag']
采样返回数据集前n个元素。第一个参数表示是否放回抽样,第二个参数表示抽样个数,第三个参数表示随机数seed。
代码示例:
>>> txt.takeSample(False, 2, 1) [u'cat cat cat', u'bag bag']
排序返回前n个元素。
代码示例:
>>> txt.takeOrdered(2) [u'apple', u'bag bag']
将数据集的元素,以textfile的形式,保存到本地文件系统,HDFS或者任何其它hadoop支持的文件系统。
代码示例:
>>> txt.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).saveAsTextFile("file:///tmp/out")
查看输出文件:
$cat /tmp/out/part-00001 (u'bag', 2) (u'apple', 1) (u'cat', 3)
将数据集的元素,以Hadoop sequencefile的格式,保存到指定的目录下,本地系统,HDFS或者任何其它hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者隐式的能够转换为Writable的RDD。
对(K,V)类型的RDD有效,返回一个(K,Int)对的Map,表示每个key对应的元素个数。
代码示例:
>>> txt.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).countByKey() defaultdict(<type 'int'>, {u'bag': 2, u'apple': 1, u'cat': 3})
在数据集的每个元素上,运行函数func进行更新。这一般用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互。
代码示例:
>>> def func(line): print line >>> txt.foreach(lambda line: func(line)) apple bag bag cat cat cat