Apache Pulsar 在腾讯 Angel PowerFL 联邦学习平台上的实践

腾讯 Angel PowerFL 联邦学习平台

联邦学习做为新一代人工智能基础技术,经过解决数据隐私与数据孤岛问题,重塑金融、医疗、城市安防等领域。git

腾讯 Angel PowerFL 联邦学习平台构建在 Angel 机器学习平台上,利用 Angel-­PS 支持万亿级模型训练的能力,将不少在 Worker 上的计算提高到 PS(参数服务器) 端;Angel PowerFL 为联邦学习算法提供了计算、加密、存储、状态同步等基本操做接口,经过流程调度模块协调参与方任务执行状态,而通讯模块完成了任务训练过程当中全部数据的传输。Angel PowerFL 联邦学习已经在腾讯金融云、腾讯广告联合建模等业务中开始落地,并取得初步的效果。github

Angel 机器学习平台:https://github.com/Angel-ML 算法

image

Angel PowerFL 对联邦通讯服务的要求

Angel PowerFL 联邦学习平台在训练任务过程中,对参与方之间的消息通讯要求极高,要求消息系统必须稳定可靠、保持高性能且能保证数据安全。Angel PowerFL 的学习任务在训练过程中,参与方之间会有大量的加密数据经过通讯模块传输,Angel PowerFL 对通讯服务有如下需求:docker

➡️ 稳定可靠apache

Angel PowerFL 的学习任务时长从几分钟到几小时,算法执行对数据的准确性要求很高,不一样算法的数据传输峰值也不同,这须要通讯模块的服务足够稳定,而且不能丢数据。安全

➡️ 高性能传输服务器

Angel PowerFL 底层经过 Spark 进行计算,Executor 并发执行会产生不少待传输的中间数据,通讯模块须要将这些加密后的数据及时传输给对方,这就要求通讯服务作到低延时、高吞吐量。网络

➡️ 数据安全架构

虽然 Angel PowerFL 全部数据都经过加密模块进行了加密,但参与联邦学习的业务可能分布在不一样公司;跨公网进行传输,须要通讯模块足够安全,不易被攻击。并发

为何选择 Pulsar

联邦通讯服务在作技术预研的时候,考虑过 RPC 直连、HDFS 同步、MQ 同步三种技术方案。考虑到对安全和性能的要求比较高,排除了 RPC 直连和 HDFS 同步方案,肯定采用 MQ 同步方案。

MQ 可选的服务不少,好比 Pulsar、Kafka、RabbitMQ、TubeMQ 等。考虑到 Angel PowerFL 对稳定性、可靠性、高性能传输和数据安全有很高的需求,咱们咨询了腾讯数据平台部 MQ 团队,他们向咱们推荐了 Pulsar。

随后,咱们对 Pulsar 开展了深刻调研,发现 Pulsar 内置的诸多特性,正好知足了咱们对消息系统的要求。Pulsar broker 和 bookie 采用了计算存储分层架构,保证了数据稳定可靠,性能良好;Pulsar 支持跨地域复制(geo­-replication),解决了 PowerFL 跨联邦同步 MQ 问题;而 Pulsar 的验证和受权模式也能保证传输安全。

云原生的计算与存储分层架构

Apache Pulsar 是下一代云原生分布式消息和事件流平台,采用了计算和存储分层的架构:在 Broker 上进行 Pub/Sub 相关的计算,在 Apache BookKeeper 上存储数据。

和传统的消息平台(如 Kafka)相比,这种架构有明显的优点:

  • Broker 和 bookie 相互独立,能够独立扩展和容错,提高系统的可用性。
  • 分区存储不受单个节点存储容量的限制,数据分布更均匀。
  • BookKeeper 存储安全可靠,保证消息不丢失,同时支持批量刷盘以得到更高吞吐量。

image

Pulsar Geo­-replication

Pulsar 原生支持跨地域复制(Geo­-replication),能够在多个数据中心的多个 Pulsar 集群中同时同步/异步复制数据。还能够在消息级别,经过 setReplicationClusters 控制消息复制到哪些集群。

image

在上图中,不管 Producer P一、P2 和 P3 在何时分别将消息发布给 Cluster A、Cluster B 和 Cluster C 中的 topic T1,这些消息均会马上复制到整个集群。一旦完成复制,Consumer C1 和 C2 便可从本身所在的集群消费这些消息。

水平扩展

因为 Pulsar 的存储设计基于分片,Pulsar 把主题分区划分为更小的块,称其为分片。每一个分片都做为 Apache BookKeeper ledger 来存储,这样构成分区的分片集合分布在 Apache BookKeeper 集群中。这样设计方便咱们管理容量和水平扩展,而且知足高吞吐量的需求。

  • 容量管理简单:主题分区的容量能够扩展至整个 BookKeeper 集群的容量,不受单个节点容量的限制。
  • 扩容简单:扩容无需从新平衡或复制数据。添加新存储节点时,新节点仅用于新分片或其副本,Pulsar 自动平衡分片分布和集群中的流量。
  • 高吞吐量:写入流量分布在存储层中,不会出现分区写入争用单个节点资源的状况。

通过深刻调研后,咱们决定在腾讯 Angel PowerFL 联邦学习平台上使用 Apache Pulsar。

基于 Apache Pulsar 的联邦通讯方案

联邦学习的各个业务(Angel PowerFL 称之为 Party,每一个 Party 有不一样的 ID,如 10000/20000),可能分布在同个公司的不一样部门(无网络隔离),也可能分布在不一样公司(跨公网),各个 Party 之间经过 Pulsar 跨地域复制功能进行同步复制,整体设计方案以下:

image

联邦学习的每一个训练任务,经过消息的 producer 和 consumer 链接所在 Party 的 Pulsar 集群,集群名以 fl-pulsar-[partyID] 进行区分,训练任务产生须要传输的中间数据后,生产者将这些数据发送给本地 Pulsar 集群。

Pulsar 集群收到数据后,经过 Pulsar proxy 创建的同步复制网络通道,将数据发送给使用方 Party。而使用方 Party 的消费者,会一直监听该训练任务对应的 topic,当有数据到达后,直接消费数据进行下一步的计算。

image

在 Angel PowerFL 执行训练任务时,driver 和每一个 partition 会建立一个 channel 类型变量,该变量和 Pulsar 当中具体的 topic 一一对应,须要交换的数据都会通过生产者发送到这个 topic。

Angel PowerFL 支持多方联邦,所以会有 2+ 个 Pulsar 集群须要同步复制数据。每一个联邦学习任务经过各自的 parties 任务参数指定了参与方,生产者在发送消息时调用 setReplicationClusters 接口,确保数据只在参与 Party 之间传输。

在 Angel PowerFL 的通讯模块中,咱们充分利用了 Pulsar 的 geo-­replication、topic 限流、Token Authentication 等功能。下面我来详细介绍如何在 Angel PowerFL 联邦学习平台中使用 Pulsar。

Geo­-replication 去掉Global ZooKeeper 依赖

在 Angel PowerFL 联邦学习平台上,部署一套完整的 Pulsar 依赖两个 ZooKeeper 集群,分别是 Local ZooKeeper 和 Global ZooKeeper。Local ZooKeeper 和 Kafka 中的 ZooKeeper 做用相似,用来存储元数据。而 Global ZooKeeper 则在 Pulsar 多个集群间中共享配置信息。

image

在 Angel PowerFL 场景中,每一个 Party 加入前,都要先部署一个 Global ZooKeeper 的子节点,或者共用一套跨公司或跨地域的公共 ZooKeeper,这样不只会增长部署的难度,也会增长被攻击的风险,不利于新 Party 加入。

Global ZooKeeper 中存储的元数据,主要是集群名/服务地址/namespace 权限等信息。Pulsar 支持建立和加入新集群。咱们经过如下两个步骤注册联邦 Pulsar 集群的信息到 local ZooKeeper,就去除了对 Global ZooKeeper 的依赖:

步骤 1: 注册新加入 Party 的 Pulsar 集群

# OTHER_CLUSTER_NAME 为待注册 Party 的 Pulsar 集群名
# OTHER_CLUSTER_BROKER_URL为 Pulsar 集群对应的 broker 地址
./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} 
 --url http://${OTHER_CLUSTER_HTTP_URL} 
 --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}

步骤 2: 授予训练用到的 namespace 访问集群权限

./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} 
 -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}

对于新加入的 Party,只用提供与其对应的 Pulsar 的集群名/服务地址便可完成注册,geo-replication 就能够经过注册信息同步复制数据。

Client 增长 Token 认证

Pulsar 做为 Angel PowerFL 的通讯模块,没有加入用户级别的权限控制。为了进一步保证 client 生产和消费数据的安全,咱们参考 Pulsar Client authentication using tokens based on JSON Web Tokens 增长了 token 认证,Angel PowerFL 的训练任务除了配置当前 Party 使用的服务地址外,还须要配置 admin token。

https://pulsar.apache.org/doc...
因为 Angel PowerFL 整套系统部署在 Kubernetes 上,咱们经过容器准备 Pulsar 集群须要的 Public/Private keys 等文件,而后注册到 K8S secret 中。

# 生成 fl-private.key 和 fl-public.key
docker run --rm -v "$(pwd)":/tmp 
 apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create-key-pair --output-private-key 
 /tmp/fl-private.key --output-public-key /tmp/fl-public.key
# 生成 admin-token.txt token 文件
echo -n `docker run --rm -v 
 "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 
 /pulsar/bin/pulsar tokens create --private-key 
 file:///tmp/fl-private.key --subject admin`
# 将认证相关的文件注册到 K8S
kubectl create secret generic token-symmetric-key 
 --from-file=TOKEN=admin-token.txt 
 --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}

开启多集群 topic 自动回收

Pulsar 集群开启了 geo-­replication 功能后,没法经过命令直接删除用过的 topic,而 Angel PowerFL 训练任务每次使用的任务是一次性的,任务结束后这些 topic 就没用了,若是不及时删除会出现大量累积。

对于经过 geo­-replication 开启复制的 topic,能够配置 brokerDeleteInactivetopicsEnabled 参数,开启 topic 自动回收。自动回收无用的 topic,需知足如下几个条件:

  • 当前 topic 没有生产者( producer)或者消费者(consumer)链接
  • 当前 topic 没有被订阅
  • 当前 topic 没有须要保留的信息

Angel PowerFL 部署的 Pulsar 集群,经过 brokerDeleteInactivetopicsEnabled 开启 topic 自动回收。在执行训练任务的过程当中,使用后对每一个 topic 按回收条件进行处理。同时,咱们增长了

brokerDeleteInactivetopicsFrequencySeconds 配置,将回收的频率设置为 3 小时。

优化 topic 限流

Angel PowerFL 中的训练任务,在不一样的数据集/算法/执行阶段,生产数据的流量峰值也不一样。目前生产环境中单个任务最大的数据量超过 200G/小时。训练过程当中,若是 Pulsar 链接中断或者生产和消费过程出现异常,须要从新开始整个训练任务。

为了规避 Pulsar 集群被单个训练任务冲垮的风险,咱们使用了 Pulsar 的限流功能。Pulsar 支持 message-rate 和 byte-rate 两种生产限流策略,前者限制每秒生产消息的数量,后者限制每秒生产消息的大小。Angel PowerFL 将数据切分红多个 4M 的消息,经过 message-­rate 限制生产消息的数量。在 Angel PowerFL 中,咱们将 namespace 的消息限制为 30 条(小于<30*4=120M/s):

./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30

刚开始测试 message-rate 的限流功能时,出现了限不住的状况(限流设置失效)。腾讯数据平台部 MQ 团队负责 Pulsar 的同事帮忙一块儿排查,发现设置 topicPublisherThrottlingTickTimeMillis 参数后,限制不能生效。

所以咱们想办法在 broker 端启用了精确的 topic 发布频率限制,优化了限流功能并贡献回社区,详情见 PR-7078: introduce precise topic publish rate limiting。
https://github.com/apache/pul...

优化 topic unloading 配置

Pulsar 根据 broker 集群负载情况,能够将 topic 动态分配到 broker上。若是拥有该 topic 的broker 宕机,或者拥有该 topic 的 broker 负载过大,则该 topic 会当即从新分配给另外一个 broker ;而从新分配的过程就是 topic 的 unloading,该操做意味着关闭 topic,释放全部者(owner)。

理论上,topic unloading 由负载均衡调整,客户端将经历极小的延迟抖动,一般耗时 10ms 左右。但 Angel PowerFL 初期在执行训练任务时,日志爆出大量由于 unloading topic 致使的链接异常。日志显示 topic unloading 在不断的重试,但都不成功:

[sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1 s

先来看 broker/namespace/bundle/topic 这四者的关系。Bundle 是 Pulsar namespace 的一个分片机制,namespace 被分片为 bundle 列表,每一个 bundle 包含 namespace 的整个哈希范围的一部分。Topic 不直接分配给 broker,而是经过计算 topic 的哈希码将 topic 分配给特定的 bundle;每一个 bundle 互相独立,再被分配到不一样的 broker 上。

Angel PowerFL 早期的任务 topic 没有复用,一个 LR 算法训练任务建立了 2000 多个 topic,每一个 topic 生产的数据负载也不一样,咱们判断上述断连问题是因为短期内(最小任务十分钟内能结束,同时会有多个任务在运行)大量建立和使用 topic,致使负载不均衡,topic unloading 频繁发生。为了下降 topic unloading 的频率,咱们调整了 Pulsar Bundle 的相关参数:

# 增长 broker 可最大分配 topic 数量
loadBalancerBrokerMaxTopics=500000
# 启用自动拆分namespace bundle
loadBalancerAutoBundleSplitEnabled=true
# 增长触发拆分 bundle 的 topic 数量
loadBalancerNamespaceBundleMaxTopics=10000
# 增长触发拆分 bundle 的消息数
loadBalancerNamespaceBundleMaxMsgRate=10000

同时,在建立 namespace 时,把 bundle 数量默认设置为 64。

./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64

通过以上调整,Angel PowerFL 在任务执行期间没有再出现过因为 topic unloading 致使的断连。

Pulsar on Kubernetes

Angel PowerFL 的全部服务均经过 Helm 部署在 Kubernetes 上。Pulsar 做为其中的一个 chart,能够很好的利用 K8S 的资源隔离、快速扩缩容等特性。在 Angel PowerFL 使用 Helm 部署 Pulsar 的实践中,咱们总结了如下经验:

🎙️ 使用 Local Persistent Volume 做为存储

Pulsar 是 IO 敏感的服务,尤为 bookie 组件,在生产环境中建议使用 SSD 或独立的磁盘。Angel PowerFL 在跑一些大数据集任务时,Pulsar 常常出现 “No Bookies Available” 的异常。这期间磁盘的 IO 使用率很高。

咱们经过 Local Persistent Volume 将 bookie 和 ZooKeeper 等其它组件挂载到单独的磁盘,减缓了磁盘 IO 竞争。咱们也测试过将 Pulsar 的 PV 存储换成 Ceph 和 NFS,性能都没有直接使用 Local Persistent Volume 好。

🎙️ 使用 NodeSelector

Geo-replication 同步复制数据期间,broker 须要访问对方的 Pulsar proxy 容器。Angel PowerFL 将网关机单独打了标签,经过 NodeSelector 将 broker 安装在可访问外网的网关机上。

🎙️ 配置 useHostNameAsBookieID

Bookie 是有状态的组件,为了 bookie pod 重建后服务正常,须要配置 useHostNameAsBookieID,确保向 ZooKeeper 注册的 ID 是 pod 的 hostname。

将来计划

Angel PowerFL 目前使用 Pulsar 快一年了,稳定运行时间最长的集群已经超过半年,将来对Pulsar 的使用计划主要有两个。

👍 升级 Pulsar 到 2.6.x 版本

咱们目前使用的是 Pulsar 2.5.2 版本,因为最近会使用 Pulsar Key_Shared 功能作 Angel-PS 的容灾恢复。2.6.0 版本恰好有加强 Key_Shared 订阅模式,因此咱们预计将来一个月升级到 Pulsar 2.6.x。
https://github.com/apache/pul...

👍 Pulsar on K8S 支持多磁盘挂载

Angel PowerFL 全部服务都运行在 Kubernetes 上(除了任务使用的 YARN 计算资源),Pulsar 做为其中的一个 chart 和其它服务一块儿部署,使用 Local Persistent Volume 做为存储。但目前 bookie 只支持挂载一块磁盘(目录),对于多磁盘的机器没有更充分的利用,咱们计划增长该特性。

总结

咱们介绍了在人工智能应用场景下,使用 Pulsar 做为 Angel PowerFL 通讯模块的相关实践。在方案实现过程中,咱们充分使用了 Pulsar 诸多内置特性,并根据自身需求作了相关优化,如 geo-­replication 去掉 Global ZooKeeper 依赖,为 client 增长 token 认证,开启多集群 topic 自动回收,优化 topic 限流功能和 topic unloading 配置等。

Pulsar 做为下一代云原生分布式消息和流平台,有众多吸引人的功能,在直播与短视频、零售与电子商务、媒体、金融等行业有普遍应用,期待 Pulsar 在不一样的应用场景下不断有新的案例落地。

致 谢

特别感谢腾讯数据平台部 MQ 团队,在 Angel PowerFL 平台使用 Pulsar 过程当中给与的技术指导。该团队在 Apache Pulsar 和 TubeMQ 上有多年的技术积累,积极为 Pulsar 社区作出了巨大贡献。Pulsar 社区十分活跃,正处于快速成长之中。咱们会持续关注并和 Apache Pulsar 社区深刻合做,把优化的功能奉献给 Pulsar 社区,和社区其余用户一块儿进一步完善、优化 Pulsar 的特性和功能,共同建设一个更强大完善的 Pulsar 社区。

做者简介

张超,腾讯数据平台部高级工程师,负责 Angel PowerFL 联邦通讯/PowerFL on K8S 等工做。他和腾讯数据平台部 MQ 团队一块儿将 Apache Pulsar 引入 PowerFL 联邦学习平台,开启了 Pulsar 在机器学习领域的应用。

相关文章
相关标签/搜索