来自官网的Spark Programming Guide,包括我的理解的东西。node
这里有一个疑惑点,pyspark是否支持Python内置函数(list、tuple、dictionary相关操做)?思考加搜索查询以后是这么考虑的:要想在多台机器上分布式处理数据,首先须要是spark支持的数据类型(要使用spark的文件I/O接口来读取数据),pyspark主要是Dataframe;而后须要用到spark的API。原本spark是支持Python的C语言开发的库包,那么Python的内置函数都是能够运行的,可是要想实现分布式处理,提升计算效率,在涉及到数据分发处理时要使用spark的transformation和action。潜台词是非分布式处理的操做能够用内置函数。是这样的吧?python
RDD是spark中最重要的抽象概念(数据结构),是集群中各节点上并行处理的分隔元素的集合(汇总),总会用到collect()方法。程序员
RDD能够从Hadoop文件系统中的文件建立,也能够从执行程序中的Scala集合中建立或转换。spark能够在内存中留存一份RDD,方便在并行运算中高效重用。shell
还有个抽象概念,共享变量。spark在不一样的节点并行执行任务集时,须要把每一个变量的副本传送一份到每一个任务中,有时候变量须要在任务中共享。数组
共享变量有两种:广播变量(Broadcast Variables)和累加器(Accumulators)。前者缓存在全部节点的内存中,后者用来叠加计数或求和。缓存
Spark2.2.0可使用标准的CPython接口,故C库如Numpy可使用,Pandas亦可。数据结构
1)spark程序的第一件事是建立一个spark上下文对象,其中,先要配置本身的应用信息。闭包
from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName('myFirstAPP').setMaster('local[*]') sc = SparkContext(conf=conf)
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
#SparkContext.parallelize()用于将本地Python集合分布式处理为RDD格式,以便并行处理。能够设置分隔的数量,如sc.parallelize(data,6)
#即,要想并行处理,数据必需要是RDD或DataSets或DataFrame格式。数据转换成这些格式后,就可使用C库包来进行其余运算操做。
2)外部文件,spark支持文本文件、序列文件及其余Hadoop输入格式。分布式
distFile = sc.textFile("data.txt") #文本文件,以行集合的格式读取 distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b) #textFile可以使用DataSets的操做
3)RDD操做:两种操做Transformation(从现存数据集中建立新的数据集DataSets)和Action(执行运算后将值返回给执行程序)。好比,map是transformation,reduce是action。ide
全部的transformation都是‘懒的’,只记忆并不执行,只有当action须要返回值给执行程序时才执行计算,这样spark能够更高效。这样只会返回reduce结果,而没有庞大的map数据集。
可是,若是有多个reduce,那么每次都要从新map,解决方法是:能够经过persist
(or cache
)方法将RDD留存在内存中。
lines = sc.textFile("data.txt") #此处只建立一个指针 lineLengths = lines.map(lambda s: len(s)) #此处未计算 lineLengths.persist() #留存,以重用 totalLength = lineLengths.reduce(lambda a, b: a + b) #此处开始计算,只返回计算结果。任务在多台机器上运行,每台机器只负责本身的map部分及本地reduce,并返回本身的值给执行程序。
4)传递函数给spark:lambda表达式(不支持多语句,且要求有返回值),本地自定义def(适用于长代码),模块的Top-level函数。
"""MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) #分隔后返回长度。s是文件数据,下面的textFile是RDD #关于if __name__=="__main__"这种写法的用处,前面必然定义了一些函数,那么只在本程序中执行时运行该段代码,载入到其余程序时,就能够只用所定义的函数,而不会执行该段代码 sc = SparkContext(...) sc.textFile("file.txt").map(myFunc)
注意:若是建立新的MyClass并调用doStuff()时,须要调用self.field,这样就须要把整个对象传送到集群中。把field复制到本地变量中可避免该状况。
class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + s) def doStuff(self, rdd): #复制field到本地变量 field = self.field return rdd.map(lambda s: field + s)
5)理解闭包:全局变量须要聚合时,建议使用Accumulator(累加器)。
counter = 0 rdd = sc.parallelize(data) # Wrong: Don't do this!! def increment_counter(x): global counter counter += x rdd.foreach(increment_counter) print("Counter value: ", counter)
本地模式(使用相同的JVM)时可能能够执行,但集群模式就不会如预期般执行。执行以前,spark会计算任务的(序列化)闭包(对每一个执行器均可见的变量或方法),但counter变量传递给执行器的是副本(copies),当foreach方法引用counter时,这已经不是执行节点的counter,而是工做节点的counter,那么最终counter可能仍是0。
执行节点(driver node)执行程序存在的地方,工做节点(work node)把任务分发到集群中的地方。
此外,想要使用rdd.foreach(println)
或rdd.map(println)
打印时,并不能实现预期效果。由于闭包模式中,stdout在工做节点的执行器中,并不在执行节点,故须要先使用collect()将全部元素汇总到执行节点。但把全部元素汇总到一台机器上可能会内存溢出,解决方法是take()
: rdd.take(100).foreach(println),
只打印部分元素。
6)用键值对进行操做:reduceByKey,sortByKey。键值对可以使用Python內建的tuple轻松得到。
lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b) #统计该文件每行的值出现几回,大概有重复的行 counts.collect()
7)常见transformation和action。列出经常使用操做,知道都能实现哪些功能。
Transformation:
map(func) | 通过func映射后,返回新的分布式数据集 |
filter(func) | 返回新的数据集,由func为True时的元素的组成。过滤 |
flatMap(func) | 类map,但每一个输入项可映射到0或多个输出项,故func返回的是个序列 |
mapPartitions(func) | 类map,在RDD的每一个分区上分别执行,那么func的类型必须是迭代器Iterator<T> => Iterator<U> |
mapPartitionsWithIndex(func) | func提供整型值来表示分区的index,func的类型(Int, Iterator<T>) => Iterator<U> |
sample(withReplacement, fraction, seed) | 采样数据的fraction部分,可替换可不替换,随机数种子 |
union(otherDataset) | 返回新的数据集,包括源数据和其余数据的元素,联合 |
intersection(otherDataset) | 插入 |
distinct([numTasks])) | 去重 |
groupByKey([numTasks]) | 分组,note:若分组后要聚合,那么直接使用reduceByKey()或aggregateByKey()效率更高。任务数可选 |
reduceByKey(func, [numTasks]) | 聚合 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 聚合 |
sortByKey([ascending], [numTasks]) | 排序 |
join(otherDataset, [numTasks]) | 链接, (K, V) and (K, W)->(K,(V,W))。外链接leftOuterJoin , rightOuterJoin , and fullOuterJoin |
cogroup(otherDataset, [numTasks]) | (K, V) and (K, W)->(K, (Iterable<V>, Iterable<W>)) tuples |
cartesian(otherDataset) | 用于T和U类型RDD时,返回(T, U)对(类型键值对RDD)。笛卡尔的(笛卡尔乘积?) |
pipe(command, [envVars]) | 经过shell命令管道处理每一个RDD分片 |
coalesce(numPartitions) | 减小分片数,适用于大的数据集过滤后 |
repartition(numPartitions) | 从新分片,生成多的或少的分片数 |
repartitionAndSortWithinPartitions(partitioner) | 从新分片并排序,若是重分片后须要排序,那么直接使用该函数 |
Action:
reduce(func) | 使用func聚合元素,(两个参数,而后返回一个结果),要求func是可交换、可组合的(加法交换律、结合律?),以便并行处理 |
collect() | 返回数据集的全部元素,做为执行程序的数组 |
count() | 返回数据集的元素数 |
first() | 返回数据集的第一个元素 |
take(n) | 返回前n个元素组成的数组 |
takeSample(withReplacement, num, [seed]) | 返回随机采样的num个元素组成的数组 |
takeOrdered(n, [ordering]) | 返回排序后的前n个元素,天然顺序或自定义比较器 |
saveAsTextFile(path) | 把数据集的元素做为TextFile写入到指定路径。spark会对每一个元素调用toString,将其转换为文件中的一行文本 |
saveAsSequenceFile(path) (Java and Scala) |
将数据集的元素保存到序列文件中 |
saveAsObjectFile(path) (Java and Scala) |
将数据集的元素使用Java的序列化特性写到文件中 |
countByKey() | 只适用于键值对RDD,返回哈希映射(key,int),对每一个key计数 |
foreach(func) | 对数据集的每一个元素执行func。适用于带反作用的操做,如更新累加器或与外部存储系统交互 |
8)洗牌(Shuffle)操做:包括重分片操做(repartition和coalesce),ByKey操做(reduceByKey、groupByKey、sortByKey,除去countByKey),链接操做(cogroup和join)
好比reduceByKey(),须要按照某个能够去reduce时,同一个能够可能在不一样的分片或者不一样的机器上,那么每一个分片执行以后,须要从每一个分片读数据而后计算出最终的结果,这个过程就是洗牌。
9)共享变量(broadcast变量 and accumulators)
若是spark操做额函数是在远程集群节点上运行,那么函数所用到的全部变量都会分发一个副本到每台机器上,可是这些副本的修改(操做结果)并不能反馈回到执行程序(若是是原始变量的引用就能够修改原始变量)。那么多任务之间共享变量就是无效的。so,spark提供了两个限制类型的共享变量:广播变量和累加器。看具体用在什么场景:
一、广播变量:容许在每台机器上缓存只读变量,好比给每一个节点一个大型输入集的副本。显式地建立广播变量仅适用于跨多阶段须要相同数据的任务或者以非序列化的形式缓存数据。
使用SparkContext.broadcast(v)建立广播变量。
>>> broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcast object at 0x102789f10> >>> broadcastVar.value [1, 2, 3]
二、累加器,只适用于在可交换、可结合的操做中去叠加。好比计数或是加和。spark自然支持数值类型,程序员也能够自行添加新的类型。
使用SparkContext.accumulator(v)建立累加器。
>>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s >>> accum.value 10