本文源码基于spark 2.2.0html
用户编写的Spark程序,经过一个有main方法的类执行,完成一个计算任务的处理。它是由一个Driver程序和一组运行于Spark集群上的Executor组成java
弹性分布式数据集。RDD是Spark的核心数据结构,能够经过一系列算子进行操做。当RDD遇到Action算子时,将以前的全部算子造成一个有向无环图(DAG)。再在Spark中转化为Job,提交到集群执行node
spark2.x后就使用DataFrame/DateSet了web
SparkContext是Spark的入口,负责链接Spark集群,建立RDD,累积量和广播量等。从本质上来讲,SparkContext是Spark的对外接口,负责向调用者提供Spark的各类功能。 算法
Only one SparkContext may be active per JVM. You must
stop()
the active SparkContext before creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
每一个JVM只有一个SparkContext,一台服务器能够启动多个JVMshell
The entry point to programming Spark with the Dataset and DataFrame API.
包含了SQLContext和HiveContextapache
运行main方法的Java虚拟机进程,负责监听spark application的executor进程发来的通讯和链接,将工程jar发送到全部的executor进程中
Driver与Master、Worker协做完成application进程的启动、DAG划分、计算任务封装、分配task到executor上、计算资源的分配等调度执行做业等
driver调度task给executor执行,因此driver最好和spark集群在一片网络内,便以通讯
driver进程一般在worker节点中,和Cluster Manager不在同一个节点上json
Cluster Manager做用对象是整个saprk集群(集群资源分配),全部应用,而Driver是做用于某一个应用(协调已经分配给application的资源),管理层面不同api
集群中的工做节点,启动并运行executor进程,运行做业代码的节点
standalone模式下:Worker进程所在节点
yarn模式下: yarn的nodemanager进程所在的节点数组
运行在worker节点上,负责执行做业的任务,并将数据保存在内存或磁盘中
每一个spark application,都有属于本身的executor进程,spark application不会共享一个executor进程
在启动参数中有
executor-cores
,executor-memory
,每一个executor都会占用cpu core和内存,又spark application间不会复用executor,则很容易致使worker资源不足
executor在整个spark application运行的生命周期内,executor能够动态增长/释放,见动态资源分配一节
executor使用多线程运行SparkContext分配过来的task,来一批task就执行一批
一个spark application可能会被分为多个job,每次调用Action时,逻辑上会生成一个Job,一个Job包含了一个或多个Stage。
每一个job都会划分为一个或多个stage(阶段),每一个stage都会有对应的一批task(即一个taskset),分配到executor上去执行
Stage包括两类:ShuffleMapStage和ResultStage,若是用户程序中调用了须要进行Shuffle计算的Operator,如groupByKey等,就会以Shuffle为边界分红ShuffleMapStage和ResultStage。
若是一次shuffle都没执行,那就只有一个stage
一组关联的,但相互之间没有Shuffle依赖关系的Task集合;Stage能够直接映射为TaskSet,一个TaskSet封装了一次须要运算的、具备相同处理逻辑的Task,这些Task能够并行计算,粗粒度的调度是以TaskSet为单位的。
一个stage对应一个taskset
driver发送到executor上执行的计算单元,每一个task负责在一个阶段(stage),处理一小片数据,计算出对应的结果
Task是在物理节点上运行的基本单位,Task包含两类:ShuffleMapTask和ResultTask,分别对应于Stage中ShuffleMapStage和ResultStage中的一个执行基本单元。
InputSplit-task-partition有一一对应关系,Spark会为每个partition运行一个task来进行处理(见本文知识点-Spark集群中的节点个数、RDD分区个数、cpu内核个数三者与并行度的关系一节)
手动设置task数量spark.default.parallelism
集群管理器,为每一个spark application在集群中调度和分配资源的组件,如Spark Standalone、YARN、Mesos等
不管是standalone/yarn,都分为两种模式,client和cluster,区别在于driver运行的位置
client模式下driver运行在提交spark做业的机器上,能够实时看到详细的日志信息,方便追踪和排查错误,用于测试
cluster模式下,spark application提交到cluster manager,cluster manager(好比master)负责在集群中某个节点上,启动driver进程,用于生产环境
一般状况下driver和worker在同一个网络中是最好的,而client极可能就是driver worker分开布置,这样网络通讯很耗时,cluster没有这样的问题
master作集群管理
Master进程和Worker进程组成的集群, 能够不须要yarn集群,不须要HDFS
standalone模式下,集群管理器(Cluster Manager)的一种,为每一个spark application在集群中调度和分配资源的组件
注意和driver的区别,即Cluster Manager和driver的区别
yarn作集群管理
ResourceManager进程和NodeManager进程组成的集群
根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler。
将Taskset提交给Worker node集群运行并返回结果。
Driver向Master申请资源;
Master让Worker给程序分配具体的Executor
Driver把划分好的Task传送给Executor,Task就是咱们的Spark程序的业务逻辑代码
job生成,stage划分和task分配都是发生在driver端?是
Spark和MapReduce最大不一样:迭代式计算
全称为Resillient Distributed Dataset,即弹性分布式数据集。
提供了容错性,能够自动历来源数据从新计算,从节点失败中恢复过来
默认是在内存中,内存不足则写入磁盘
一个RDD是分布式的,数据分布在一批节点上,每一个节点存储了RDD部分partition
RDD内存不足会自动写入磁盘,调用cache()和persist()会将RDD数据按storelevel存储
SparkContext.wholeTextFiles()
能够针对一个目录中的大量小文件,返回<filename,fileContent>
组成的个PairRDDSparkContext.sequenceFile[K,V]()
能够针对SequenceFile建立RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化类型,好比IntWritable、Text等。SparkContext.hadoopRDD()
能够针对Hadoop的自定义输入类型建立RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。SparkContext.objectFile()
方法,能够针对以前调用RDD.saveAsObjectFile()
建立的对象序列化的文件,反序列化文件中的数据,并建立一个RDD。并行化建立RDD
调用parallelize()
方法,能够指定要将集合切分红多少个partition(实际上应该是指定了InputSplit数量,InputSplit-task-partition),Spark会为每个partition运行一个task来进行处理(见本文知识点-Spark集群中的节点个数、RDD分区个数、cpu内核个数三者与并行度的关系一节)
Spark官方建议为集群中的每一个CPU建立2~4个partition,避免CPU空载
若是集群中运行了多个任务,包括spark hadoop任务,是否也是以一个cpu core负载2-4个计算任务来配置?
针对已有的RDD建立一个新的RDD
transformation具备lazy特性,只是记录了对RDD所作的操做,可是不会自发地执行。只有Action操做后,全部的transformation才会执行,能够避免产生过多中间结果
操做 | 介绍 |
---|---|
map | 将RDD中的每一个元素传入自定义函数,获取一个新的元素,而后用新的元素组成新的RDD |
filter | 对RDD中每一个元素进行判断,若是返回true则保留,返回false则剔除。 |
flatMap | 与map相似,是先映射后扁平化 |
gropuByKey | 根据key进行分组,每一个key对应一个Iterable |
reduceByKey | 对每一个key对应的value进行reduce操做。 |
sortByKey | 对每一个key对应的value进行排序操做。 |
join | 对两个包含<key,value>对的RDD进行join操做,每一个key join上的pair,都会传入自定义函数进行处理。 |
cogroup | 同join,可是每一个key对应的Iterable都会传入自定义函数进行处理。 |
map与flatMap的区别
map对rdd之中的元素逐一进行函数操做映射为另一个rdd。
flatMap对集合中每一个元素进行操做而后再扁平化。一般用来切分单词
实验:flatMap是否会将多层嵌套的元素再拍扁
实验结论:只往下一层作flatten操做,不会递归进去作flatten操做
val arr = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
arr.flatMap(x => (x._1 + x._2)).foreach(print) //A1B2C3
val arr2 = sc.parallelize(Array(
Array(
("A", 1), ("B", 2), ("C", 3)),
Array(
("C", 1), ("D", 2), ("E", 3)),
Array(
("F", 1), ("G", 2), ("H", 3))))
arr2.flatMap(x => x).foreach(print) //(A,1)(B,2)(C,3)(C,1)(D,2)(E,3)(F,1)(G,2)(H,3)
val arr3 = sc.parallelize(Array(
Array(
Array(("A", 1), ("B", 2), ("C", 3))),
Array(
Array(("C", 1), ("D", 2), ("E", 3))),
Array(
Array(("F", 1), ("G", 2), ("H", 3)))))
arr3.flatMap(x => x).foreach(print) //[Lscala.Tuple2;@11074bf8 [Lscala.Tuple2;@c10a22d [Lscala.Tuple2;@40ef42cd
复制代码
map和flatMap源码
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
def hasNext = self.hasNext
//直接遍历元素,对元素应用f方法
def next() = f(self.next())
}
/** Creates a new iterator by applying a function to all values produced by this iterator * and concatenating the results. * * @return the iterator resulting from applying the given iterator-valued function * `f` to each value produced by this iterator and concatenating the results. */
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
private var cur: Iterator[B] = empty
//这一步只是取当前元素的Iterator,没有递归往下层取
private def nextCur() { cur = f(self.next()).toIterator }
def hasNext: Boolean = {
while (!cur.hasNext) {
if (!self.hasNext) return false
nextCur()
}
true
}
//在调用next方法时,最终会调用到nextCur方法
def next(): B = (if (hasNext) cur else empty).next()
}
复制代码
join VS cogroup VS fullOuterJoin VS leftOuterJoin VS rightOuterJoin
val studentList = Array(
Tuple2(1, "leo"),
Tuple2(2, "jack"),
Tuple2(3, "tom"));
val scoreList = Array(
Tuple2(1, 100),
Tuple2(2, 90),
Tuple2(2, 90),
Tuple2(4, 60));
val students = sc.parallelize(studentList);
val scores = sc.parallelize(scoreList);
/* * (4,(CompactBuffer(),CompactBuffer(60))) * (1,(CompactBuffer(leo),CompactBuffer(100))) * (3,(CompactBuffer(tom),CompactBuffer())) * (2,(CompactBuffer(jack),CompactBuffer(90, 90))) */
val studentCogroup = students.cogroup(scores) //union key数组延长
/* * (1,(leo,100)) * (2,(jack,90)) * (2,(jack,90)) */
val studentJoin = students.join(scores) //交集
/* * (4,(None,Some(60))) * (1,(Some(leo),Some(100))) * (3,(Some(tom),None)) * (2,(Some(jack),Some(90))) * (2,(Some(jack),Some(90))) */
val studentFullOuterJoin = students.fullOuterJoin(scores) //some可为空 union
/* * (1,(leo,Some(100))) * (3,(tom,None)) * (2,(jack,Some(90))) * (2,(jack,Some(90))) */
val studentLeftOuterJoin = students.leftOuterJoin(scores) //左不为空
/* * (4,(None,60)) * (1,(Some(leo),100)) * (2,(Some(jack),90)) * (2,(Some(jack),90)) */
val studentRightOuterJoin = students.rightOuterJoin(scores) //右不为空
复制代码
对RDD进行最后的操做,如遍历,reduce,save等,启动计算操做,并向用户程序返回值或向外部存储写数据
会触发一个spark job的运行,从而触发这个action以前全部的transformation的执行 对于操做key-value对的Tuple2 RDD,如groupByKey,scala是经过隐式转换为PairRDDFunction,再提供对应groupByKey方法实现的,须要手动导入Spark的相关隐式转换,import org.apache.spark.SparkContext._
对groupByKey,saprk2.2显式使用HashPartitioner,没有看到隐式转换为PairRDDFunction Action操做必定会将结果返回给driver?是的,见下文的runJob方法
Action操做特征
Action操做在源码上必调用runJob()
方法,多是直接或间接调用
//直接调用了runJob
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
/** * Run a function on a given set of partitions in an RDD and pass the results to the given * handler function. This is the main entry point for all actions in Spark. * * @param resultHandler callback to pass each result to */
//会把结果传递给handler function,handle function就是对返回结果进行处理的方法
//如上文的collect方法的handler function就是 (iter: Iterator[T]) => iter.toArray
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
复制代码
操做 | 介绍 |
---|---|
reduce | 将RDD中的全部元素进行聚合操做。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。 |
collect | 将RDD中全部元素获取到本地客户端。注意数据传输问题,spark.driver.maxResultSize 能够限制action算子返回driver的结果集最大数量 |
count | 获取RDD元素总数。 |
take(n) | 获取RDD中前n个元素。 |
saveAsTextFile | 将RDD元素保存到文件中,对每一个元素调用toString方法 |
countByKey | 对每一个key对应的值进行count计数。 |
foreach | 遍历RDD中的每一个元素。 |
//从本地文件建立
val lines = spark.sparkContext.textFile("hello.txt")
//Transformation,返回(key,value)的RDD
val linePairs = lines.map(line => (line, 1))
//Transformation,隐式装换为PairRDDFunction,提供reduceByKey等方法
//源码中是用HashPartitioner
val lineCounts = linePairs.reduceByKey(_ + _)
//Action,发送到driver端执行
lineCounts.foreach(lineCount => println(lineCount._1 + " appears " + lineCount._2 + " times."))
复制代码
map:一次处理一个partition中的一条数据
mapPartitions:一次处理一个partition中全部的数据
使用场景:
RDD的数据量不是特别大,建议采用mapPartitions算子替代map算子,能够加快处理速度,若是RDD的数据量特别大,则不建议用mapPartitions,可能会内存溢出
val studentScoresRDD = studentNamesRDD.mapPartitions { it =>
var studentScoreList = Array("a")
while (it.hasNext) {
...
}
studentScoreList.iterator
}
复制代码
mapPartitionsWithIndex:加上了partition的index
studentNamesRDD.mapPartitionsWithIndex{(index:Int,it:Iterator[String])=>
...
}
复制代码
rdd.coalesce(3)
:压缩成3个partitioncoalesce和repartition区别
repartition是coalesce的简化版
/** * 返回一个通过简化到numPartitions个分区的新RDD。这会致使一个窄依赖 * 例如:你将1000个分区转换成100个分区,这个过程不会发生shuffle,相反若是10个分区转换成100个分区将会发生shuffle。 * 然而若是你想大幅度合并分区,例如合并成一个分区,这会致使你的计算在少数几个集群节点上计算(言外之意:并行度不够) * 为了不这种状况,你能够将第二个shuffle参数传递一个true,这样会在从新分区过程当中多一步shuffle,这意味着上游的分区能够并行运行。 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {...}
/** * 返回一个刚好有numPartitions个分区的RDD,能够增长或者减小此RDD的并行度。 * 在内部,这将使用shuffle从新分布数据,若是你减小分区数,考虑使用coalesce,这样能够避免执行shuffle */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
复制代码
设RDD分区数从N变动为M
分区数关系 | shuffle = true | shuffle = false |
---|---|---|
N < M | N个分区有数据分布不均匀的情况,利用HashPartitioner函数将数据从新分区为M个 | coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系 |
N > M | 将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区 | |
N >> M | shuffle = true,在从新分区过程当中多一步shuffle,上游的分区能够并行运行,使coalesce以前的操做有更好的并行度 | 父子RDD是窄依赖关系,在同一个Stage中,可能形成Spark程序的并行度不够(计算在少数几个集群节点上进行),从而影响性能 |
cache()和persist()
/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/** * Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
复制代码
若是须要从内存中清除缓存,那么可使用unpersist()方法。
class StorageLevel private( private var _useDisk: Boolean, //磁盘 private var _useMemory: Boolean,//内存 private var _useOffHeap: Boolean,//内存满就存磁盘 private var _deserialized: Boolean,//序列化储存 private var _replication: Int = 1)//冗余备份,默认1,只本身储存一份
extends Externalizable {
复制代码
持久化级别 | 含义 |
---|---|
MEMORY_ONLY | 以非序列化的Java对象的方式持久化在JVM内存中。若是内存没法彻底存储RDD全部的partition,那么那些没有持久化的partition就会在下一次须要使用它的时候,从新被计算。 |
MEMORY_AND_DISK | 同上,可是当某些partition没法存储在内存中时,会持久化到磁盘中。下次须要使用这些partition时,须要从磁盘上读取。 |
MEMORY_ONLY_SER | 同MEMORY_ONLY,可是会使用Java序列化方式,将Java对象序列化后进行持久化。能够减小内存开销,可是须要进行反序列化,所以会加大CPU开销。 |
MEMORY_AND_DSK_SER | 同MEMORY_AND_DSK。可是使用序列化方式持久化Java对象。 |
DISK_ONLY | 使用非序列化Java对象的方式持久化,彻底存储到磁盘上。 |
MEMORY_ONLY_2 MEMORY_AND_DISK_2 等等 | 若是是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其余节点,从而在数据丢失时,不须要再次计算,只须要使用备份数据便可。 |
优先级排序(内存优先)
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
val factorBroadcast = sc.broadcast(3)
val sumAccumulator = new DoubleAccumulator()
//Accumulator must be registered before send to executor
sc.register(sumAccumulator)
val multipleRdd = rdd.map(num => num * factorBroadcast.value)
//不能获取值,只能在driver端获取
val accumulator = rdd.map(num2 => sumAccumulator.add(num2.toDouble))
//action:3,6,9,12,15
multipleRdd.foreach(num => println(num))
//要先执行action操做才能获取值
accumulator.collect() //15
println(sumAccumulator.value)
accumulator.count() //30,再次加15
println(sumAccumulator.value)
复制代码
standalone模式下
TaskScheduler把taskSet里每个task提交到executor上执行
窄依赖(narrow dependency):每一个parent RDD 的 partition 最多被 child RDD的一个partition使用
宽依赖(wide dependency):每一个parent RDD 的 partition 被多个 child RDD的partition使用
区别:
在spark提交脚本中设置
--master参数值为yarn-cluster / yarn-client
默认是standalone 模式
/usr/local/spark/bin/spark-submit \
--class com.feng.spark.spark1.StructuredNetworkWordCount \
--master spark://spark1:7077 \ #standalone模式
--num-executors 3 \ // #分配3个executor
--driver-memory 500m \
--executor-memory 500m \ # //每一个executor500m内存
--executor-cores 2 \ // # 每一个executor2个core
/usr/local/test_data/spark1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
复制代码
整个应用须要3*500=1500m
内存,3*2=6
个core
--master local[8]
:进程中用8个线程来模拟集群的执行
--total-executor-cores
:指定全部executor的总cpu core数量
--supervise
:指定了spark监控driver节点,若是driver挂掉,自动重启driver
按优先级从高到低排序
SparkConf.set("spark.default.parallelism", "100")
spark-submit: --conf spark.default.parallelism=50
spark-defaults.conf: spark.default.parallelism 10
复制代码
在spark-submit脚本中,可使用--verbose,打印详细的配置属性的信息
能够先在程序中建立一个空的SparkConf对象,如
val sc = new SparkContext(new SparkConf())
复制代码
而后在spark-submit脚本中用--conf
设置属性值,如
--conf spark.eventLog.enabled=false
复制代码
--jars
:额外依赖的jar包会自动被发送到集群上去
指定关联的jar:
文件和jar都会被拷贝到每一个executor的工做目录中,这就会占用很大一片磁盘空间,所以须要在以后清理掉这些文件
在yarn上运行spark做业时,依赖文件的清理都是自动进行的
使用standalone模式,须要配置spark.worker.cleanup.appDataTtl
属性,来开启自动清理依赖文件和jar包
相关参数见conf/spark-evnsh参数一节
--packages
:绑定maven的依赖包
--repositories
:绑定额外的仓库
用于生产模式,driver运行在nodeManager,没有网卡流量激增问题,但查看log麻烦,调试不方便
yarn-client用于测试,driver运行在本地客户端,负责调度application,会与yarn集群产生超大量的网络通讯,从而致使网卡流量激增
yarn-client能够在本地看到全部log,方便调试
基本要点:
橙色:提交用户Spark程序
用户提交一个Spark程序,主要的流程以下所示:
结合4,5,应该是表示用户程序已经在master注册,但driver可能并未启动
MasterEndPoint应该会向DriverClient返回一个相似DriverStatusResponse的应答?周期性应答,当获知driver已启动,则致使5
紫色:启动Driver进程
当用户提交用户Spark程序后,须要启动Driver来处理用户程序的计算逻辑,完成计算任务,这时Master须要启动一个Driver:
当前worker节点运行driver进程
红色:注册Application
Dirver启动之后,它会建立SparkContext对象,初始化计算过程当中必需的基本组件,并向Master注册Application,流程描述以下:
SparkEnv Holds all the runtime environment objects for a running Spark instance (either master or worker), including the serializer, RpcEnv, block manager, map output tracker, etc. Currently Spark code finds the SparkEnv through a global variable, so all the threads can access the same SparkEnv
只是建立,还未启动
蓝色:启动Executor进程
executor还未真实启动,master只是发出一个启动executor的消息给worker而已. 这一步代表master才是负责启动和分配executor,driver只是提交task到executor
这里master才真正告诉driver executor已经启动
粉色:启动Task执行
以前已经建立,但未启动,以前和master的通讯都是StandaloneSchedulerBackend完成的
启动一个driver-revive-thread后台线程,周期性地发送ReviveOffers给本身,让本身检查executor状态
DriverEndpoint是CoarseGrainedSchedulerBackend内部的一个持有对象
绿色:Task运行完成
StandaloneSchedulerBackend父类CoarseGrainedSchedulerBackend内部持有DriverEndpoint(内部类),DriverEndpoint收到StatusUpdate信息后,直接调用
scheduler.statusUpdate(taskId, state, data.value)
用start-all.sh
脚本能够启动master进程和全部worker进程,快速启动整个spark standalone集群
分别启动能够经过命令行参数,为进程配置一些独特的参数
如监听端口号、web ui端口号、使用的cpu和内存
如同一台机器上不只运行了saprk程序,还运行了storm程序,就能够限制spark worker进程使用更少的资源(cpu core,memory),而非机器上全部资源
参数 | 含义 | 对象 | 使用频率 |
---|---|---|---|
-h HOST, --ip HOST | 在哪台机器上启动,默认就是本机 | master & worker | 不经常使用 |
-p PORT, --port PORT | 在机器上启动后,使用哪一个端口对外提供服务,master默认是7077,worker默认是随机的 | master & worker | 不经常使用 |
--webui-port PORT | web ui的端口,master默认是8080,worker默认是8081 | master & worker | 不经常使用 |
-c CORES, --cores CORES | 总共能让spark做业使用多少个cpu core,默认是当前机器上全部的cpu core | worker | 经常使用 |
-m MEM, --memory MEM | 总共能让spark做业使用多少内存,是100M或者1G这样的格式,默认是1g | worker | 经常使用 |
-d DIR, --work-dir DIR | 工做目录,默认是SPARK_HOME/work目录 | worker | 经常使用 |
--properties-file FILE | master和worker加载默认配置文件的地址,默认是conf/spark-defaults.conf | master & worker | 不经常使用 |
先启动master,再启动worker,由于worker启动之后,须要向master注册
关闭顺序1.worker(
./stop-slave.sh
) ;2. master(./stop-master);3. 关闭集群./stop-all.sh
start-master.sh
启动spark://HOST:PORT
,这就是master的URL地址,worker进程就会经过这个URL地址来链接到master进程,并进行注册
可使用
SparkSession.master()
设置master地址
http://MASTER_HOST:8080
来访问master集群的监控web ui,web ui上, 会显示master的URL地址使用start-slave.sh <master-spark-URL>
在当前节点上启动worker进程
http://MASTER_HOST:8080
web ui上会显示该节点的cpu和内存资源等信息
eg:./start-slave.sh spark://192.168.0.001:8080 --memory 500m
参数 | 含义 |
---|---|
sbin/start-all.sh | 根据配置,在集群中各个节点上,启动一个master进程和多个worker进程 |
sbin/stop-all.sh | 在集群中中止全部master和worker进程 |
sbin/start-master.sh | 在本地启动一个master进程 |
sbin/stop-master.sh | 关闭master进程 |
sbin/start-slaves.sh | 根据conf/slaves文件中配置的worker节点,启动全部的worker进程 |
sbin/stop-slaves.sh | 关闭全部worker进程 |
sbin/start-slave.sh | 在本地启动一个worker进程 |
配置做为worker节点的机器,如hostname/ip地址,一个机器是一行
配置后,全部的节点上,都拷贝这份文件
默认状况下,没有conf/slaves文件,只有一个空conf/slaves.template, 此时,就只是在当前主节点上启动一个master进程和一个worker进程,此时就是master进程和worker进程在一个节点上,也就是伪分布式部署
conf/slaves文件样本
spark1
spark2
spark3
复制代码
是对整个spark的集群部署,配置各个master和worker
和启动脚本--参数的效果同样
./start-slave.sh spark://192.168.0.001:8080 --memory 500m
,临时修改参数时这种脚本命令更适合
命令行参数优先级更高,会覆盖spark-evnsh参数
参数 | 含义 |
---|---|
SPARK_MASTER_IP | 指定master进程所在的机器的ip地址 |
SPARK_MASTER_PORT | 指定master监听的端口号(默认是7077) |
SPARK_MASTER_WEBUI_PORT | 指定master web ui的端口号(默认是8080) |
SPARK_MASTER_OPTS | 设置master的额外参数,使用"-Dx=y"设置各个参数 |
SPARK_LOCAL_DIRS | spark的工做目录,包括了shuffle map输出文件,以及持久化到磁盘的RDD等 |
SPARK_WORKER_PORT | worker节点的端口号,默认是随机的 |
SPARK_WORKER_WEBUI_PORT | worker节点的web ui端口号,默认是8081 |
SPARK_WORKER_CORES | worker节点上,容许spark做业使用的最大cpu数量,默认是机器上全部的cpu core |
SPARK_WORKER_MEMORY | worker节点上,容许spark做业使用的最大内存量,格式为1000m,2g等,默认最小是1g内存 |
SPARK_WORKER_INSTANCES | 当前机器上的worker进程数量,默认是1,能够设置成多个,可是这时必定要设置SPARK_WORKER_CORES,限制每一个worker的cpu数量 |
SPARK_WORKER_DIR | spark做业的工做目录,包括了做业的日志等,默认是spark_home/work |
SPARK_WORKER_OPTS | worker的额外参数,使用"-Dx=y"设置各个参数 |
SPARK_DAEMON_MEMORY | 分配给master和worker进程本身自己的内存,默认是1g |
SPARK_DAEMON_JAVA_OPTS | 设置master和worker本身的jvm参数,使用"-Dx=y"设置各个参数 |
SPARK_PUBLISC_DNS | master和worker的公共dns域名,默认是没有的 |
SPARK_MASTER_OPTS
设置master的额外参数,使用-Dx=y
设置各个参数
eg:export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=1"
参数名 | 默认值 | 含义 |
---|---|---|
spark.deploy.retainedApplications | 200 | 在spark web ui上最多显示多少个application的信息 |
spark.deploy.retainedDrivers | 200 | 在spark web ui上最多显示多少个driver的信息 |
spark.deploy.spreadOut | true | 资源调度策略,spreadOut会尽可能将application的executor进程分布在更多worker上,适合基于hdfs文件计算的状况,提高数据本地化几率;非spreadOut会尽可能将executor分配到一个worker上,适合计算密集型的做业 |
spark.deploy.defaultCores | 无限大 | 每一个spark做业最多在standalone集群中使用多少个cpu core,默认是无限大,有多少用多少 |
spark.deploy.timeout | 60 | 单位秒,一个worker多少时间没有响应以后,master认为worker挂掉了 |
SPARK_WORKER_OPTS
worker的额外参数
参数名 | 默认值 | 含义 |
---|---|---|
spark.worker.cleanup.enabled | false | 是否启动自动清理worker工做目录,默认是false |
spark.worker.cleanup.interval | 1800 | 单位秒,自动清理的时间间隔,默认是30分钟 |
spark.worker.cleanup.appDataTtl | 7 * 24 * 3600 | 默认将一个spark做业的文件在worker工做目录保留多少时间,默认是7天 |
主要用于本机测试
/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.xxx \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 2 \
/usr/local/test/xxx.jar \
复制代码
standalone模式与local区别,就是要将master设置成spark://master_ip:port
,如spark://192.168.0.103:7077
val spark = SparkSession.builder().master("spark://IP:PORT")...
spark-submit: --master spark://IP:PORT --deploy-mode client/cluster
spark-shell: --master spark://IP:PORT
:用于实验和测试/usr/local/spark/bin/spark-submit \
--class cn.spark.study.core.xxx \
--master spark://192.168.0.103:7077 \
--deploy-mode client \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
/usr/local/test/xxx.jar \
复制代码
--master
:
spark://xxx
:standalone模式,会提交到指定的URL的Master进程上去yarn-xxx
:yarn模式,会读取hadoop配置文件,而后链接ResourceManager提交运行做业后,当即使用jps查看进程,能够看到启动了以下进程
standalone cluster模式支持监控driver进程,而且在driver挂掉的时候,自动重启该进程,主要是用于spark streaming中的HA高可用性,spark-submit脚本中,使用--supervise
标识便可
要杀掉反复挂掉的driver进程
bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
,经过http://<maser url>:8080
可查看到driver id
yarn下杀掉application
yarn application -kill applicationid
进程:
...
--deploy-mode cluster \
--num-executors 1 \
--executor-cores 1 \
...
复制代码
cluster模式下
cpu core太少,可能致使executor没法启动,一直waiting,好比只有一个worker,一个cpu core时
在 cluster 模式下,driver 是在集群中的某个 Worker中的进程中启动,而且 client进程将会在完成提交应用程序的任务以后退出,而不须要等待应用程序完成再退出
默认提交的每个spark做业都会尝试使用集群中全部可用的cpu资源,此时只能支持做业串行起来运行,因此standalone集群对于同时提交上来的多个做业,仅仅支持FIFO调度策略
spark.cores.max
参数,限制每一个做业可以使用的最大的cpu core数量,让做业不会使用全部的cpu资源,后面提交上来的做业就能够获取到资源运行,默认状况下,它将获取集群中的 all cores (核),这只有在某一时刻只容许一个应用程序运行时才有意义spark.conf.set("spark.cores.max", "num")
spark-submit: --master spark://IP:PORT --conf spark.cores.max=num
spark-env.sh
全局配置:export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=num" 默认数量
spark standalone模式默认在master机器上的8080端口提供web ui,能够经过配置spark-env.sh
文件等方式,来配置web ui的端口,地址如spark://192.168.0.103:8080
spark yarn模式下应该在YARN web ui上查看,如
http://192.168.0.103:8088/
application detail ui在做业的driver所在的机器的4040端口
System.out.println
;System.err.println
和系统级别log
做业运行完,信息消失,须要启动history server
前提:spark-env.sh
文件中,配置HADOOP_CONF_DIR
或者YARN_CONF_DIR
属性,值为hadoop的配置文件目录HADOOP_HOME/etc/hadoop
,其中包含了hadoop和yarn全部的配置文件,好比hdfs-site、yarn-site等
用途:spark读写hdfs,链接到yarn resourcemanager上
日志散落在集群中各个机器上,参数配置yarn-site.xml
属性设置 | 含义 |
---|---|
yarn.log-aggregation-enable=true |
container的日志会拷贝到hdfs上去,并从机器中删除 |
yarn.nodemanager.remote-app-log-dir |
当应用程序运行结束后,日志被转移到的HDFS目录(启用日志汇集功能时有效) |
yarn.nodemanager.remote-app-log-dir-suffix |
远程日志目录子目录名称(启用日志汇集功能时有效) |
yarn.log-aggregation.retain-seconds |
聚合后的日志在HDFS上保存多长时间,单位为s |
yarn logs -applicationId <app ID> |
查看日志,yarn web ui上能够查看到applicationId(也能够直接在hdfs上查看日志文件) |
yarn.nodemanager.log.retain-second |
当不启用日志聚合此参数生效,日志文件保存在本地的时间,单位为s |
yarn.log-aggregation.retain-check-interval-seconds |
隔多久删除过时的日志 |
YARN_APP_LOGS_DIR
目录下,如/tmp/logs
或者$HADOOP_HOME/logs/userlogs
system.out
日志,须要在yarn-site.xml
文件中设置yarn.log.aggregation-enable
值为ture(将日志拷贝到hdfs上),查看时经过yarn logs -applicationId xxx
在机器上查看/usr/local/spark/bin/spark-submit \
--class xxx \
# 自动从hadoop配置目录中的配置文件中读取cluster manager地址
--master yarn-cluster/yarn-client \
--num-executors 1 \
--driver-memory 100m \
--executor-memory 100m \
--executor-cores 1 \
--conf <key>=<value> \
# 指定不一样的hadoop队列,项目或部门之间队列隔离
--queue hadoop队列 \
/usr/local/test/xxx.jar \
${1}
复制代码
--conf
: 配置全部spark支持的配置属性,使用key=value
的格式;若是value中包含了空格,那么须要将key=value包裹的双引号中--conf "<key>=<value>"
application-jar
: 打包好的spark工程jar包,在当前机器上的全路径名
application-arguments
: 传递给主类的main方法的参数; 在shell中用${1}
占位符接收传递给shell的参数;在java中能够经过main方法的args[0]
等参数获取,提交spark应用程序时,用 ./脚本.sh 参数值
能够在提交脚本上--conf
设置属性
属性名称 | 默认值 | 含义 |
---|---|---|
spark.yarn.am.memory | 512m | client模式下,YARN Application Master使用的内存总量 |
spark.yarn.am.cores | 1 | client模式下,Application Master使用的cpu数量 |
spark.driver.cores | 1 | cluster模式下,driver使用的cpu core数量,driver与Application Master运行在一个进程中,因此也控制了Application Master的cpu数量 |
spark.yarn.am.waitTime | 100s | cluster模式下,Application Master要等待SparkContext初始化的时长; client模式下,application master等待driver来链接它的时长 |
spark.yarn.submit.file.replication | hdfs副本数 | 做业写到hdfs上的文件的副本数量,好比工程jar,依赖jar,配置文件等,最小必定是1 |
spark.yarn.preserve.staging.files | false | 若是设置为true,那么在做业运行完以后,会避免工程jar等文件被删除掉 |
spark.yarn.scheduler.heartbeat.interval-ms | 3000 | application master向resourcemanager发送心跳的间隔,单位ms |
spark.yarn.scheduler.initial-allocation.interval | 200ms | application master在有pending住的container分配需求时,当即向resourcemanager发送心跳的间隔 |
spark.yarn.max.executor.failures | executor数量*2,最小3 | 整个做业断定为失败以前,executor最大的失败次数 |
spark.yarn.historyServer.address | 无 | spark history server的地址 |
spark.yarn.dist.archives | 无 | 每一个executor都要获取并放入工做目录的archive |
spark.yarn.dist.files | 无 | 每一个executor都要放入的工做目录的文件 |
spark.executor.instances | 2 | 默认的executor数量 |
spark.yarn.executor.memoryOverhead | executor内存10% | 每一个executor的堆外内存大小,用来存放诸如常量字符串等东西 |
spark.yarn.driver.memoryOverhead | driver内存7% | 同上 |
spark.yarn.am.memoryOverhead | AM内存7% | 同上 |
spark.yarn.am.port | 随机 | application master端口 |
spark.yarn.jar | 无 | spark jar文件的位置 |
spark.yarn.access.namenodes | 无 | spark做业能访问的hdfs namenode地址 |
spark.yarn.containerLauncherMaxThreads | 25 | application master能用来启动executor container的最大线程数量 |
spark.yarn.am.extraJavaOptions | 无 | application master的jvm参数 |
spark.yarn.am.extraLibraryPath | 无 | application master的额外库路径 |
spark.yarn.maxAppAttempts | 提交spark做业最大的尝试次数 | |
spark.yarn.submit.waitAppCompletion | true | cluster模式下,client是否等到做业运行完再退出 |
standalone模式下调度器依托于master进程来作出调度决策,这可能会形成单点故障:若是master挂掉了,就无法提交新的应用程序了。
为了解决这个问题,spark提供了两种高可用性方案,分别是基于zookeeper的HA方案(推荐)以及基于文件系统的HA方案。
使用zookeeper来提供leader选举以及一些状态存储,能够在集群中启动多个master进程,让它们链接到zookeeper实例。其中一个master进程会被选举为leader,其余的master会被指定为standby模式。
若是当前的leader master进程挂掉了,其余的standby master会被选举,从而恢复旧master的状态。
在启动一个zookeeper集群以后,在多个节点上启动多个master进程,而且给它们相同的zookeeper 配置(zookeeper url和目录)。master就能够被动态加入master集群,并能够在任什么时候间被移除掉
在spark-env.sh
文件中,设置SPARK_DAEMON_JAVA_OPTS
选项:
spark.deploy.recoveryMode
:设置为ZOOKEEPER来启用standby master恢复模式(默认为NONE)spark.deploy.zookeeper.url
:zookeeper集群urlspark.deploy.zookeeper.dir
:zookeeper中用来存储恢复状态的目录(默认是/spark
)export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.0.103:2181,192.168.0.104:2181 -Dspark.deploy.zookeeper.dir=/spark"
复制代码
若是在集群中启动了多个master节点,可是没有正确配置master去使用zookeeper,master在挂掉进行恢复时是会失败的,由于无法发现其余master,而且都会认为本身是leader。这会致使集群的状态不是健康的,由于全部master都会自顾自地去调度。
为了调度新的应用程序或者向集群中添加worker节点,它们须要知道当前leader master的ip地址,这能够经过传递一个master列表来完成。能够将SparkSession 的master链接的地址指向spark://host1:port1,host2:port2
。这就会致使SparkSession尝试去注册全部的master,若是host1挂掉了,那么配置仍是正确的,由于会找到新的leader master
当一个应用程序启动的时候,或者worker须要被找到而且注册到当前的leader master的时候。一旦它成功注册了,就被保存在zookeeper中了。若是故障发生了,new leader master会去联系全部的以前注册过的应用程序和worker,而且通知它们master的改变。应用程序甚至在启动的时候都不须要知道new master的存在。
故而,new master能够在任什么时候间被建立,只要新的应用程序和worker能够找到而且注册到master便可
在其余节点启动备用master:./start-master.sh
FILESYSTEM模式:当应用程序和worker都注册到master以后,master就会将它们的信息写入指定的文件系统目录中,以便于重启时恢复注册的应用程序和worker状态;
须要手动重启
在spark-env.sh
中设置SPARK_DAEMON_JAVA_OPTS
spark.deploy.recoveryMode
:设置为FILESYSTEM来启用单点恢复(默认值为NONE)spark.deploy.recoveryDirectory
:spark存储状态信息的文件系统目录,必须是master能够访问的目录eg:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/usr/local/spark_recovery"
复制代码
stop-master.sh
脚本杀掉一个master进程是不会清理它的恢复状态,当重启一个新的master进程时,它会进入恢复模式。须要等待以前全部已经注册的worker等节点先timeout才能恢复。做业监控方式:Spark Web UI,Spark History Web UI,RESTFUL API以及Metrics
每提交Spark做业并启动SparkSession后,会启动一个对应的Spark Web UI服务。默认状况下Spark Web UI的访问地址是driver进程所在节点的4040端口,如http://<driver-node>:4040
Spark Web UI包括了如下信息:
若是多个driver在一个机器上运行,它们会自动绑定到不一样的端口上。默认从4040端口开始,若是发现已经被绑定,那么会选择404一、4042等端口,以此类推。
这些信息默认状况下在做业运行期间有效,一旦做业完毕,driver进程以及对应的web ui服务也会中止。若是要在做业完成以后,也能够看到其Spark Web UI以及详细信息,须要启用Spark的History Server。
hdfs://ip:port/dirName
hdfs dfs -mkidr /dirName
spark-defaults.conf
spark.eventLog.enabled true #启用
spark.eventLog.dir hdfs://ip:port/dirName
spark.eventLog.compress true #压缩
复制代码
spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=50 -Dspark.history.fs.logDirectory=hdfs://ip:port/dirName"
复制代码
spark.eventLog.dir
指定做业事件记录地址
spark.history.fs.logDirectory
指定从哪一个目录中去读取做业数据
两个目录地址要相同
./sbin/start-history-server.sh
在启动界面能够看到history-server的访问地址,经过访问地址打开History Web UI提供了RESTFUL API来返回关于日志的json数据
API | 含义 |
---|---|
/applications | 获取做业列表 |
/applications/[app-id]/jobs | 指定做业的job列表 |
/applications/[app-id]/jobs/[job-id] | 指定job的信息 |
/applications/[app-id]/stages | 指定做业的stage列表 |
/applications/[app-id]/stages/[stage-id] | 指定stage的全部attempt列表 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] | 指定stage attempt的信息 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary | 指定stage attempt全部task的metrics统计信息 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList | 指定stage attempt的task列表 |
/applications/[app-id]/executors | 指定做业的executor列表 |
/applications/[app-id]/storage/rdd | 指定做业的持久化rdd列表 |
/applications/[app-id]/storage/rdd/[rdd-id] | 指定持久化rdd的信息 |
/applications/[app-id]/logs | 下载指定做业的全部日志的压缩包 |
/applications/[app-id]/[attempt-id]/logs | 下载指定做业的某次attempt的全部日志的压缩包 |
eg:http://192.168.0.103:18080/api/v1/applications
默认的做业间资源分配策略为静态资源分配,在这种方式下,每一个做业都会被给予一个它能使用的 最大资源量的限额,而且能够在运行期间持有这些资源。这是spark standalone集群和YARN集群使用的默认方式。
Standalone集群
默认状况下,提交到standalone集群上的多个做业,会经过FIFO的方式来运行,每一个做业都会尝试获取全部的资源。
spark.cores.max
:限制每一个做业可以使用的cpu core最大数量
spark.deploy.defaultCores
:设置每一个做业默认cpu core使用量
spark.executor.memory
:设置每一个做业最大内存。
YARN
--num-executors
:配置做业能够在集群中分配到多少个executor
--executor-memory
和--executor-cores
能够控制每一个executor可以使用的资源。
没有一种cluster manager能够提供多个做业间的内存共享功能,须要共享内存,能够单独使用一个服务(例如:alluxio),这样就能实现多应用访问同一个RDD的数据。
当资源被分配给了一个做业,但资源有空闲,能够将资源还给cluster manager的资源池,被其余做业使用。在spark中,动态资源分配在executor粒度上被实现,启用时设置spark.dynamicAllocation.enabled
为true,在每一个节点上启动external shuffle service,并将spark.shuffle.service.enabled设为true。external shuffle service 的目的是在移除executor的时候,可以保留executor输出的shuffle文件。
spark application会在它有pending(等待执行)的task等待被调度时,申请额外的executor
task已提交但等待调度->executor数量不足
spark.dynamicAllocation.schedulerBacklogTimeout
有pending的task时,就会触发真正的executor申请spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
,若是又有pending的task了,则再次触发申请操做。一个spark做业会在它的executor出现了空闲超过必定时间后(spark.dynamicAllocation.executorIdleTimeout
),被移除掉。
这意味着没有task被pending住,executor有空闲,和申请条件互斥。
spark使用一个外部的shuffle服务来保存每一个executor的中间写状态,这个服务是一个长时间运行的进程,集群的每一个节点上都会运行一个,若是服务被启用,那么spark executor会在shuffle write和read时,将数据写入该服务,并从该服务获取数据。这意味着全部executor写的shuffle数据均可以在executor声明周期以外继续使用。
多了个中间数据存储角色,也改变了executor的读写方式
除了写shuffle文件,executor也会在内存或磁盘中持久化数据。当一个executor被移除掉时,全部缓存的数据都会消失。
shuffle服务写入的数据和executor持久化数据不是一个概念?executor移除后/挂掉后,其持久化的数据将消失,而shuffle服务保存的数据还将存在
--conf spark.dynamicAllocation.enabled=true \
复制代码
$SPARK_HOME/sbin/start-mesos-shuffle-service.sh
,并设置 spark.shuffle.service.enabled
为true--conf spark.dynamicAllocation.enabled=true \
复制代码
须要配置yarn的shuffle service(external shuffle service),用于保存executor的shuffle write文件,从而让executor能够被安全地移除.
$SPARK_HOME/lib
下的spark-<version>-yarn-shuffle.jar
加入到全部NodeManager的classpath中,即hadoop/yarn/lib
目录中yarn-site.xml
<propert>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle</value>
<!-- <value>mapreduce_shuffle</value> -->
</property>
<propert>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
复制代码
--conf spark.shuffle.service.enabled=true \
--conf spark.shuffle.service.port=7337 \
--conf spark.dynamicAllocation.enabled=true \
复制代码
job是一个spark action操做触发的计算单元,在一个spark做业内部,多个并行的job是能够同时运行的 。
默认状况下,spark的调度会使用FIFO的方式来调度多个job。每一个job都会被划分为多个stage,并且第一个job会对全部可用的资源获取优先使用权,而且让它的stage的task去运行,而后第二个job再获取资源的使用权,以此类推
在公平的资源共享策略下,spark会将多个job的task使用一种轮询的方式来分配资源和执行,因此全部的job都有一个基本公平的机会去使用集群的资源
conf.set("spark.scheduler.mode", "FAIR")
复制代码
--conf spark.scheduler.mode=FAIR
复制代码
fair scheduler也支持将job分红多个组并放入多个池中,以及为每一个池设置不一样的调度优先级。这个feature对于将重要的和不重要的job隔离运行的状况很是有用,能够为重要的job分配一个池,并给予更高的优先级; 为不重要的job分配另外一个池,并给予较低的优先级。
在代码中设置sparkContext.setLocalProperty("spark.scheduler.pool", "poolName")
,全部在这个线程中提交的job都会进入这个池中,设置是以线程为单位保存的,很容易实现用同一线程来提交同一用户的全部做业到同一个资源池中。设置为null则清空池子。
默认状况下,每一个池子都会对集群资源有相同的优先使用权,可是在每一个池内,job会使用FIFO的模式来执行。
能够经过配置文件来修改池的属性
配置文件默认地址spark/conf/fairscheduler.xml
,自定义文件conf.set("spark.scheduler.allocation.file", "/path/to/file")
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
复制代码
没有在配置文件中配置的资源池都会使用默认配置(schedulingMode : FIFO,weight : 1,minShare : 0)。
不一样之处
reduceByKey,中间多了一个MapPartitionsRDD,是本地数据聚合后的rdd,能够减小网络数据传输。
相同之处
read和聚合的过程基本和groupByKey相似。都是ShuffledRDD作shuffle read再聚合,获得最终的rdd
cogroup算子是其余算子的基础,如join,intersection操做
先按RDD分区聚合结果,(hello,[(1,1),(1,1)]):第1个(1,1)是第一个RDD 的helo聚合结果,第二个(1,1)是第2个RDD聚合结果 若第一个RDD的第一个partition没有hello,则(1),不是(,1)
filter:过滤掉两个集合中任意一个集合为空的key
笛卡尔乘积
通常用于减小partition数量
repartition算子=coalesce(true)
repartition操做在中间生成的隐式RDD中会给值计算出前缀做为key,在最后作Shuffle操做时一个partition就放特定的一些key值对应的tuple,完成重分区操做
若是partition的数量多,能起实例的资源也多,那天然并发度就多
若是partition数量少,资源不少,则task数量不足,它也不会有不少并发
若是partition的数量不少,可是资源少(如core),那么并发也不大,会算完一批再继续起下一批
Task被执行的并发度 = Executor数目 * 每一个Executor核数
复制代码
这里的core是虚拟的core而不是机器的物理CPU核,能够理解为就是Executor的一个工做线程?
每一个executor的core数目经过spark.executor.cores
参数设置。这里的cores实际上是指的工做线程。cpu info里看到的核数是物理核(或者通常机器开了超线程之后是的物理核数*2),和spark里的core不是一个概念,可是通常来讲spark做业配置的executor核数不该该超过机器的物理核数。
partition的数目
sc.textFile
,输入文件被划分为多少InputSplit就会须要多少初始Task