SparkContext、SparkConf和SparkSession的初始化

SparkContextSparkConfweb

  任何 Spark程序都是SparkContext开始的,SparkContext的初始化须要一个SparkConf对象,SparkConf包含了Spark集群配置的各类参数。
初始化后,就可使用SparkContext对象所包含的各类方法来建立和操做RDD和共享变量。
 
val conf = new SparkConf().setMaster("master").setAppName("appName")
val sc = new SparkContext(conf)
或者
val sc = new SparkContext("master","appName")
  Note:
  Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user.
  也就是说一旦设置完成SparkConf,就不可被使用者修改
 
  对于单元测试,您也能够调用SparkConf(false)来跳过加载外部设置,并得到相同的配置,不管系统属性如何。
 
  我们再看看setMaster()和setAppName()源码:
 
 
  根据上面的解释,setMaster主要是链接主节点,若是参数是"local",则在本地用单线程运行spark,若是是 local[4],则在本地用4核运行,若是设置为spark://master:7077,就是做为单节点运行,而setAppName就是在web端显示应用名而已,它们说到底都调用了set()函数,让咱们看看set()是何方神圣
 
 
  logDeprecation(key)是日志输出函数,防止输入参数名无效, 看看settings,是个HashMap结构,追溯一下:
 
 
  果真,是个ConcurrentHashMap对象,ConcurrentHashMap主要做用是解决多线程并发下数据段访问效率,该类相对于hashMap而言具备同步map中的数据,对于hashTable而言,该同步数据对于并发程序提升了极高的效率,因此在使用缓存机制的时候若是对map中的值具备高并发的状况的话,那么咱们就须要使用ConcurrentHashMap,ConcurrentHashMap中主要实体类就是三个:ConcurrentHashMap(整个Hash表),Segment(桶),HashEntry(节点)
,CurrentHashMap的初始化一共有三个参数,一个initialCapacity,表示初始的容量,一个loadFactor,表示负载参数,最后一个是concurrentLevel,表明ConcurrentHashMap内部的Segment的数量,ConcurrentLevel一经指定,不可改变,这也是为何SparkConf配置好了就没法更改的缘由。
 
  ConcurrentHashMap应用了锁分段技术, HashTable容器在竞争激烈的并发环境下表现出效率低下的缘由,是由于全部访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不一样数据段的数据时,线程间就不会存在锁竞争,从而能够有效的提升并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分红一段一段的存储,而后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其余段的数据也能被其余线程访问。
 
  另外,若是ConcurrentHashMap的元素数量增长致使ConrruentHashMap须要扩容,ConcurrentHashMap是不会增长Segment的数量的,而只会增长Segment中链表数组的容量大小,这样的好处是扩容过程不须要对整个ConcurrentHashMap作rehash,而只须要对Segment里面的元素作一次rehash就能够了。
 
SparkSession: SparkSession实质上是SQLContext和HiveContext的组合(将来可能还会加上StreamingContext),因此在SQLContext和HiveContext上可用的API在SparkSession上一样是可使用的。SparkSession内部封装了sparkContext,因此计算其实是由sparkContext完成的。
 
val sparkSession = SparkSession.builder
                    .master("master")
                    .appName("appName")
                    .getOrCreate()
或者
SparkSession.builder.config(conf=SparkConf())
     
  上面代码相似于建立一个SparkContext,master设置为"xiaojukeji",而后建立了一个SQLContext封装它。若是你想建立hiveContext,可使用下面的方法来建立SparkSession,以使得它支持Hive(HiveContext):
 
val sparkSession = SparkSession.builder
                    .master("master")
                    .appName("appName")
                    .enableHiveSupport()
                    .getOrCreate()
//sparkSession 从csv读取数据:
 
val dq = sparkSession.read.option("header", "true").csv("src/main/resources/scala.csv")
 
getOrCreate():有就拿过来,没有就建立,相似于单例模式:
 
s1 = SparkSession().builder.config("k1", "v1").getORCreat()
s2 = SparkSession().builder.config("k2", "v2").getORCreat()
return s1.conf.get("k1") == s2.conf.get("k2")

True
相关文章
相关标签/搜索