以scala为例,咱们经过IDE编写Spark应用后,将其打包成jar包,而后使用spark-submit程序进行部署java
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
复制代码
优先级从高到低依次是:node
直接在代码中经过SparkConf控制,好比指定cluster manager的master参数,能够在代码中配置python
val conf = new SparkConf().setAppName("WordCount").setMaster("local");sql
在命令中指定,好比:apache
./bin/spark-submit
--class org.apache.spark.examples.SparkPi
--master yarn
--deploy-mode cluster \ # can be client for client mode --executor-memory 20G
--num-executors 50
/path/to/examples.jar
1000缓存
在spark的安装目录下,经过spark-defaults.conf配置。性能优化
RDD是一个统一分布式数据抽象数据集。其下对应实际的数据存储介质,多是文件,也能够是hadoop。经过RDD能够进行tranformation和action操做,从而实现分布式计算。网络
一个RDD具备如下固定的数据结构数据结构
总结来讲,一个RDD的关键信息无非是,定义了数据来源,数据分布存储的状况,以及准备执行的计算逻辑。经过这些新,咱们能够构建一个图,图的两个vertex分别是RDD,edge为computation架构
private[spark] def conf = sc.conf
// =======================================================================
// Methods that should be implemented by subclasses of RDD
// =======================================================================
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]//当前RDD须要执行的计算
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]//当前RDD对应的分区
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps//当前RDD依赖的父亲数据集
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
// =======================================================================
// Methods and fields available on all RDDs
// =======================================================================
/** The SparkContext that created this RDD. */
def sparkContext: SparkContext = sc
/** A unique ID for this RDD (within its SparkContext). */
val id: Int = sc.newRddId()
/** A friendly name for this RDD */
@transient var name: String = null //当前RDD的名称
复制代码
####3.2 RDD 特色
###5、RDD Transformation 将RDD进行一系列变换,生成新的RDD的过程,叫作Transformation。全部那些能够就地计算,而不须要数据迁移的transformation叫作Narrow Transformation。
####5.1 transformation大概源码 以map操做为例
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
//将传进来的函数f进行clean,这里先不深究,只须要知道clean后的函数,跟原函数功能相同
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
//这里返回MapPartitionsRDD对象,其构造参数为当前RDD和一个将f应用于迭代器的函数定义
}
复制代码
map操做是将迭代RDD中的每一个元素,而后将其作必定加工,返回的的依然是一个元素。而flapMap接受的函数参数的入参是RDD中的每一个元素,但对该元素处理后,返回的是一个集合,而不是一个元素。flatMap源码以下:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
复制代码
总结来讲,map和flapMap的异同点以下: - map接受的函数参数签名是:(f: T => U)而flatMap接受的函数参数签名为:(f: T => TraversableOnce[U]),能够看到返回的是集合
Narrow Transformation操做有
有些计算,须要依赖其余节点数据,这种计算会致使数据移动,成为Wide Transformations。好比,基于某个key分类的操做GroupByKey,这个Key可能散落在不一样的work node上,为了进行GroupByKey计算,须要计算节点间进行数据移动,好比将某个Key对应的数据,统一移动到一个节点上。Wide Transformation操做有以下:
全部Tranformation操做,都不会真正执行,直到Action操做被调用,Action操做返回是具体值,而不是RDD。这种特性成为Lazy Computing. Action操做触发后,会将执行结果发给Driver 或者写如到外部存储。如下操做属于Action操做: First(), take(), reduce(), collect(), count()
全部action操做,最终都会调用SparkContext的runJob方法。runJob有需多重载方法,以其中一个为例
def runJob[T, U: ClassTag](
rdd: RDD[T],//须要处理的RDD数据
processPartition: Iterator[T] => U,//须要在每一个数据分区上进行的操做
resultHandler: (Int, U) => Unit)//如何将上述每一个分区处理后的结果进行处理
复制代码
能够看到runJob中体现了全部分布式计算理论架构,即MapReduce。其中processPartition定义每一个分区要须要作的map操做,这一步将减小数据量,将map操做的结果作为输入,传进reduce操做,进行汇总处理。
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())//1
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)//2
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)//3
sc.runJob(this, aggregatePartition, mergeResult)//4
jobResult//5
}
复制代码
举例:
val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
val result = inputrdd.aggregate(3)(
(acc, value) => {
println(acc+":"+value)
(acc + value._2)
},
(acc1, acc2) => (acc1 * acc2)
)
println(result)//输出4032
复制代码
解释:
上述RDD,被切分红两个分区。第一个分区数据是("maths", 21) ,另外一个是:("english", 22),("science", 31)
(acc + value._2)是每一个分区要执行的操做,迭代器带入zeroValue=3后,两个分片的计算中间值以下
3+21=24//分区1 3+22+31=56//分区2
最后将每一个分区结果带入(acc1 * acc2)函数,从aggregate源码得知,结果计算也要运用zeroValue,在这里也就是3.因而最终步执行的计算以下:
32456=4032
fold函数同aggregate相似,一样是调用SparkContext的runJob函数,只不过fold只接受一个值参数,和一个函数参数,其内部在调用runJob时,分区计算和结果计算都使用一样的函数。源码以下:
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
复制代码
举例:
val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)//1
val result = inputrdd.fold(("test",3))(
(acc, ele) => {
println(acc+":"+ele)
("result",acc._2 + ele._2)
}
)
println(result)//输出:(result,83)
复制代码
假设注释1中切分的2个分区为("maths", 21)和("english", 22),("science", 31),那么执行过程以下:
reduce一样调用了SparkContext的runJob函数,但reduce接收的参数在fold上进一步简化,少了zeroValue参数,只接收一个函数参数便可。一样该参数,在调用runJob时,即做为分区收敛的函数,记做为分区汇总计算的函数
def reduce(f: (T, T) => T): T = withScope {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
}
}
}
sc.runJob(this, reducePartition, mergeResult)
// Get the final result out of our Option, or throw an exception if the RDD was empty
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
复制代码
举例:
val inputrdd = sc.parallelize(List(("maths", 21),("english", 22),("science", 31)),2)
val result = inputrdd.reduce(
(acc, ele) => {
println(acc+":"+ele)
("result",acc._2 + ele._2)
}
)
println(result)//结果为:(result,74)
复制代码
collect和top方法都会将数据收集到driver本地,前者是收集所有,后者是收集指定条数。因此最好知道收集的数据集较小时使用。不然会有很大的性能问题,好比大数量的传输,以及driver本地的内存压力
前者是action操做,后者是transformation操做
###7、RDD cache优化 RDD的数据,来至于外部存储介质,好比磁盘。而每一次用该RDD,都要去磁盘加载,这有时间和性能上的损耗。可使用rdd的cahce方法,将该RDD缓存到内存,这样后续重复使用该RDD时,直接去内存拿。 cache的几个级别
按数据是否在分区间迁移,来划分stage。一个stage有多个task,他们会并发的在不一样的分区上执行相同的计算代码。好比紧邻的map和filter就会被划在同一个stage,由于他们能够并发在各分区上执行,而不须要数据移动。而reduceByKey则会单独成为一个stage,由于其涉及到数据移动
RDD 从一个RDD转化成另外一个RDD时,每一步都会记录上一个RDD关系。因而这造成一个血统谱系。具体
val wordCount1 = sc.textFile("InputText").flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
println(wordCount1.toDebugString)
复制代码
最终输出:
(1) ShuffledRDD[4] at reduceByKey at SparkTest.scala:124 []
+-(1) MapPartitionsRDD[3] at map at SparkTest.scala:124 []
| MapPartitionsRDD[2] at flatMap at SparkTest.scala:124 []
| InputText MapPartitionsRDD[1] at textFile at SparkTest.scala:124 []
| InputText HadoopRDD[0] at textFile at SparkTest.scala:124 []
复制代码
能够看到结果以倒序的方式输出,有点像java异常时,打出的依赖栈。从最近的依赖点,一直回溯
在RDD上进一步封装的数据结构。这种数据结构可使用SparkSql去操做处理数据,这下降了对分布式数据集的使用难度。由于你只要会sql,就能够进行一些处理
###11、 如何调优 一个Spark应用最会对应多个JVM进程。分布式driver,以及该应用在每一个worknode上起的JVM进程,因为driver担任的协调者角色,实际执行是worknode上的EXECUTOR,因此对于JVM的调优,主要指对Executor的调优。这些JVM进程彼此会通讯,好比数据shuffle。因此优化Spark应用的思路主要从如下个方面入手:
经过sparkConf conf.set(“spark.serializer”, “org.apache.spark.serializer.KyroSerializer”)来配置,指定数据对象的序列化方式