本系列文章源自JerryLead的SparkInternals,本文只是在做者的原文基础上加入本身的理解,批注,和部分源码,做为学习之用
注:原文是基于Spark 1.0.2 , 而本篇笔记是基于spark 2.2.0, 对比后发现核心部分变化不大,依旧值得参考node
顾名思义,broadcast 就是将数据从一个节点发送到其余各个节点上去。这样的场景不少,好比 driver 上有一张表,其余节点上运行的 task 须要 lookup 这张表,那么 driver 能够先把这张表 copy 到这些节点,这样 task 就能够在本地查表了。如何实现一个可靠高效的 broadcast 机制是一个有挑战性的问题。先看看 Spark 官网上的一段话:git
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.github
这就涉及一致性的问题,若是变量能够被更新,那么一旦变量被某个节点更新,其余节点要不要一块更新?若是多个节点同时在更新,更新顺序是什么?怎么作同步?还会涉及 fault-tolerance 的问题。为了不维护数据一致性问题,Spark 目前只支持 broadcast 只读变量。spring
由于每一个 task 是一个线程,并且同在一个进程运行 tasks 都属于同一个 application。所以每一个节点(executor)上放一份就能够被全部 task 共享。数组
driver program 例子:缓存
val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)
val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)
复制代码
driver 使用 sc.broadcast()
声明要 broadcast 的 data,bdata 的类型是 Broadcast。bash
当 rdd.transformation(func)
须要用 bdata 时,直接在 func 中调用,好比上面的例子中的 map() 就使用了 bdata.value.size。服务器
broadcast 的实现机制颇有意思:markdown
Driver 先建一个本地文件夹用以存放须要 broadcast 的 data,并启动一个能够访问该文件夹的 HttpServer。当调用val bdata = sc.broadcast(data)
时就把 data 写入文件夹,同时写入 driver 本身的 blockManger 中(StorageLevel 为内存+磁盘),得到一个 blockId,类型为 BroadcastBlockId。网络
//initialize
sparkSession.build()#env.broadcastManager.initialize()
new TorrentBroadcastFactory.initialize()
//use broadcast
sc.broadcast()
broadcastManager.newBroadcast()
//Divide the object into multiple blocks and put those blocks in the block manager.
new TorrentBroadcast[T](value_, nextBroadcastId.getAndIncrement()).writeBlocks()
//保存一份到driver上
SparkEnv.get.blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)
doPutIterator()#memoryStore.putIteratorAsValues()#diskStore.put(blockId)
//以4m分别保存block("spark.broadcast.blockSize", "4m"),并获得meta
block MetaDatas = TorrentBroadcast.blockifyObject(value, blockSize..)
foreach block MetaData :
blockManager.putBytes(BroadcastBlockId, MEMORY_AND_DISK_SER...)
doPutBytes()#memoryStore.putIteratorAsValues()#diskStore.putBytes()
//异步复制数据,sc.broadcast()应该只会在driver端保留一份数据,replication=1,后面executorfetch数据时才慢慢增长broadcast的副本数量
if level.replication > 1 :ThreadUtils.awaitReady(replicate(ByteBufferBlockData(bytes, false)...)
//复制副本规则,做为参考
blockManager.replicate()
//请求得到其余BlockManager的id
val initialPeers = getPeers(false)
blockManagerMaster.getPeers(blockManagerId).sortBy(_.hashCode)
//从driver上获取其余节点
driverEndpoint.askSync[Seq[BlockManagerId]](GetPeers(blockManagerId))
//BlockManagerMasterEndpoint中返回非driver和非当前节点的blockManagerId
blockManagerInfo.keySet.contains(blockManagerId)#blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
foreach block replicate replication-1 nodes: blockTransferService.uploadBlockSync()
//后面就是发送信息给blockManager,再保存数据通知driver
blockManager.putBytes()#reportBlockStatus(blockId, putBlockStatus)
blockManagerMasterEndpoint.updateBlockInfo() //driver端更新信息
复制代码
当调用rdd.transformation(func)
时,若是 func 用到了 bdata,那么 driver submitTask() 的时候会将 bdata 一同 func 进行序列化获得 serialized task,注意序列化的时候不会序列化 bdata 中包含的 data。
//TorrentBroadcast.scala 序列化的时候不会序列化 bdata 中包含的 data
// @transient代表不序列化_value
@transient private lazy val _value: T = readBroadcastBlock()
/** Used by the JVM when serializing this object. */
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}
复制代码
上一章讲到 serialized task 从 driverEndPoint 传递到 executor 时使用 RPC 的传消息机制,消息不能太大,而实际的 data 可能很大,因此这时候还不能 broadcast data。
driver 为何会同时将 data 放到磁盘和 blockManager 里面?放到磁盘是为了让 HttpServer 访问到,放到 blockManager 是为了让 driver program 自身使用 bdata 时方便(其实我以为不放到 blockManger 里面也行)。
那么何时传送真正的 data?在 executor 反序列化 task 的时候,会同时反序列化 task 中的 bdata 对象,这时候会调用 bdata 的 readObject() 方法。该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,若是不在就使用下面的两种 fetch 方式之一去将 data fetch 过来。获得 data 后,将其存放到 blockManager 里面,这样后面运行的 task 若是须要 bdata 就不须要再去 fetch data 了。若是在,就直接拿来用了。
//runjob()
dagScheduler.submitMissingTasks(stage: Stage, jobId: Int)
val taskIdToLocations = getPreferredLocs(stage.rdd, id)-----
getCacheLocs()//从本地或者driver获取缓存rdd位置
rdd.preferredLocations()//也会从checkpointrdd中寻找
var taskBinary: Broadcast[Array[Byte]] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage => //把func也序列化了,func里面包含broadcast变量
//不会序列化 broadcast变量 中包含的 data
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
taskBinary = sc.broadcast(taskBinaryBytes)//广播task
taskScheduler.submitTasks(new TaskSet(...))
...
复制代码
//TorrentBroadcast.scala
//使用lazy方式,真正反序列化使用_value才调用方法读值
@transient private lazy val _value: T = readBroadcastBlock()
TorrentBroadcast.readBroadcastBlock()
blockManager.getLocalValues()//本地读取
memoryStore.getValues(blockId)#diskStore.getBytes(blockId)
readBlocks() //本地无则从driver/其余executor读取
foreach block :
blockManager.getRemoteBytes(BroadcastBlockId(id, "piece" + pid))
blockManager.putBytes()//保存在本地
//整个broadcast保存在本地
blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)
blocks.foreach(_.dispose()) //去重,把以前分开保存的block删除
复制代码
下面探讨 broadcast data 时候的两种实现方式:
spark 2.2 的Broadcast package中已经去除了HttpBroadcast,只留下了TorrentBroadcast
顾名思义,HttpBroadcast 就是每一个 executor 经过的 http 协议链接 driver 并从 driver 那里 fetch data。
Driver 先准备好要 broadcast 的 data,调用sc.broadcast(data)
后会调用工厂方法创建一个 HttpBroadcast 对象。该对象作的第一件事就是将 data 存到 driver 的 blockManager 里面,StorageLevel 为内存+磁盘,blockId 类型为 BroadcastBlockId。
同时 driver 也会将 broadcast 的 data 写到本地磁盘,例如写入后获得 /var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0
, 这个文件夹做为 HttpServer 的文件目录。
Driver 和 executor 启动的时候,都会生成 broadcastManager 对象,调用 HttpBroadcast.initialize(),driver 会在本地创建一个临时目录用来存放 broadcast 的 data,并启动能够访问该目录的 httpServer。
Fetch data:在 executor 反序列化 task 的时候,会同时反序列化 task 中的 bdata 对象,这时候会调用 bdata 的 readObject() 方法。该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,若是不在就使用 http 协议链接 driver 上的 httpServer,将 data fetch 过来。获得 data 后,将其存放到 blockManager 里面,这样后面运行的 task 若是须要 bdata 就不须要再去 fetch data 了。若是在,就直接拿来用了。
HttpBroadcast 最大的问题就是 driver 所在的节点可能会出现网络拥堵,由于 worker 上的 executor 都会去 driver 那里 fetch 数据。
为了解决 HttpBroadast 中 driver 单点网络瓶颈的问题,Spark 又设计了一种 broadcast 的方法称为 TorrentBroadcast,这个相似于你们经常使用的 BitTorrent 技术。基本思想就是将 data 分块成 data blocks,而后假设有 executor fetch 到了一些 data blocks,那么这个 executor 就能够被看成 data server 了,随着 fetch 的 executor 愈来愈多,有更多的 data server 加入,data 就很快能传播到所有的 executor 那里去了。
HttpBroadcast 是经过传统的 http 协议和 httpServer 去传 data,在 TorrentBroadcast 里面使用在上一章介绍的 blockManager.getRemoteValues() => NIO ShuffleClient 传数据的方法来传递,读取数据的过程与读取 cached rdd 的方式相似,能够参阅 CacheAndCheckpoint 中的最后一张图。
下面讨论 TorrentBroadcast 的一些细节:
Driver 先把 data 序列化到 byteArray,而后切割成 BLOCK_SIZE(由 spark.broadcast.blockSize = 4MB
设置)大小的 data block,每一个 data block 被 TorrentBlock 对象持有。切割完 byteArray 后,会将其回收,所以内存消耗虽然能够达到 2 * Size(data),但这是暂时的。
完成分块切割后,就将分块信息(称为 meta 信息)存放到 driver 本身的 blockManager 里面,StorageLevel 为内存+磁盘,同时会通知 driver 本身的 blockManagerMaster 说 meta 信息已经存放好。通知 blockManagerMaster 这一步很重要,由于 blockManagerMaster 能够被 driver 和全部 executor 访问到,信息被存放到 blockManagerMaster 就变成了全局信息。
以后将每一个分块 data block 存放到 driver 的 blockManager 里面,StorageLevel 为内存+磁盘。存放后仍然通知 blockManagerMaster 说 blocks 已经存放好。到这一步,driver 的任务已经完成。
executor 收到 serialized task 后,先反序列化 task,这时候会反序列化 serialized task 中包含的 bdata 类型是 TorrentBroadcast,也就是去访问 TorrentBroadcast._value,调用其readBroadcastBlock()
方法。这个方法首先获得 bdata 对象,**而后发现 bdata 里面没有包含实际的 data。怎么办?**先询问本地所在的 executor 里的 blockManager 是会否包含 data(经过查询 data 的 broadcastId),包含就直接从本地 blockManager 读取 data。不然,就经过本地 blockManager 去链接 driver 的 blockManagerMaster 获取 data 分块的 meta 信息,获取信息后,就开始了 BT 过程。
**BT 过程:**task 先在本地开一个数组用于存放将要 fetch 过来的 data blocks val blocks = new Array[BlockData](numBlocks)
,而后打乱要 fetch 的 data blocks 的顺序,for (pid <- Random.shuffle(Seq.range(0, numBlocks)))
好比若是 data block 共有 5 个,那么打乱后的 fetch 顺序多是 3-1-2-4-5。而后按照打乱后的顺序去 fetch 一个个 data block。**每 fetch 到一个 block 就将其存放到 executor 的 blockManager 里面,同时通知 driver 上的 blockManagerMaster 说该 data block 多了一个存储地址。**这一步通知很是重要,意味着 blockManagerMaster 知道 data block 如今在 cluster 中有多份,下一个不一样节点上的 task 再去 fetch 这个 data block 的时候,能够有两个选择了,并且会随机选择一个去 fetch。这个过程持续下去就是 BT 协议,随着下载的客户端愈来愈多,data block 服务器也愈来愈多,就变成 p2p下载了。关于 BT 协议,Wikipedia 上有一个动画。
整个 fetch 过程结束后,task 会开一个大 Array[Byte],大小为 data 的总大小,而后将 data block 都 copy 到这个 Array,而后对 Array 中 bytes 进行反序列化获得原始的 data,这个过程就是 driver 序列化 data 的反过程。
最后将 data 存放到 task 所在 executor 的 blockManager 里面,StorageLevel 为内存+磁盘。显然,这时候 data 在 blockManager 里存了两份,不过等所有 executor 都 fetch 结束,存储 data blocks 那份能够删掉了。
@Andrew-Xia 回答道:不会怎样,就是这个rdd在每一个executor中实例化一份。
公共数据的 broadcast 是很实用的功能,在 Hadoop 中使用 DistributedCache,好比经常使用的-libjars
就是使用 DistributedCache 来将 task 依赖的 jars 分发到每一个 task 的工做目录。不过度发前 DistributedCache 要先将文件上传到 HDFS。这种方式的主要问题是资源浪费,若是某个节点上要运行来自同一 job 的 4 个 mapper,那么公共数据会在该节点上存在 4 份(每一个 task 的工做目录会有一份)。可是经过 HDFS 进行 broadcast 的好处在于单点瓶颈不明显,由于公共 data 首先被分红多个 block,而后不一样的 block 存放在不一样的节点。这样,只要全部的 task 不是同时去同一个节点 fetch 同一个 block,网络拥塞不会很严重。
对于 Spark 来说,broadcast 时考虑的不只是如何将公共 data 分发下去的问题,还要考虑如何让同一节点上的 task 共享 data。
对于第一个问题,Spark 设计了两种 broadcast 的方式,传统存在单点瓶颈问题的 HttpBroadcast,和相似 BT 方式的 TorrentBroadcast。HttpBroadcast 使用传统的 client-server 形式的 HttpServer 来传递真正的 data,而 TorrentBroadcast 使用 blockManager 自带的 NIO 通讯方式来传递 data。TorrentBroadcast 存在的问题是慢启动和占内存,慢启动指的是刚开始 data 只在 driver 上有,要等 executors fetch 不少轮 data block 后,data server 才会变得可观,后面的 fetch 速度才会变快。executor 所占内存的在 fetch 完 data blocks 后进行反序列化时须要将近两倍 data size 的内存消耗。无论哪种方式,driver 在分块时会有两倍 data size 的内存消耗。
对于第二个问题,每一个 executor 都包含一个 blockManager 用来管理存放在 executor 里的数据,将公共数据存放在 blockManager 中(StorageLevel 为内存+磁盘),能够保证在 executor 执行的 tasks 可以共享 data。
其实 Spark 以前还尝试了一种称为 TreeBroadcast 的机制,详情能够见技术报告 Performance and Scalability of Broadcast in Spark。
更深刻点,broadcast 能够用多播协议来作,不过多播使用 UDP,不是可靠的,仍然须要应用层的设计一些可靠性保障机制。