广播变量容许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每一个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减小通讯的开销。 Spark的动做经过一系列的步骤执行,这些步骤由分布式的洗牌操做分开。Spark自动地广播每一个步骤每一个任务须要的通用数据。这些广播数据被序列化地缓存,在运行任务以前被反序列化出来。这意味着当咱们须要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地建立广播变量才有用。html
不是每一个task一份副本,而是变成每一个节点Executor上一个副本。
50个Executor 1000个task。
一个map10M
默认状况下,1000个task 1000个副本java
1000 * 10M = 10 000M = 10 G程序员
10G的数据,网络传输,在集群中,耗费10G的内存资源。算法
若是使用 广播变量,apache
50个Executor ,50个副本,10M*50 = 500M的数据。json
网络传输,并且不必定是从Drver传输到各个节点,还多是从就近的节点
的Executor的BlockManager上获取变量副本,网络传输速度大大增长。缓存
以前 10000M 如今 500M。网络
20倍网络传输性能的消耗。20倍内存消耗的减小。
开始使用broadcast变量,使用完后,程序结束记得释放app
sc = SparkContext(appName=AppNames.JOURNEY_AGGREGATOR_APP_NAME) broadCastForLog = None try: broadCastForLog = ELogForDistributedApp.setLogConf2BroadCast(sc) elogging.initLogFromDict(broadCastForLog.value) except StandardError: pass ....... #执行完程序逻辑,记得释放该变量 if broadCastForLog is not None: broadCastForLog.unpersist(False)
#获取要被共享的大变量,这里是log配置jvm
class ELogForDistributedApp(object): LOGHDFSPATH = "/user/hdfs/test/logging/logging_hdfs.json" @staticmethod def setLogConf2BroadCast(sc): logFilePath = ELogForDistributedApp.LOGHDFSPATH if sc is not None: configDict = HDFSOperation.getConfigFromHDFS(logFilePath,sc) broadCast = sc.broadcast(configDict) #globals()['broadCast'] = broadCast #elogging.initLogFromDict(broadCast.value) return broadCast #print broadCast.value else: return None
def initLogFromDict(self):
elogging.initLogFromDict(self.eloggingConfig)
从hdfs中找到相应配置文件
class HDFSOperation(object): @staticmethod def getConfigFromHDFS(hdfsPath,sc): if sc is not None: filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem hadoop_configuration = sc._jsc.hadoopConfiguration() fs =filesystem_class.get(hadoop_configuration) path_class = sc._gateway.jvm.org.apache.hadoop.fs.Path pathObj = path_class(hdfsPath) try: hdfsInStream = fs.open(pathObj) bufferedReader_class = sc._gateway.jvm.java.io.BufferedReader inputStreamReader_class = sc._gateway.jvm.java.io.InputStreamReader bufferedReader = bufferedReader_class(inputStreamReader_class(hdfsInStream)) except IOError,msg: print str(msg) return None else: return None configStr = '' while True: tmpStr = bufferedReader.readLine() if tmpStr == None: break configStr += tmpStr try: confDict = json.loads(configStr) except IOError,msg: print str(msg) return None return confDict