Apache Spark 的设计与实现(Broadcast功能)

Broadcast

顾名思义,broadcast 就是将数据从一个节点发送到其余各个节点上去。这样的场景不少,好比 driver 上有一张表,其余节点上运行的 task 须要 lookup 这张表,那么 driver 能够先把这张表 copy 到这些节点,这样 task 就能够在本地查表了。如何实现一个可靠高效的 broadcast 机制是一个有挑战性的问题。先看看 Spark 官网上的一段话: Broadcast variables allow the programmer to keep a read-only variable cached on each machinerather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

问题:为何只能 broadcast 只读的变量?

这就涉及一致性的问题,若是变量能够被更新,那么一旦变量被某个节点更新,其余节点要不要一块更新?若是多个节点同时在更新,更新顺序是什么?怎么作同步?还会涉及 fault-tolerance 的问题。为了不维护数据一致性问题,Spark 目前只支持 broadcast 只读变量。

问题:broadcast 到节点而不是 broadcast 到每一个 task?

由于每一个 task 是一个线程,并且同在一个进程运行 tasks 都属于同一个 application。所以每一个节点(executor)上放一份就能够被全部 task 共享。

问题: 具体怎么用 broadcast?

driver program 例子:
val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)

val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)
  driver 使用 sc.broadcast() 声明要 broadcast 的 data,bdata 的类型是 Broadcast。 当 rdd.transformation(func) 须要用 bdata 时,直接在 func 中调用,好比上面的例子中的 map() 就使用了 bdata.value.size。

问题:怎么实现 broadcast?

broadcast 的实现机制颇有意思:

1. 分发 task 的时候先分发 bdata 的元信息

Driver 先建一个本地文件夹用以存放须要 broadcast 的 data,并启动一个能够访问该文件夹的 HttpServer。当调用 val bdata = sc.broadcast(data)时就把 data 写入文件夹,同时写入 driver 本身的 blockManger 中(StorageLevel 为内存+磁盘),得到一个 blockId,类型为 BroadcastBlockId。当调用 rdd.transformation(func)时,若是 func 用到了 bdata,那么 driver submitTask() 的时候会将 bdata 一同 func 进行序列化获得 serialized task, 注意序列化的时候不会序列化 bdata 中包含的 data。上一章讲到 serialized task 从 driverActor 传递到 executor 时使用 Akka 的传消息机制,消息不能太大,而实际的 data 可能很大,因此这时候还不能 broadcast data。
driver 为何会同时将 data 放到磁盘和 blockManager 里面?放到磁盘是为了让 HttpServer 访问到,放到 blockManager 是为了让 driver program 自身使用 bdata 时方便(其实我以为不放到 blockManger 里面也行)。
那么何时传送真正的 data?在 executor 反序列化 task 的时候,会同时反序列化 task 中的 bdata 对象,这时候会调用 bdata 的 readObject() 方法。该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面,若是不在就使用下面的两种 fetch 方式之一去将 data fetch 过来。获得 data 后,将其存放到 blockManager 里面,这样后面运行的 task 若是须要 bdata 就不须要再去 fetch data 了。若是在,就直接拿来用了。 下面探讨 broadcast data 时候的两种实现方式:

2. HttpBroadcast

顾名思义,HttpBroadcast 就是每一个 executor 经过的 http 协议链接 driver 并从 driver 那里 fetch data。 Driver 先准备好要 broadcast 的 data,调用 sc.broadcast(data)后会调用工厂方法创建一个 HttpBroadcast 对象。该对象作的第一件事就是将 data 存到 driver 的 blockManager 里面,StorageLevel 为内存+磁盘,blockId 类型为 BroadcastBlockId。 同时 driver 也会将 broadcast 的 data 写到本地磁盘,例如写入后获得 /var/folders/87/grpn1_fn4xq5wdqmxk31v0l00000gp/T/spark-6233b09c-3c72-4a4d-832b-6c0791d0eb9c/broadcast_0, 这个文件夹做为 HttpServer 的文件目录。
Driver 和 executor 启动的时候,都会生成 broadcastManager 对象,调用 HttpBroadcast.initialize(),driver 会在本地创建一个临时目录用来存放 broadcast 的 data,并启动能够访问该目录的 httpServer。
Fetch data:在 executor 反序列化 task 的时候,会同时反序列化 task 中的 bdata 对象,这时候会调用 bdata 的 readObject() 方法。该方法先去本地 blockManager 那里询问 bdata 的 data 在不在 blockManager 里面, 若是不在就使用 http 协议链接 driver 上的 httpServer,将 data fetch 过来。获得 data 后,将其存放到 blockManager 里面,这样后面运行的 task 若是须要 bdata 就不须要再去 fetch data 了。若是在,就直接拿来用了。 HttpBroadcast 最大的问题就是 driver 所在的节点可能会出现网络拥堵,由于 worker 上的 executor 都会去 driver 那里 fetch 数据。

3. TorrentBroadcast

为了解决 HttpBroadast 中 driver 单点网络瓶颈的问题,Spark 又设计了一种 broadcast 的方法称为 TorrentBroadcast, 这个相似于你们经常使用的 BitTorrent 技术。基本思想就是将 data 分块成 data blocks,而后假设有 executor fetch 到了一些 data blocks,那么这个 executor 就能够被看成 data server 了,随着 fetch 的 executor 愈来愈多,有更多的 data server 加入,data 就很快能传播到所有的 executor 那里去了。 HttpBroadcast 是经过传统的 http 协议和 httpServer 去传 data,在 TorrentBroadcast 里面使用在上一章介绍的 blockManager.getRemote() => NIO ConnectionManager 传数据的方法来传递,读取数据的过程与读取 cached rdd 的方式相似,能够参阅 CacheAndCheckpoint 中的最后一张图。 下面讨论 TorrentBroadcast 的一些细节: TorrentBroadcast

driver 端:

Driver 先把 data 序列化到 byteArray,而后切割成 BLOCK_SIZE(由 spark.broadcast.blockSize = 4MB 设置)大小的 data block,每一个 data block 被 TorrentBlock 对象持有。切割完 byteArray 后,会将其回收,所以内存消耗虽然能够达到 2 * Size(data),但这是暂时的。 完成分块切割后,就将分块信息(称为 meta 信息)存放到 driver 本身的 blockManager 里面,StorageLevel 为内存+磁盘,同时会通知 driver 本身的 blockManagerMaster 说 meta 信息已经存放好。 通知 blockManagerMaster 这一步很重要,由于 blockManagerMaster 能够被 driver 和全部 executor 访问到,信息被存放到 blockManagerMaster 就变成了全局信息。 以后将每一个分块 data block 存放到 driver 的 blockManager 里面,StorageLevel 为内存+磁盘。存放后仍然通知 blockManagerMaster 说 blocks 已经存放好。到这一步,driver 的任务已经完成。

Executor 端:

executor 收到 serialized task 后,先反序列化 task,这时候会反序列化 serialized task 中包含的 bdata 类型是 TorrentBroadcast,也就是去调用 TorrentBroadcast.readObject()。这个方法首先获得 bdata 对象, 而后发现 bdata 里面没有包含实际的 data。怎么办?先询问所在的 executor 里的 blockManager 是会否包含 data(经过查询 data 的 broadcastId),包含就直接从本地 blockManager 读取 data。不然,就经过本地 blockManager 去链接 driver 的 blockManagerMaster 获取 data 分块的 meta 信息,获取信息后,就开始了 BT 过程。 BT 过程:task 先在本地开一个数组用于存放将要 fetch 过来的 data blocks arrayOfBlocks = new Array[TorrentBlock](totalBlocks),TorrentBlock 是对 data block 的包装。而后打乱要 fetch 的 data blocks 的顺序,好比若是 data block 共有 5 个,那么打乱后的 fetch 顺序多是 3-1-2-4-5。而后按照打乱后的顺序去 fetch 一个个 data block。fetch 的过程就是经过 “本地 blockManager -本地 connectionManager-driver/executor 的 connectionManager-driver/executor 的 blockManager-data” 获得 data,这个过程与 fetch cached rdd 相似。 每 fetch 到一个 block 就将其存放到 executor 的 blockManager 里面,同时通知 driver 上的 blockManagerMaster 说该 data block 多了一个存储地址。这一步通知很是重要,意味着 blockManagerMaster 知道 data block 如今在 cluster 中有多份,下一个不一样节点上的 task 再去 fetch 这个 data block 的时候,能够有两个选择了,并且会随机选择一个去 fetch。这个过程持续下去就是 BT 协议,随着下载的客户端愈来愈多,data block 服务器也愈来愈多,就变成 p2p下载了。关于 BT 协议,Wikipedia 上有一个 动画)。 整个 fetch 过程结束后,task 会开一个大 Array[Byte],大小为 data 的总大小,而后将 data block 都 copy 到这个 Array,而后对 Array 中 bytes 进行反序列化获得原始的 data,这个过程就是 driver 序列化 data 的反过程。 最后将 data 存放到 task 所在 executor 的 blockManager 里面,StorageLevel 为内存+磁盘。显然,这时候 data 在 blockManager 里存了两份,不过等所有 executor 都 fetch 结束,存储 data blocks 那份能够删掉了。

问题:broadcast RDD 会怎样?

@Andrew-Xia 回答道:不会怎样,就是这个rdd在每一个executor中实例化一份。

Discussion

公共数据的 broadcast 是很实用的功能,在 Hadoop 中使用 DistributedCache,好比经常使用的 -libjars就是使用 DistributedCache 来将 task 依赖的 jars 分发到每一个 task 的工做目录。不过度发前 DistributedCache 要先将文件上传到 HDFS。这种方式的主要问题是 资源浪费,若是某个节点上要运行来自同一 job 的 4 个 mapper,那么公共数据会在该节点上存在 4 份(每一个 task 的工做目录会有一份)。可是经过 HDFS 进行 broadcast 的好处在于 单点瓶颈不明显,由于公共 data 首先被分红多个 block,而后不一样的 block 存放在不一样的节点。这样,只要全部的 task 不是同时去同一个节点 fetch 同一个 block,网络拥塞不会很严重。 对于 Spark 来说,broadcast 时考虑的不只是如何将公共 data 分发下去的问题,还要考虑如何让同一节点上的 task 共享 data。 对于第一个问题,Spark 设计了两种 broadcast 的方式,传统存在单点瓶颈问题的 HttpBroadcast,和相似 BT 方式的 TorrentBroadcast。HttpBroadcast 使用传统的 client-server 形式的 HttpServer 来传递真正的 data,而 TorrentBroadcast 使用 blockManager 自带的 NIO 通讯方式来传递 data。TorrentBroadcast 存在的问题是 慢启动占内存,慢启动指的是刚开始 data 只在 driver 上有,要等 executors fetch 不少轮 data block 后,data server 才会变得可观,后面的 fetch 速度才会变快。executor 所占内存的在 fetch 完 data blocks 后进行反序列化时须要将近两倍 data size 的内存消耗。无论哪种方式,driver 在分块时会有两倍 data size 的内存消耗。 对于第二个问题,每一个 executor 都包含一个 blockManager 用来管理存放在 executor 里的数据,将公共数据存放在 blockManager 中(StorageLevel 为内存+磁盘),能够保证在 executor 执行的 tasks 可以共享 data。 其实 Spark 以前还尝试了一种称为 TreeBroadcast 的机制,详情能够见技术报告 Performance and Scalability of Broadcast in Spark。 更深刻点,broadcast 能够用多播协议来作,不过多播使用 UDP,不是可靠的,仍然须要应用层的设计一些可靠性保障机制。
相关文章
相关标签/搜索