一:图RDD性能优化
1.上图groupBy,Join会产生shuffle,shuffle能够作性能优化。
2.stage1和stage2的数据要计算完成才shuffle。函数
二:Spark Core图oop
SparkContext:应用程序通往集群的惟一通道,会构建DAGSchedler,TaskSchedler
广播变量:在每个Executor中的全局变量,对全部的Executor只发送一次,Executor中的每个task能够获取
累加器:全局累加器只能增长和读取源码分析
三:和MapReduce的区别
性能
spark全在内存作计算,Hadoop会保存在磁盘
pipeline:算子沿着数据通道计算下去 ,全在内存作计算,优化执行效率优化
四:几个重要的RDD
spa
五:源码分析scala
一:RDD 必须 * - A list of partitions 一系列partitions集合 * - A function for computing each split 为每一个分区提供一个computing的函数 * - A list of dependencies on other RDDs RDD会依赖其余RDDs, 这种特性叫作:lineage(生命线);特例:第一个RDD不依赖其余RDD 可选 * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file) 两个Partitioner:range Partitioner, Hash Partitioner abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable with Logging { protected def getPartitions: Array[Partition]: ===> 获取当前RDD全部的分区 def compute(split: Partition, context: TaskContext): Iterator[T] ===> 对每一个分区上的数据进行计算操做 protected def getDependencies: Seq[Dependency[_]]: ===> 获取依赖的RDD,依赖的RDD是一个集合 protected def getPreferredLocations(split: Partition): Seq[String] ===> 数据计算本地化专用 val partitioner: Option[Partitioner] ===> 获取分区器 } 二:HadoopRDD 1:分区:每一个HDFS block 2:依赖:无依赖,由于直接从hdfs读取数据 3:函数:读取每个block 4:最佳位置:Hdfs block所在的位置 5:分区:无 @DeveloperApi class HadoopRDD[K, V]( sc: SparkContext, broadcastedConf: Broadcast[SerializableConfiguration], initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int) extends RDD[(K, V)](sc, Nil) with Logging { //读取数据getInputFormat数据有不一样的Format方式 protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) .asInstanceOf[InputFormat[K, V]] newInputFormat match { case c: Configurable => c.setConf(conf) case _ => } newInputFormat } 」