scala版 ,基本名词概念及 rdd的基本建立及使用apache
var conf = new SparkConf()网络
var sc: SparkContext = new SparkContext(conf)多线程
val rawRDDA = sc.parallelize(List("!! bb ## cc","%% cc bb %%","cc && ++ aa"),3) app
# sc.parallelize(,3) 将数据并行加载到三台机器上框架
var tmpRDDA1 = rawRDDA.flatMap(line=>line.split(" "))spa
var tmpRDDA2 = tmpRDDA1.filter(allWord=>{allWord.contains("aa") || allWord.contains("bb")})线程
var tmpRDDA3 = tmpRDDA2.map(word=>(word,1))scala
import org.apache.spark.HashPartitioner对象
var tmpRDDA4 = tmpRDDA.partitionBy(new HashPartitioner(2)).groupByKey()接口
#partitionBy(new HashPartitioner(2)).groupByKey 将以前的3台机器Shuffle成两台机器
var tmpResultRDDA = tmpRDDA4.map((P:(String,Iterable[Int]))=>(P._1,P._2.sum))
#对相同的key的value进行求和
Partition :某机上一个固定数据块 , 一系列相关Partition组合为一个RDD 。
如tmpRDDA2拥有3个Partition ,而 tmpResultRDDA拥有两个Partition
RDD :数据统一操做所在地, 代码中任意一个操做(如faltMap,filter,map), RDD内的全部Partition都会执行
如在rawRDDA->tmpRDDA1时 ,执行flatMap(line=>line.split(" ")),则rawRDD 的三个Partition (分别为 cslave0上的“!! bb ## cc”,
cslave1上的“-- cc bb $$”和cslave2上的“cc ^^ ++ aa”都要执行flatMap操做)
RDD 是数据并行化所在地 ,隶属于某RDD的全部Partition都要执行相同操做,当这些Partition存在于不一样机器,就会由不一样机器同时执
行,也就是并行执行
RDD并行化范式主要有Map和Shuffle
Map 范式 :只对本Partition上的数据进行操做, 操做的数据对象不跨越多个Partition,即不跨越网络 。
Shuffle范式 : 对不一样Partition上的数据进行重组,其操做的数据对象跨越多个甚至是全部Partition ,即跨越网络
场景 :多输入源
两个原始文件rawFile1 和 rawFile2,要求将rawFile1的内容均匀加载到cslave3,cslave4上,接着对rawFile1进行数据去重,
要求将rawFile2加载到cslave5,而后将rawFile1的处理结果中 去掉rawFile2中所含的条目
var conf = new SparkConf()
var sc: SparkContext = new SparkContext(conf)
var rawRDDB = sc.parallelize(List(("xx",99),("yy",88),("xx",99),("zz",99)),2)
var rawRDDC = sc.parallelize(List(("yy",88)),1)
var tmpResultRDDBC = rawRDDB.distinct.subtract(rawRDDC)
subtract()就是两个RDD相减,而这两个RDD来自不一样的输入文件
场景:复杂状况
初始化多个rdd,相互取并集或差集
多输入源,去重,装换,再合并
var conf = new SparkConf()
var sc:SparkContext = new SparkContext(conf)
var rawRDDA = sc.parallelize(List("!! bb ## cc","%% cc bb %%","cc && ++ aa"),3)
var rawRDDB = sc.paralleliz(List(("xx,99),("yy",88),("xx",99),("zz",99)),2)
var rawRDDC = sc.parallelize(List(("yy",88)),1)
import org.apache.spark.HashPartitioner
var tmpResultRDDA = rawRDDA.flatMap(line=>line.split(" ")).filter(allWord=>{allWord.contains("aa")||allWord.contains("bb")}).map(word=>(word,1)).partitionBy(new HashPartitioner(2)).groupByKey().map((P:String,Iterable[Int]))=>(P._1,P._2.sum))
var tmpResultRDDBC = rawRDDB.distinct.subtract(rawRDDC)
var resultRDDABC = tmpResultRDDA.union(tmpResultRDDBC)
resultRDDABC.saveAsTextFile("HDFS路径")
map范式做用于RDD时,不会改变先后两个RDD内Partition数量, 当partitionBy,union做用于RDD时,会改变先后两个RDD内Partition数量
RDD持久化到HDFS时,RDD对应一个文件夹,属于该RDD的每一个Partition对应一个独立文件
RDD之间的中间数据不存入本地磁盘或HDFS
RDD的多个操做能够用点‘.’链接,如 RDD1.map().filter().groupBy()
RDD能够对指定的某个Partition进行操做,而不更改其余Partition
Spark-app执行流程:
1.用户调用RDD API接口,编写rdd转换应用代码
2.使用spark提交job到Master
3.Master收到job,通知各个Worker启动Executor
4.各个Executor向Driver注册 (用户编写的代码和提交任务的客户端统一称Driver)
5.RDD Graph将用户的RDD串组织成DAG-RDD
6.DAGSchedule 以Shuffle为原则(即遇Shuffle就拆分)将DAG-RDD拆分红一系列StageDAG-RDD(StageDAG-RDD0->StageDAG-RDD1->StageDAG-rdd2->...)
7.RDD经过访问NameNode,将DataNode上的数据块装入RDD的Partition
8.TaskSchedule将StageDAG-RDD0发往隶属于本RDD的全部Partition执行,在Partition执行过程当中,Partition上的Executor优先执行本Partition.
9.TaskSchedule将StageDAG-RDD1发往隶属于本RDD(已改变)的全部Partition执行
10.重复上面8,9步的步骤,直至执行完全部Stage-DAG-RDD
资源隔离性
每一个执行的Spark-APP都有本身一系列的Executor进程(分布在不一样的机器上或内核上),这些Executor会协做完成该任务。
单个Executor会以多线程复用方式运行该Spark-APP分配来的全部Task .
一个Executor只属于一个Spark-APP,一个Spark-APP能够有多个Executor
这与MapReduce不一样。 好比某个由Map->Reduce1->Reduce2构成的ML-App,有十个Slave同时执行该任务,从某一个slave机器上来看,
MapReduce框架执行时会启动Map进程,Reduce1进程,Reduce2进程,三个进程顺序执行该任务
而Spark则使用一个Executor进程完成这四个操做。
spark-APP自己感知不到集群的存在