Spark 广播变量BroadCast

 

1、 广播变量

 

广播变量容许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每一个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减小通讯的开销。 Spark的动做经过一系列的步骤执行,这些步骤由分布式的洗牌操做分开。Spark自动地广播每一个步骤每一个任务须要的通用数据。这些广播数据被序列化地缓存,在运行任务以前被反序列化出来。这意味着当咱们须要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地建立广播变量才有用。html

 

2、为何使用广播变量

假如咱们要共享的变量map,1M
在默认的,task执行的算子中,使用了外部的变量,每一个task都会获取一份变量的副本
在什么状况下,会出现性能上的恶劣的影响呢?
1000个task。大量task的确都在并行运行。这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,经过网络传输到各个task中去,给task使用。总计有1G的数据,会经过网络传输。网络传输的开销,不容乐观啊!!!网络传输,也许就会消耗掉你的spark做业运行的总时间的一小部分。
map副本,传输到了各个task上以后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,一会儿就耗费掉1G的内存。对性能会有什么影响呢?没必要要的内存的消耗和占用,就致使了,你在进行RDD持久化到内存,也许就无法彻底在内存中放下;就只能写入磁盘,最后致使后续的操做在磁盘IO上消耗性能;
你的task在建立对象的时候,也许会发现堆内存放不下全部对象,也许就会致使频繁的垃圾回收器的回收,GC。GC的时候,必定是会致使工做线程中止,也就是致使Spark暂停工做那么一点时间。频繁GC的话,对Spark做业的运行的速度会有至关可观的影响。
 
若是说,task使用大变量(1m~100m),明知道会致使性能出现恶劣的影响。那么咱们怎么来解决呢?
广播,Broadcast,将大变量广播出去。而不是直接使用。
 
广播变量的好处,不是每一个task一份变量副本,而是变成每一个节点的executor才一份副本。这样的话,就可让变量产生的副本大大减小。
广播变量,初始的时候,就在Drvier上有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在本身本地的Executor对应的
BlockManager中,尝试获取变量副本;若是本地没有,BlockManager,也许会从远程的Driver上面去获取变量副本;也有可能从距离比较近的其余
节点的Executor的BlockManager上去获取,并保存在本地的BlockManager中;BlockManager负责管理某个Executor对应的内存和磁盘上的数据,
此后这个executor上的task,都会直接使用本地的BlockManager中的副本。

优势:
    不是每一个task一份副本,而是变成每一个节点Executor上一个副本。

 

1.举例来讲:

50个Executor 1000个task。 
一个map10M 

默认状况下,1000个task 1000个副本java

1000 * 10M = 10 000M = 10 G程序员

10G的数据,网络传输,在集群中,耗费10G的内存资源算法

若是使用 广播变量,apache

50个Executor ,50个副本,10M*50 = 500M的数据json

网络传输,并且不必定是从Drver传输到各个节点,还多是从就近的节点 
的Executor的BlockManager上获取变量副本,网络传输速度大大增长。缓存

以前 10000M 如今 500M网络

20倍网络传输性能的消耗。20倍内存消耗的减小。

3、如何使用

开始使用broadcast变量,使用完后,程序结束记得释放app

  sc = SparkContext(appName=AppNames.JOURNEY_AGGREGATOR_APP_NAME)
    broadCastForLog = None
    try:
        broadCastForLog = ELogForDistributedApp.setLogConf2BroadCast(sc)
        elogging.initLogFromDict(broadCastForLog.value)
    except StandardError:
        pass

.......
    #执行完程序逻辑,记得释放该变量

    if broadCastForLog is not None:
        broadCastForLog.unpersist(False)

#获取要被共享的大变量,这里是log配置jvm

 

class ELogForDistributedApp(object):

    LOGHDFSPATH = "/user/hdfs/test/logging/logging_hdfs.json"
    @staticmethod
    def setLogConf2BroadCast(sc):
        logFilePath = ELogForDistributedApp.LOGHDFSPATH
        if sc is not None:
            configDict = HDFSOperation.getConfigFromHDFS(logFilePath,sc)
            broadCast = sc.broadcast(configDict)
            #globals()['broadCast'] = broadCast
            #elogging.initLogFromDict(broadCast.value)
            return broadCast
            #print broadCast.value
        else:
            return None

 

    def initLogFromDict(self):
        elogging.initLogFromDict(self.eloggingConfig)

 

从hdfs中找到相应配置文件

class HDFSOperation(object):

    @staticmethod
    def getConfigFromHDFS(hdfsPath,sc):
        if sc is not None:
            filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
            hadoop_configuration = sc._jsc.hadoopConfiguration()
            fs =filesystem_class.get(hadoop_configuration)
            path_class = sc._gateway.jvm.org.apache.hadoop.fs.Path
            pathObj = path_class(hdfsPath)
            try:
                hdfsInStream = fs.open(pathObj)
                bufferedReader_class = sc._gateway.jvm.java.io.BufferedReader
                inputStreamReader_class = sc._gateway.jvm.java.io.InputStreamReader
                bufferedReader = bufferedReader_class(inputStreamReader_class(hdfsInStream))
            except IOError,msg:
                print str(msg)
                return None

        else:
            return None
        configStr = ''
        while True:
            tmpStr = bufferedReader.readLine()
            if tmpStr == None:
                break
            configStr += tmpStr
        try:
            confDict = json.loads(configStr)
        except IOError,msg:
            print str(msg)
            return None
        return confDict

 

参考文档

  1. Spark Programming Guide1.6.3
  2. How can I update a broadcast variable in spark streaming?
  3. Spark踩坑记——共享变量

相关文章
相关标签/搜索