Spark内核以及源码解析

一:图RDD性能优化

1.上图groupBy,Join会产生shuffle,shuffle能够作性能优化。
2.stage1和stage2的数据要计算完成才shuffle。函数

 

二:Spark Core图oop

SparkContext:应用程序通往集群的惟一通道,会构建DAGSchedler,TaskSchedler
广播变量:在每个Executor中的全局变量,对全部的Executor只发送一次,Executor中的每个task能够获取
累加器:全局累加器只能增长和读取源码分析

三:和MapReduce的区别
性能

spark全在内存作计算,Hadoop会保存在磁盘
pipeline:算子沿着数据通道计算下去 ,全在内存作计算,优化执行效率优化

 

四:几个重要的RDD
spa

 

五:源码分析scala

一:RDD
 必须 
 *  - A list of partitions  一系列partitions集合
 *  - A function for computing each split  为每一个分区提供一个computing的函数
 *  - A list of dependencies on other RDDs  RDD会依赖其余RDDs, 这种特性叫作:lineage(生命线);特例:第一个RDD不依赖其余RDD
 可选
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)

两个Partitioner:range Partitioner, Hash Partitioner
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  protected def getPartitions: Array[Partition]: ===> 获取当前RDD全部的分区
  def compute(split: Partition, context: TaskContext): Iterator[T] ===> 对每一个分区上的数据进行计算操做
  protected def getDependencies: Seq[Dependency[_]]: ===> 获取依赖的RDD,依赖的RDD是一个集合
  protected def getPreferredLocations(split: Partition): Seq[String] ===> 数据计算本地化专用
  val partitioner: Option[Partitioner] ===> 获取分区器

}



二:HadoopRDD
1:分区:每一个HDFS block
2:依赖:无依赖,由于直接从hdfs读取数据
3:函数:读取每个block
4:最佳位置:Hdfs block所在的位置
5:分区:无

@DeveloperApi
class HadoopRDD[K, V](
    sc: SparkContext,
    broadcastedConf: Broadcast[SerializableConfiguration],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int)
  extends RDD[(K, V)](sc, Nil) with Logging {

//读取数据getInputFormat数据有不一样的Format方式
 protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
    val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
      .asInstanceOf[InputFormat[K, V]]
    newInputFormat match {
      case c: Configurable => c.setConf(conf)
      case _ =>
    }
    newInputFormat
  }

」
相关文章
相关标签/搜索