联邦学习做为新一代人工智能基础技术,经过解决数据隐私与数据孤岛问题,重塑金融、医疗、城市安防等领域。git
腾讯 Angel PowerFL 联邦学习平台构建在 Angel 机器学习平台上,利用 Angel-PS 支持万亿级模型训练的能力,将不少在 Worker 上的计算提高到 PS(参数服务器) 端;Angel PowerFL 为联邦学习算法提供了计算、加密、存储、状态同步等基本操做接口,经过流程调度模块协调参与方任务执行状态,而通讯模块完成了任务训练过程当中全部数据的传输。Angel PowerFL 联邦学习已经在腾讯金融云、腾讯广告联合建模等业务中开始落地,并取得初步的效果。github
Angel 机器学习平台:https://github.com/Angel-ML 算法
Angel PowerFL 联邦学习平台在训练任务过程中,对参与方之间的消息通讯要求极高,要求消息系统必须稳定可靠、保持高性能且能保证数据安全。Angel PowerFL 的学习任务在训练过程中,参与方之间会有大量的加密数据经过通讯模块传输,Angel PowerFL 对通讯服务有如下需求:docker
➡️ 稳定可靠apache
Angel PowerFL 的学习任务时长从几分钟到几小时,算法执行对数据的准确性要求很高,不一样算法的数据传输峰值也不同,这须要通讯模块的服务足够稳定,而且不能丢数据。安全
➡️ 高性能传输服务器
Angel PowerFL 底层经过 Spark 进行计算,Executor 并发执行会产生不少待传输的中间数据,通讯模块须要将这些加密后的数据及时传输给对方,这就要求通讯服务作到低延时、高吞吐量。网络
➡️ 数据安全架构
虽然 Angel PowerFL 全部数据都经过加密模块进行了加密,但参与联邦学习的业务可能分布在不一样公司;跨公网进行传输,须要通讯模块足够安全,不易被攻击。并发
联邦通讯服务在作技术预研的时候,考虑过 RPC 直连、HDFS 同步、MQ 同步三种技术方案。考虑到对安全和性能的要求比较高,排除了 RPC 直连和 HDFS 同步方案,肯定采用 MQ 同步方案。
MQ 可选的服务不少,好比 Pulsar、Kafka、RabbitMQ、TubeMQ 等。考虑到 Angel PowerFL 对稳定性、可靠性、高性能传输和数据安全有很高的需求,咱们咨询了腾讯数据平台部 MQ 团队,他们向咱们推荐了 Pulsar。
随后,咱们对 Pulsar 开展了深刻调研,发现 Pulsar 内置的诸多特性,正好知足了咱们对消息系统的要求。Pulsar broker 和 bookie 采用了计算存储分层架构,保证了数据稳定可靠,性能良好;Pulsar 支持跨地域复制(geo-replication),解决了 PowerFL 跨联邦同步 MQ 问题;而 Pulsar 的验证和受权模式也能保证传输安全。
Apache Pulsar 是下一代云原生分布式消息和事件流平台,采用了计算和存储分层的架构:在 Broker 上进行 Pub/Sub 相关的计算,在 Apache BookKeeper 上存储数据。
和传统的消息平台(如 Kafka)相比,这种架构有明显的优点:
Pulsar 原生支持跨地域复制(Geo-replication),能够在多个数据中心的多个 Pulsar 集群中同时同步/异步复制数据。还能够在消息级别,经过 setReplicationClusters 控制消息复制到哪些集群。
在上图中,不管 Producer P一、P2 和 P3 在何时分别将消息发布给 Cluster A、Cluster B 和 Cluster C 中的 topic T1,这些消息均会马上复制到整个集群。一旦完成复制,Consumer C1 和 C2 便可从本身所在的集群消费这些消息。
因为 Pulsar 的存储设计基于分片,Pulsar 把主题分区划分为更小的块,称其为分片。每一个分片都做为 Apache BookKeeper ledger 来存储,这样构成分区的分片集合分布在 Apache BookKeeper 集群中。这样设计方便咱们管理容量和水平扩展,而且知足高吞吐量的需求。
通过深刻调研后,咱们决定在腾讯 Angel PowerFL 联邦学习平台上使用 Apache Pulsar。
联邦学习的各个业务(Angel PowerFL 称之为 Party,每一个 Party 有不一样的 ID,如 10000/20000),可能分布在同个公司的不一样部门(无网络隔离),也可能分布在不一样公司(跨公网),各个 Party 之间经过 Pulsar 跨地域复制功能进行同步复制,整体设计方案以下:
联邦学习的每一个训练任务,经过消息的 producer 和 consumer 链接所在 Party 的 Pulsar 集群,集群名以 fl-pulsar-[partyID] 进行区分,训练任务产生须要传输的中间数据后,生产者将这些数据发送给本地 Pulsar 集群。
Pulsar 集群收到数据后,经过 Pulsar proxy 创建的同步复制网络通道,将数据发送给使用方 Party。而使用方 Party 的消费者,会一直监听该训练任务对应的 topic,当有数据到达后,直接消费数据进行下一步的计算。
在 Angel PowerFL 执行训练任务时,driver 和每一个 partition 会建立一个 channel 类型变量,该变量和 Pulsar 当中具体的 topic 一一对应,须要交换的数据都会通过生产者发送到这个 topic。
Angel PowerFL 支持多方联邦,所以会有 2+ 个 Pulsar 集群须要同步复制数据。每一个联邦学习任务经过各自的 parties 任务参数指定了参与方,生产者在发送消息时调用 setReplicationClusters
接口,确保数据只在参与 Party 之间传输。
在 Angel PowerFL 的通讯模块中,咱们充分利用了 Pulsar 的 geo-replication、topic 限流、Token Authentication 等功能。下面我来详细介绍如何在 Angel PowerFL 联邦学习平台中使用 Pulsar。
在 Angel PowerFL 联邦学习平台上,部署一套完整的 Pulsar 依赖两个 ZooKeeper 集群,分别是 Local ZooKeeper 和 Global ZooKeeper。Local ZooKeeper 和 Kafka 中的 ZooKeeper 做用相似,用来存储元数据。而 Global ZooKeeper 则在 Pulsar 多个集群间中共享配置信息。
在 Angel PowerFL 场景中,每一个 Party 加入前,都要先部署一个 Global ZooKeeper 的子节点,或者共用一套跨公司或跨地域的公共 ZooKeeper,这样不只会增长部署的难度,也会增长被攻击的风险,不利于新 Party 加入。
Global ZooKeeper 中存储的元数据,主要是集群名/服务地址/namespace 权限等信息。Pulsar 支持建立和加入新集群。咱们经过如下两个步骤注册联邦 Pulsar 集群的信息到 local ZooKeeper,就去除了对 Global ZooKeeper 的依赖:
步骤 1: 注册新加入 Party 的 Pulsar 集群
# OTHER_CLUSTER_NAME 为待注册 Party 的 Pulsar 集群名 # OTHER_CLUSTER_BROKER_URL为 Pulsar 集群对应的 broker 地址 ./bin/pulsar-admin clusters create ${OTHER_CLUSTER_NAME} --url http://${OTHER_CLUSTER_HTTP_URL} --broker-url pulsar://${OTHER_CLUSTER_BROKER_URL}
步骤 2: 授予训练用到的 namespace 访问集群权限
./bin/pulsar-admin namespaces set-clusters fl-tenant/${namespace} -clusters ${LOCAL_CLUSTR_NAME},${OTHER_CLUSTER_NAME}
对于新加入的 Party,只用提供与其对应的 Pulsar 的集群名/服务地址便可完成注册,geo-replication 就能够经过注册信息同步复制数据。
Pulsar 做为 Angel PowerFL 的通讯模块,没有加入用户级别的权限控制。为了进一步保证 client 生产和消费数据的安全,咱们参考 Pulsar Client authentication using tokens based on JSON Web Tokens 增长了 token 认证,Angel PowerFL 的训练任务除了配置当前 Party 使用的服务地址外,还须要配置 admin token。
https://pulsar.apache.org/doc...
因为 Angel PowerFL 整套系统部署在 Kubernetes 上,咱们经过容器准备 Pulsar 集群须要的 Public/Private keys 等文件,而后注册到 K8S secret 中。
# 生成 fl-private.key 和 fl-public.key docker run --rm -v "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 /pulsar/bin/pulsar tokens create-key-pair --output-private-key /tmp/fl-private.key --output-public-key /tmp/fl-public.key # 生成 admin-token.txt token 文件 echo -n `docker run --rm -v "$(pwd)":/tmp apachepulsar/pulsar-all:2.5.2 /pulsar/bin/pulsar tokens create --private-key file:///tmp/fl-private.key --subject admin` # 将认证相关的文件注册到 K8S kubectl create secret generic token-symmetric-key --from-file=TOKEN=admin-token.txt --from-file=PUBLICKEY=fl-public.key -n ${PARTY_NAME}
Pulsar 集群开启了 geo-replication 功能后,没法经过命令直接删除用过的 topic,而 Angel PowerFL 训练任务每次使用的任务是一次性的,任务结束后这些 topic 就没用了,若是不及时删除会出现大量累积。
对于经过 geo-replication 开启复制的 topic,能够配置 brokerDeleteInactivetopicsEnabled
参数,开启 topic 自动回收。自动回收无用的 topic,需知足如下几个条件:
Angel PowerFL 部署的 Pulsar 集群,经过 brokerDeleteInactivetopicsEnabled 开启 topic 自动回收。在执行训练任务的过程当中,使用后对每一个 topic 按回收条件进行处理。同时,咱们增长了
brokerDeleteInactivetopicsFrequencySeconds 配置,将回收的频率设置为 3 小时。
Angel PowerFL 中的训练任务,在不一样的数据集/算法/执行阶段,生产数据的流量峰值也不一样。目前生产环境中单个任务最大的数据量超过 200G/小时。训练过程当中,若是 Pulsar 链接中断或者生产和消费过程出现异常,须要从新开始整个训练任务。
为了规避 Pulsar 集群被单个训练任务冲垮的风险,咱们使用了 Pulsar 的限流功能。Pulsar 支持 message-rate 和 byte-rate 两种生产限流策略,前者限制每秒生产消息的数量,后者限制每秒生产消息的大小。Angel PowerFL 将数据切分红多个 4M 的消息,经过 message-rate 限制生产消息的数量。在 Angel PowerFL 中,咱们将 namespace 的消息限制为 30 条(小于<30*4=120M/s):
./bin/pulsar-admin namespaces set-publish-rate fl-tenant/${namespace} -m 30
刚开始测试 message-rate 的限流功能时,出现了限不住的状况(限流设置失效)。腾讯数据平台部 MQ 团队负责 Pulsar 的同事帮忙一块儿排查,发现设置 topicPublisherThrottlingTickTimeMillis 参数后,限制不能生效。
所以咱们想办法在 broker 端启用了精确的 topic 发布频率限制,优化了限流功能并贡献回社区,详情见 PR-7078: introduce precise topic publish rate limiting。
https://github.com/apache/pul...
Pulsar 根据 broker 集群负载情况,能够将 topic 动态分配到 broker上。若是拥有该 topic 的broker 宕机,或者拥有该 topic 的 broker 负载过大,则该 topic 会当即从新分配给另外一个 broker ;而从新分配的过程就是 topic 的 unloading,该操做意味着关闭 topic,释放全部者(owner)。
理论上,topic unloading 由负载均衡调整,客户端将经历极小的延迟抖动,一般耗时 10ms 左右。但 Angel PowerFL 初期在执行训练任务时,日志爆出大量由于 unloading topic 致使的链接异常。日志显示 topic unloading 在不断的重试,但都不成功:
[sub] Could not get connection to broker: topic is temporarily unavailable -- Will try again in 0.1 s
先来看 broker/namespace/bundle/topic 这四者的关系。Bundle 是 Pulsar namespace 的一个分片机制,namespace 被分片为 bundle 列表,每一个 bundle 包含 namespace 的整个哈希范围的一部分。Topic 不直接分配给 broker,而是经过计算 topic 的哈希码将 topic 分配给特定的 bundle;每一个 bundle 互相独立,再被分配到不一样的 broker 上。
Angel PowerFL 早期的任务 topic 没有复用,一个 LR 算法训练任务建立了 2000 多个 topic,每一个 topic 生产的数据负载也不一样,咱们判断上述断连问题是因为短期内(最小任务十分钟内能结束,同时会有多个任务在运行)大量建立和使用 topic,致使负载不均衡,topic unloading 频繁发生。为了下降 topic unloading 的频率,咱们调整了 Pulsar Bundle 的相关参数:
# 增长 broker 可最大分配 topic 数量 loadBalancerBrokerMaxTopics=500000 # 启用自动拆分namespace bundle loadBalancerAutoBundleSplitEnabled=true # 增长触发拆分 bundle 的 topic 数量 loadBalancerNamespaceBundleMaxTopics=10000 # 增长触发拆分 bundle 的消息数 loadBalancerNamespaceBundleMaxMsgRate=10000
同时,在建立 namespace 时,把 bundle 数量默认设置为 64。
./bin/pulsar-admin namespaces create fl-tenant/${namespace} --bundles 64
通过以上调整,Angel PowerFL 在任务执行期间没有再出现过因为 topic unloading 致使的断连。
Angel PowerFL 的全部服务均经过 Helm 部署在 Kubernetes 上。Pulsar 做为其中的一个 chart,能够很好的利用 K8S 的资源隔离、快速扩缩容等特性。在 Angel PowerFL 使用 Helm 部署 Pulsar 的实践中,咱们总结了如下经验:
🎙️ 使用 Local Persistent Volume 做为存储
Pulsar 是 IO 敏感的服务,尤为 bookie 组件,在生产环境中建议使用 SSD 或独立的磁盘。Angel PowerFL 在跑一些大数据集任务时,Pulsar 常常出现 “No Bookies Available” 的异常。这期间磁盘的 IO 使用率很高。
咱们经过 Local Persistent Volume 将 bookie 和 ZooKeeper 等其它组件挂载到单独的磁盘,减缓了磁盘 IO 竞争。咱们也测试过将 Pulsar 的 PV 存储换成 Ceph 和 NFS,性能都没有直接使用 Local Persistent Volume 好。
🎙️ 使用 NodeSelector
Geo-replication 同步复制数据期间,broker 须要访问对方的 Pulsar proxy 容器。Angel PowerFL 将网关机单独打了标签,经过 NodeSelector 将 broker 安装在可访问外网的网关机上。
🎙️ 配置 useHostNameAsBookieID
Bookie 是有状态的组件,为了 bookie pod 重建后服务正常,须要配置 useHostNameAsBookieID,确保向 ZooKeeper 注册的 ID 是 pod 的 hostname。
Angel PowerFL 目前使用 Pulsar 快一年了,稳定运行时间最长的集群已经超过半年,将来对Pulsar 的使用计划主要有两个。
👍 升级 Pulsar 到 2.6.x 版本
咱们目前使用的是 Pulsar 2.5.2 版本,因为最近会使用 Pulsar Key_Shared 功能作 Angel-PS 的容灾恢复。2.6.0 版本恰好有加强 Key_Shared 订阅模式,因此咱们预计将来一个月升级到 Pulsar 2.6.x。
https://github.com/apache/pul...
👍 Pulsar on K8S 支持多磁盘挂载
Angel PowerFL 全部服务都运行在 Kubernetes 上(除了任务使用的 YARN 计算资源),Pulsar 做为其中的一个 chart 和其它服务一块儿部署,使用 Local Persistent Volume 做为存储。但目前 bookie 只支持挂载一块磁盘(目录),对于多磁盘的机器没有更充分的利用,咱们计划增长该特性。
咱们介绍了在人工智能应用场景下,使用 Pulsar 做为 Angel PowerFL 通讯模块的相关实践。在方案实现过程中,咱们充分使用了 Pulsar 诸多内置特性,并根据自身需求作了相关优化,如 geo-replication 去掉 Global ZooKeeper 依赖,为 client 增长 token 认证,开启多集群 topic 自动回收,优化 topic 限流功能和 topic unloading 配置等。
Pulsar 做为下一代云原生分布式消息和流平台,有众多吸引人的功能,在直播与短视频、零售与电子商务、媒体、金融等行业有普遍应用,期待 Pulsar 在不一样的应用场景下不断有新的案例落地。
特别感谢腾讯数据平台部 MQ 团队,在 Angel PowerFL 平台使用 Pulsar 过程当中给与的技术指导。该团队在 Apache Pulsar 和 TubeMQ 上有多年的技术积累,积极为 Pulsar 社区作出了巨大贡献。Pulsar 社区十分活跃,正处于快速成长之中。咱们会持续关注并和 Apache Pulsar 社区深刻合做,把优化的功能奉献给 Pulsar 社区,和社区其余用户一块儿进一步完善、优化 Pulsar 的特性和功能,共同建设一个更强大完善的 Pulsar 社区。
张超,腾讯数据平台部高级工程师,负责 Angel PowerFL 联邦通讯/PowerFL on K8S 等工做。他和腾讯数据平台部 MQ 团队一块儿将 Apache Pulsar 引入 PowerFL 联邦学习平台,开启了 Pulsar 在机器学习领域的应用。