spark采用textFile()方法来从文件系统中加载数据建立RDD,该方法把文件的URL做为参数,这个URL能够是:node
上面三条等价的前提是当前登陆linux系统的用户名必须是Hadoop用户,这样当执行括号里给的文本文件名称word.txt时,系统默认去当前登陆Ubuntu系统的用户在HDFS当中对应的用户主目录中寻找python
/user/hadoop是hdfs中的用户主目录linux
能够调用SparkContext对象的Parallelize方法,在Driver中一个已经存在的集合(数组)上建立。apache
a.经过数组建立RDD编程
b.经过列表建立RDD数组
RDD经常使用转换操做:缓存
groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集网络
reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每一个值是将每一个key传递到函数func中进行聚合后获得的结果分布式
reduceByKey和groupByKey的区别:reduceByKey不只会进行一个分组,还会把value list再根据传入的函数再作操做函数
返回数据集中的元素个数。
把在不一样电脑上生成结果都收集回来,能够在发起程序的终端上显示出来。
返回数据集中的第一个元素
返回数据集中的前n个元素,而且以数组的形式
func是一个高阶函数
惰性机制:整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操做时,才会触发“从头至尾”的真正的计算。
在Spark中,RDD采用惰性求值的机制,每次遇到行动操做,都会从头开始执行计算。每次调用行动操做,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算常常须要屡次重复使用同一组数据
rdd.count()第一次动做类型,获得的结果先把它存起来,这样下面再去执行rdd.collect()又遇到第二次动做类型操做的时候,就不用再从头至尾去算了,只须要用刚才存起来的值就能够了。可是若是计算结果不须要重复用的话,就不要随便去用持久化,会很耗内存。
job的由来:每次遇到一个动做类型操做,都会生成一个job。
持久化的方式:
针对上面的实例,增长持久化语句之后的执行过程以下:
RDD是弹性分布式数据集,一般RDD很大,会被分红不少个分区,分别保存在不一样的节点上
图 RDD分区被保存到不一样节点上
u1,u2,...是放在不一样机器上的,右边也是放在不一样机器上的,会在网络上大量机器之间来回传输。
图:未分区时对UserData和Events两个表进行链接操做
图:采用分区之后对UserData和Events两个表进行链接操做
userData表:u1存储1到10号,u2存储11到20号,...
events表:挑出1到10号的扔给u1,11到20的扔给u2,...
RDD分区的一个原则是使得分区的个数尽可能等于集群中的CPU核心(core)数目,对于不一样的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),均可以经过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,通常而言:
分区个数=集群中CPU核心数目
若是分区个数和CPU核数不接近,好比说有4个分区,8个CPU,那意味着4个CPU的线程是浪费掉的,由于只有4个分区,顶多能够启动4个线程。
若是有16个分区,8个线程,那只能有8个分区去运做,剩下8个分区得等待。
在调用textFile()和parallelize()方法的时候手动指定分区个数便可,语法格式以下: sc.textFile(path, partitionNum) 其中,path参数用于指定要加载的文件的地址,partitionNum参数用于指定分区个数。
语法格式:
sc.textFile(path,partitionNum) // partitionNum表示指定分区个数
Spark提供了自带的HashPartitioner(哈希分区)与RangePartitioner(区域分区),可以知足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即经过提供一个自定义的Partitioner对象来控制RDD的分区方式,从而利用领域知识进一步减小通讯开销。
要实现自定义分区,须要定义一个类,这个自定义类须要继承org.apache.spark.Partitioner类,并实现下面三个方法:
实例:根据key值的最后一位数字,写到不一样的文件,例如:
用单例对象定义入口函数,凡是写在单例对象中的都是静态的方法。
map(_._n)表示任意元组tuple对象,后面的数字n表示取第几个数.(n>=1的整数)
注意:partitioner自定义分区类只支持RDD为键值对的形式
假设有一个本地文件word.txt,里面包含了不少行文本,每行文本由多个单词构成,单词之间用空格分隔。可使用以下语句进行词频统计(即统计每一个单词出现的次数):
在实际应用中,单词文件可能很是大,会被保存到分布式文件系统HDFS中,Spark和Hadoop会统一部署在一个集群上。spark的wordnode和hdfs的datanode部署在一块儿的(让数据更靠近计算)
先在不一样的机器上分别并行执行词频统计,获得统计结果后,再到spark master节点上进行汇总,最终获得总的结果。