spark 广播变量 之广播表(dataframe)

Broadcast variables(广播变量)容许程序员将一个 read-only(只读的)变量缓存到每台机器上,而不是给任务传递一个副本。它们是如何来使用呢,例如,广播变量能够用一种高效的方式给每一个节点传递一份比较大的 input dataset(输入数据集)副本。在使用广播变量时,Spark 也尝试使用高效广播算法分发 broadcast variables(广播变量)以下降通讯成本。程序员

Spark 的 action(动做)操做是经过一系列的 stage(阶段)进行执行的,这些 stage(阶段)是经过分布式的 "shuffle" 操做进行拆分的。Spark 会自动广播出每一个 stage(阶段)内任务所须要的公共数据。这种状况下广播的数据使用序列化的形式进行缓存,并在每一个任务运行前进行反序列化。这也就意味着,只有在跨越多个 stage(阶段)的多个任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的状况下,使用广播变量会有比较好的效果。(来自官网描述)算法

一、若是广播一个rdd的值的话,能够把rdd.collect 汇总到driver端,而后经过sc广播到excutorsql

二、可是若是广播一个表(datafram),上面的那种方式就不能这样作了缓存

例如:我广播一个手机号码归属地的表,结构是(1353763,深圳市)分布式

代码以下ui

val df_operate = sqlContext.sparkContext
      .textFile(inpath_hdfs)
      .map(t => {
        val arr = t.split("\001")
        arr
      }).filter(t => {
      t.length == 7
    }).map(t => {
      (t(1), t(6))
    }).toDF("cell_seg", "cell_city")
    
//   进行广播
    val broadcast_df_standard = sc.broadcast(df_operate.as("t_base_phone_to_operate"))

 

sparkDS 的代码以下url

contact_DStream.foreachRDD { rdd =>

      if (!rdd.isEmpty) {
        //        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

        val sparkSession: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        import sparkSession.implicits._

        //        val logger = Logger.getLogger(Contact.getClass.getName)
        val es_url = broadcast_es_uri1.value
        val es_port = broadcast_es_port.value
        val es_index = broadcast_es_index.value
        val es_type = broadcast_es_type.value

        //广播变量的值
        val url_config = broadcast_url_config.value
        val user_config = broadcast_user_config.value
        val passwd_config = broadcast_passwd_config.value
        val table_config_name = broadcast_table_config.value


        val index2kinesisStreamName = broadcast_index2kinesis.value


        val df_standard = broadcast_df_standard.value

        //外部数据归属地
//        df_standard.createGlobalTempView("t_base_phone_to_operate")
        df_standard.createOrReplaceTempView("t_base_phone_to_operate")
//      数据cache到内存里,防止大量的oom
        sparkSession.catalog.cacheTable("t_base_phone_to_operate")

       
//      cache后的数据记得要释放,否则太占内存了       
        sparkSession.catalog.clearCache()

      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
相关文章
相关标签/搜索