Broadcast variables(广播变量)容许程序员将一个 read-only(只读的)变量缓存到每台机器上,而不是给任务传递一个副本。它们是如何来使用呢,例如,广播变量能够用一种高效的方式给每一个节点传递一份比较大的 input dataset(输入数据集)副本。在使用广播变量时,Spark 也尝试使用高效广播算法分发 broadcast variables(广播变量)以下降通讯成本。程序员
Spark 的 action(动做)操做是经过一系列的 stage(阶段)进行执行的,这些 stage(阶段)是经过分布式的 "shuffle" 操做进行拆分的。Spark 会自动广播出每一个 stage(阶段)内任务所须要的公共数据。这种状况下广播的数据使用序列化的形式进行缓存,并在每一个任务运行前进行反序列化。这也就意味着,只有在跨越多个 stage(阶段)的多个任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的状况下,使用广播变量会有比较好的效果。(来自官网描述)算法
一、若是广播一个rdd的值的话,能够把rdd.collect 汇总到driver端,而后经过sc广播到excutorsql
二、可是若是广播一个表(datafram),上面的那种方式就不能这样作了缓存
例如:我广播一个手机号码归属地的表,结构是(1353763,深圳市)分布式
代码以下ui
val df_operate = sqlContext.sparkContext .textFile(inpath_hdfs) .map(t => { val arr = t.split("\001") arr }).filter(t => { t.length == 7 }).map(t => { (t(1), t(6)) }).toDF("cell_seg", "cell_city") // 进行广播 val broadcast_df_standard = sc.broadcast(df_operate.as("t_base_phone_to_operate"))
sparkDS 的代码以下url
contact_DStream.foreachRDD { rdd => if (!rdd.isEmpty) { // val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) val sparkSession: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate() import sparkSession.implicits._ // val logger = Logger.getLogger(Contact.getClass.getName) val es_url = broadcast_es_uri1.value val es_port = broadcast_es_port.value val es_index = broadcast_es_index.value val es_type = broadcast_es_type.value //广播变量的值 val url_config = broadcast_url_config.value val user_config = broadcast_user_config.value val passwd_config = broadcast_passwd_config.value val table_config_name = broadcast_table_config.value val index2kinesisStreamName = broadcast_index2kinesis.value val df_standard = broadcast_df_standard.value //外部数据归属地 // df_standard.createGlobalTempView("t_base_phone_to_operate") df_standard.createOrReplaceTempView("t_base_phone_to_operate") // 数据cache到内存里,防止大量的oom sparkSession.catalog.cacheTable("t_base_phone_to_operate") // cache后的数据记得要释放,否则太占内存了 sparkSession.catalog.clearCache() } } ssc.start() ssc.awaitTermination() }