相较MR快的缘由:其任务中间结果存在内存中,在迭代运算中尤其明显,DAG的设置。java
架构说明:node
Dirver:负责节点通信,task分发,结果回收linux
Worker:资源管理的从节点shell
Master:资源调度的主节点apache
RDD由一系列partition组成编程
函数做用在partition上vim
RDD之间存在相互依赖数组
分区器做用在KV格式的RDD上浏览器
RDD提供了一系列最佳的计算位置,数据本地化,计算向数据移动缓存
RDD自己实际不存储数据,为了便于理解暂时认为是存数据的
RDD的弹性来源于:partition的大小和数量可变
RDD的容错来源于:RDD之间的依赖关系
RDD的分布式来源于:partation分布在不一样的节点
KV格式的数据,RDD中的数据以二元组对象的形式存储
scala:经过SparkConf对象设置参数,SparkContext接收SparkConf对象,生成上下文context,context的textFile方法载入数据源,返回第一个RDD,基于算子对RDD进行处理
val conf = new SparkConf()
conf.setMaster("local").setAppName("wc")
val sc = new SparkContext(conf)
//sc可以设置checkpiont目录,日志打印级别等
sc.setlogLevel("WARN");
sc.checkpoint("hdfs://node1:9000/spark/checkpoint");
val RDD: RDD[String] = sc.textFile("data")
java:SparkConf类设置参数,JavaSparkContext接收SparkConf对象,生成上下文context,context的textFile方法载入数据源,返回第一个RDD,基于算子对RDD进行处理
SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("location").setAppName("wc");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
//可以将jsc对象转为sc对象,执行sc的方法
SparkContext sc = jsc.sc();
JavaRDD<String> data = jsc.textFile("data");
建立RDD的主要方法
textFile
JavaRDD<String> data = jsc.textFile("data");
parallelize 将容器转为RDD,可以执行分区数
JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1,2,3),3);
parallelizePair
JavaPairRDD<String, Integer> rdd1 =
jsc.parallelizePairs(Arrays.asList(
new Tuple2<>("a", 1),
new Tuple2<>("a", 2)
));
安装
tar -zxvf spark-2.3.1-bin-hadoop2.6.tgz
mv spark-2.3.1-bin-hadoop2.6.tgz spark-2.3.1 更名
配置
cd /opt/sxt/spark-2.3.1/conf 进入目录
mv slaves.template slaves 修改slaves文件
vim slaves
node1 node2 node3
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
JAVA_HOME=/usr/java/jdk1.8.0_11 配置java_home路径
SPARK_MASTER_HOST=node1 master的ip
SPARK_MASTER_PORT=7077 提交任务的端口,默认是7077
SPARK_WORKER_CORES=1 每一个worker从节点可以支配的核数
SPARK_WORKER_MEMORY=2g 每一个worker从节点可以支配的内存
拷贝到其余节点
cd /opt/sxt
scp -r spark-2.3.1 node2:`pwd`
scp -r spark-2.3.1 node3:`pwd`
启动(在主节点上)
cd /opt/sxt/spark-2.3.1/sbin
./start-all.sh (关闭: ./stop-all.sh)
UI链接:node1:8080 (修改端口:start-master.sh)
修改spark配置文件spark-env.sh
添加hadoop配置路径 HADOOP_CONF_DIR=/opt/sxt/hadoop-2.6.5/etc/hadoop
修改hadoop的配置,处理兼容
vim /opt/sxt/hadoop-2.6.5/etc/hadoop/yarn-site.xml
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
启动
yarn:start-all.sh
spark:cd /opt/sxt/spark/sbin ./start-all.sh
修改spark-env.sh配置文件
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=node1:2181,node2:2181,node3:2181
-Dspark.deploy.zookeeper.dir=/sparkmaster0821"
配置到各节点 scp spark-env.sh node2://opt/sxt/spark/conf
配置备用master节点,修改该节点的 spark-env.sh配置中的SPARK_MASTER_HOST
SPARK_MASTER_HOST=node2
HA启动
主节点启动进程:./start-all.sh
备用节点启动master进程:./start-master.sh
cd /opt/sxt/spark-2.3.1/bin
#指定任务提交地址,指定提交方式,指定主类,指定jar包位置 ,其余参数
./spark-submit
--master spark://node1:7077
--deploy-mode client
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar
100
执行流程
集群启动时,worker向master汇报资源
client提交任务,客户端中启动Driver进程
Driver向Master申请启动Application的资源
Master启动worker节点的excutor进程,excutor进程反向注册到Driver上
Driver将task发送到worker的excutor进程中执行
Driver监控task,接收worker的执行结果
适用测试环境,多任务执行时Driver网卡压力过大
cd /opt/sxt/spark-2.3.1/bin
#指定任务提交地址,指定提交方式,指定主类,指定jar包位置 ,其余参数
./spark-submit
--master spark://node1:7077
--deploy-mode cluster
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar
100
#须要jar包存放值hdfs中,或者每台节点上
执行过程
集群启动时,worker向master汇报资源
客户端将任务提交到集群,向Master请求启动Driver
Master选择随机节点建立Driver
Driver启动后向Master请求application的资源
Master启动worker节点的excutor进程,excutor进程反向注册到Driver上
Driver将task发送到worker的excutor进程中执行
Driver监控task,接收worker的执行结果
适用生产环境,客户端只负责提交任务,Driver均衡分布在集群中下降单节点网卡压力
cd /opt/sxt/spark-2.3.1/bin
#指定任务提交地址,指定提交方式,指定主类,指定jar包位置 ,其余参数
./spark-submit
--master yarn–client
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar
100
执行流程
client将任务提交到RM(ResourceManager)中,在客户端内建立Driver进程
RM随机选取NM节点(NodeManager),建立AM进程(applicationMaster)
AM进程向RM请求applictaion的资源,RM响应一批container资源
AM根据资源启动NM节点中的excutor进程
excutor反向注册到Driver进程上,接收Driver发出的task并执行
Driver监控task,接收worker的执行结果
使用测试环境,客户端过多Driver进程致使网卡压力大
注:AM只执行启动任务,不负责监控
cd /opt/sxt/spark-2.3.1/bin
#指定任务提交地址,指定提交方式,指定主类,指定jar包位置 ,其余参数
./spark-submit
--master yarn–cluster
--class org.apache.spark.examples.SparkPi
../examples/jars/spark-examples_2.11-2.3.1.jar
100
执行流程
client向RM节点提交任务
RM选择随机NM节点建立AM(ApplicationMaster)
AM向RM请求application执行的资源,并接收响应的资源
AM根据响应,将请求发送到NM中启动Excutor进程
Excutor反向注册到AM所在节点,从AM接收task并执行
AM监控task,接收worker的执行结果
适用于生产环境,执行Driver功能的AM随机分布在NM中,单点网卡压力下降
中止集群任务命令:yarn application -kill applicationID
形容RDD之间的依赖关系,基于父子RDD之间的partition关联来判断
窄依赖
父RDD中的partition与子RDD中的partition为一对一或一对多
不须要shuffle
宽依赖
父RDD中的partition指向多个子RDD中的partition,呈多对一关系
须要执行shuffle
一个application包括若干并行的job,一个触发算子对应一个job,每一个job会被拆分为多组相互关联的任务组,这些任务组就是stage
stage划分流程
spark根据RDD之间依赖关系构建DAG有向无环图,并提交给DAGScheduler
DAGScheduler将DAG划分若干相互依赖的stage,划分依据是RDD之间的宽窄依赖
逆向切分,沿DAG从后向前,遇到宽依赖划分一个出stage
stage之间存在并行和串行两种关联
stage内部由一组并行的task构成,stage内部的并行度由最后一个RDD的分区数决定
task被送到executor上执行的工做单元
RDD的分区数主要在如下状况中能够指定
读取数据时指定
具有宽依赖RDD位置
stage内部计算采用管道计算模式
每一个task至关于一个管道,同一时间一次处理一条数据
task以高阶函数 f4(f3(f2(f1(x)))) 的形式处理stage内多个RDD的代码逻辑
一个stage内部的task能够具有不一样逻辑
管道中数据落地的环节
指定持久化的RDD节点
执行shuffle write 的过程当中
spark集群启动,worker节点向Master节点汇报资源,Master掌握了集群的资源。
客户端向spark提交application,Master根据app的RDD依赖关系构建DAG有向无环图。
Master建立Driver进程,Driver进程中建立DAGScheduler和TaskScheduler调度器。
TaskScheduler建立后向Master节点请求app的资源,Master根据请求在worker节点上启动Executor进程,Executor进程反向注册到Driver进程中。
DAGScheduler根据DAG的宽窄依赖划分stage,将stage封装TaskSet为交给TaskScheduler,TaskSet中封装了stage中并行的task。
TaskScheduler遍历TaskSet,将task分配给Executor执行,即发送到Executor中的线程池ThreadPool中
TaskScheduler监控task执行,Executor的ThreadPool状态会响应给TaskScheduler。
监控过程
若task执行失败,TaskScheduler从新发送task到Executor中,默认重试3次
若task重试失败,则对应stage执行失败,DAGScheduler从新发送stage到TaskScheduler中从新执行,上述重试默认4次。若重试失败,则job失败,app失败
TaskScheduler还不然重试执行缓慢(straggling)的task,TaskScheduler会发送新的task并行执行。关于执行结果采用推测执行机制,两个task以先执行完的结果为准,默认是关闭的,配置属性为spark.speculation。推测执行机制不适应ETL等数据插入的操做(数据冲恢复插入)和数据倾斜的状况
app的资源申请是粗粒度的,application申请的资源,须要等待所有task执行完毕才会释放。优势是:不须要每一个task反复请求资源,任务执行效率高;缺点:资源没法充分利用。
注:MR使用细粒度资源申请方法,task本身申请资源执行任务,每一个task执行完毕释放资源,资源充分利用,但task启动变慢。
静态内存管理
60% spark.storage.memoryFraction 存储内存分区
90%存储+序列化
80% RDD存储+广播变量
20% 解压序列化
10% 预留OOM
20% spark.shuffle.memoryFraction shuffle内存分区
80% shuffle聚合内存
20% OOM预留
剩余 task计算
统一内存管理
300M JVM
75% spark.memory.storageFraction 存储内存分区
RDD存储+广播变量
shuffle聚合
二者动态调用
剩余 执行task
注:spark1.6之后默认统一内存管理,设置spark.memory.useLegacyMode置为true,修改成静态内存管理
提交任务的参数 ./spark-submit ...
--master MASTER_URL
spark://host:port mesos://host:port yarn local (默认)
--deploy-mode DEPLOY_MODE client(默认)/cluster
--class CLASS_NAME 主类(包+类)
--name NAME 任务名
--jars JARS 依赖jar包,逗号分隔
--files FILES 相关文件
--conf PROP=VALUE 配置属性
--driver-memory Driver的内存,默认1024M
--executor-memory executor的最大内存,默认1G
适用 standalone + cluster
--driver-cores driver的核数,默认1
适用 standalone/Mesos + cluster
--supervise 失败重启Driver
适用 standalone and Mesos
--total-executor-cores executor的总核数
适用 standalone 或 YARN
--executor-cores 每一个executor的核数,默认1
经过--total-executor-cores和--executor-cores限定executor的数量
使用yarn
--driver-cores driver的核数
--queue 资源队列名,默认default
--num-executors 指定executors 数量
Spark自带的一个快速原型开发工具,支持scala语言交互式编程。
启动命令 ./spark-shell --master spark://node1:7077 (win/linux)
执行任务 sc为默认建立的上下文环境
#指定hdfs的文件进程wordcount ,shsxt是hdfs的集群名
sc.textFile("hdfs://shsxt:9000/spark/aaa.txt")
.flatMap(_.split(" "))
.map((_,1))
.reduceByKey(_+_)
.foreach(println)
浏览器访问启动spark的主机 node1:8080 范围UI页面
能够查看当前运行状况和历史运行状况
history+UI
对应须要临时保存日志:在shell启动或命令提交时配置以下属性
--conf spark.eventLog.enabled=true 保存日志
--conf spark.eventLog.dir=hdfs://shsxt/spark/test 指定hdfs存放路径,shsxt是hdfs集群名
对于须要对全部任务都须要保存日志,须要配置spark-default.conf
配置文件中加入,须要复制到全部节点
#开启记录事件日志的功能
spark.eventLog.enabled true
#设置事件日志存储的目录,shsxt是hdfs的集群名
spark.eventLog.dir hdfs://shsxt/spark/log
#日志优化选项,压缩日志
spark.eventLog.compress true
#历史日志在hdfs的目录,shsxt是hdfs的集群名
spark.history.fs.logDirectory hdfs://shsxt/spark/log
在sbin中启动HistoryServer:./start-history-server.sh
UI地址为启动HistoryServer节点+端口: node4:18080
广播变量
spark代码,RDD之外部分由Driver端执行,RDD之内的部分executor端的各task中执行。
为了不向每一个task重复发送公用的变量,使用广播变量。
Driver将广播变量发送到executor端中,executor中的task公用一个变量。
//scala
val mybroadcase: Broadcast[String] =sc.broadcast("广播变量")
sc.textFile("/data").foreach(x=>{
print( mybroadcase.value )
})
//java
Broadcast<String> xxx = jsc.broadcast("xxx");
jsc.textFile("/data").foreach(new VoidFunction<String>(){
计数器
在广播变量的基础上,不借助count算子,只能实现execute层面的变量计数,没法实现全局的事件计数。
计数器实现:RDD外定义计数器(Driver中),在RDD内进行累加(Executor中),task完成汇总到Driver中实现计数。
注:计数结果必须在Driver端解析,计数器默认从0开始计数,每一个Executor独立计数后汇总到Driver累加。
//scala
val count = sc.longAccumulator
sc.textFile("/data").foreach(x=>{
count.add(1)
})
val num: lang.Long =count.value
//java
SparkContext sc = jsc.sc();
LongAccumulator count = sc.longAccumulator();
jsc.textFile("/data").foreach(new VoidFunction<String>(){
在宽依赖的RDD之间存在shuffle过程,将父RDD的分区的数据shuffle进入子RDD中不一样的分区。相似reducebykey算子,将相同ke进入一个分区进行处理。
Shuffle Writer:上游stage的map task保证当前分区中相同的key写入一个分区文件中
Shuffle Read:下游stage的reduce task在全部机器中获取属于本身分区的分区文件
Spark2以后使用SortShuffle,1.2以前使用HashShuffle,二者之间并用
普通机制
每一个map task处理后数据经过hash分区器写入不一样的buffer(默认32K)
每一个buffer对应一个磁盘小文件,每一个buffer或小磁盘文件对应一个reduce task
reduce task拉取对应的磁盘小文件的数据
小文件数量:map task数*reduce task数
小文件过多致使:内存建立过多对象,容易OOM;拉取过多,通信波动和故障易致使拉取失败(shuffle file cannot find) ,这种失败须要DAGScheduler重试stager,容易致使任务失败。
合并机制
一个executor进程中,全部map task公用一组buffer,减小磁盘小文件的数量
小文件数量:executor数*reduce task数
普通机制
map task 将计算结果写入本身的内存数据结构(默认5M)
shuffle设置定时器对内存数据结构的容量进行监控,若监控到大小达到阈值,向内存数据结构分配一倍的容量,直到节点剩余容量不够分配。此时启动内存数据结构溢写
内存数据结构对内部数据排序分区,写出到磁盘小文件,溢写以batch形式去写,一个batch对应1w条数据,batch做为写出缓存
maptask完毕后,磁盘小文件合并为:一个数据文件+一个索引文件
reduce task经过索引拉取对应部分的数据
生成 2*map task数的文件
bypass
取消排序,直接将数据写出都小文件中
bypass的触发条件:reduce task数小于 spark.shuffle.sort.bypassMergeThreshold的参数值(默认200),使得小批量数据不排序
MapOutputTracker管理磁盘小文件
MapOutputTrackerMaster主,driver进程中
MapOutputTrackerWorker从,Executor进程中
BlockManager 块管理者
分为主从架构
BlockManagerMaster,driver进程中,在使用广播变量和缓存数据时,BlockManagerSlave执行
BlockManagerWorker,Executor进程中,做为从节点
DiskStore 磁盘管理
MemoryStore 内存管理
ConnectionManager 负责链接其余BlockManagerWorker
BlockTransferService 负责数据传输
寻址流程
每一个maptask执行完毕将小文件地址封装到MpStatus对象,经过MapOutputTrackerWorker向Driver的MapOutputTrackerMaster汇报
全部maptask执行完毕,reducetask执行前,Excutor的MapOutputTrackerWorker向Driver端的MapOutputTrackerMaster请求磁盘小文件地址数据
Excutor的ConnectionManager链接其余节点的ConnectionManager,再借助BlockTransferService进行数据传输
BlockTransferService一次启动5个task拉取数据,一个task拉取最多48M数据
reduce的OOM优化
减小每次数据的拉取量,spark.reducer.maxSizeInFlight (64M)
增大shuffle的内存分配
提升executor总内存
优化方式
代码指定配置信息,优先级最高,硬编码不推荐
任务提交命令中 --conf 后指定参数 (推荐)
在spark-default.conf,适用全部任务,优先级最低,范围太广不推荐
全部优化属性:
spark.shuffle.file.buffer 默认32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件以前,会先写入buffer缓冲中,待缓冲写满以后,才会溢写到磁盘。
调优建议:内存资源充足能够适当增长(好比64k),从而减小shuffle write过程当中溢写次数,减小磁盘IO次数。性能提高范围1%~5%。
spark.reducer.maxSizeInFlight 默认48m
参数说明:设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次可以拉取多少数据。
调优建议:内存充足能够适当增长(好比96m),从而减小拉取数据的次数,减小网络IO,进而提高性能。性能提高范围1%~5%。
spark.shuffle.io.maxRetries 默认3
参数说明:shuffle read task从shuffle write task所在节点拉取属于本身的数据时,失败重试的最大次数。若是3次之后未成功,致使shuffle file not find错误和stage执行失败。
调优建议:对于那些包含了特别耗时的shuffle操做,增长重试最大次数(好比60次),以免因为JVM的full gc或者网络不稳定等因素致使的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数能够大幅度提高稳定性。 shuffle file not find taskScheduler不负责重试task,由DAGScheduler负责重试stage
spark.shuffle.io.retryWait 默认值:5s 参数说明:具体解释同上,该参数表明了每次重试拉取数据的等待间隔,默认是5s。 调优建议:建议加大间隔时长(好比60s),以增长shuffle操做的稳定性。
spark.shuffle.memoryFraction 默认值:0.2 参数说明:该参数表明了Executor内存中,分配给shuffle read task进行聚合操做的内存比例,默认是20%。 调优建议:在资源参数调优中讲解过这个参数。若是内存充足,并且不多使用持久化操做,建议调高这个比例,给shuffle read的聚合操做更多内存,以免因为内存不足致使聚合过程当中频繁读写磁盘。在实践中发现,合理调节该参数能够将性能提高10%左右。
spark.shuffle.manager 默认值:sort|hash 参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5之后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默认选项,可是Spark 1.2以及以后的版本默认都是SortShuffleManager了。tungsten-sort与sort相似,可是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。 调优建议:因为SortShuffleManager默认会对数据进行排序,所以若是你的业务逻辑中须要该排序机制的话,则使用默认的SortShuffleManager就能够;而若是你的业务逻辑不须要对数据进行排序,那么建议参考后面的几个参数调优,经过bypass机制或优化的HashShuffleManager来避免排序操做,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,由于以前发现了一些相应的bug。
spark.shuffle.sort.bypassMergeThreshold 默认值:200 参数说明:当ShuffleManager为SortShuffleManager时,若是shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程当中不会进行排序操做,而是直接按照未经优化的HashShuffleManager的方式去写数据,可是最后会将每一个task产生的全部临时磁盘文件都合并成一个文件,并会建立单独的索引文件。 调优建议:当你使用SortShuffleManager时,若是的确不须要排序操做,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减小了排序的性能开销。可是这种方式下,依然会产生大量的磁盘文件,所以shuffle write性能有待提升。
spark.shuffle.consolidateFiles 默认值:false 参数说明:若是使用HashShuffleManager,该参数有效。若是设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的状况下,这种方法能够极大地减小磁盘IO开销,提高性能。 调优建议:若是的确不须要SortShuffleManager的排序机制,那么除了使用bypass机制,还能够尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
PV-UV
数据结构:用户ip+地址+时间+电话+会话+域名 146.1.30.98 河南 2017-10-10 1512012307080 5263761960810313758 www.mi.com View
pv:每一个网站的当日访问数
uv:每一个网站的当日独立访客,以会话为准
//配置再也不重复,提供RDD0,值为字符串格式的数据 val RDD1: RDD[(String, String)] = RDD0.map(x => { val y = x.split("\\s+") //会话+域名 (y(5), y(4)) }) //groupByKey取出相同的网站的数据 set去重,count计数 val RDD2: RDD[(String, Iterable[String])] = RDD1.groupByKey() RDD2.foreach(x => { println("网站:"+x._1) val it = x._2.iterator var count=0 val set = Set[String]() while(it.hasNext){ count+=1 set.add(it.next()) } println("PV:"+count+" UV:"+ set.size ) })
二次排序
设置数据的封装类,该实现Serializable与Comparable<当前类>
例子中比较 2个数字组成的数据
public class SecondSortKey implements Serializable, Comparable<SecondSortKey> { //序列化版本 private static final long serialVersionUID = 1L; private int first; private int second; //set,get public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } //构造器 public SecondSortKey(int first, int second) { super(); this.first = first; this.second = second; } //重写比较方法,返回+-0数字 @Override public int compareTo(SecondSortKey o1) { //对两组数据分别比较得出比价结果 if (getFirst() - o1.getFirst() == 0) { return getSecond() - o1.getSecond(); } else { return getFirst() - o1.getFirst(); } } }
数据载入RDD是,转为KV结构的RDD,key为封装类,value为实际数据
使用sortByKey算子排序,获得所需排序效果
val result: RDD[(SecondSortKey, String)] =RDD.map(x => { val a = x.split(" ") val b=new SecondSortKey(a(1).toInt, a(2).toInt) new Tuple2(b,x) }).sortByKey()
分组取topN
//获取一组数字中最大的3个值,使用一个3元素的数组,比较并写入数据 Integer[] top3 = new Integer[3]; //数据经过迭代器输出,分别填入数组的三个位置,其中处理了最早3个置入的数据和后面添加删除数据的过程 while (iterator.hasNext()) { Integer x = iterator.next(); for (int i = 0; i < top3.length; i++) { if(top3[i] == null){ top3[i] = x; break; }else if(score > top3[i]){ for (int j = 2; j > i; j--) { top3[j] = top3[j-1]; } top3[i] = x; break; } } }
若设置 --total-executor-core 参数,则使用指定的核数,若未设置则压榨集群性能,使用全部剩余核数
若没有设置 --executor-core 参数 ,则每一个worker节点默认为这个application只开启一个executor进程。若设置了,一个worker可设置多个executor。集群会进一步考虑内存因素
DAGScheduler类的getMessingParentStages()方法是切割job划分stage,其中使用了递归的方式实现
DAGScheduler将stage封装后发送到TaskScheduler中,TaskScheduler遍历stage中的task并发送到executor中执行