导语:RocketMQ 用户能够无缝迁移到 Apache Pulsar 了。自此,Apache Pulsar 补齐了兼容主流消息队列协议的能力。java
咱们很高兴地宣布腾讯云中间件开源 RoP!RoP 将 RocketMQ 协议处理插件引入 Pulsar broker,这样 Pulsar 就能支持原生 RocketMQ 协议了。git
做者 | 冉小龙 腾讯高级工程师,Apache Pulsar Committer,Apache BookKeeper Contributorgithub
什么是RoP
什么是高可用性
与 KoP、MoP 和 AoP 类似,RoP 是一种可插拔的协议处理插件。apache
将 RoP 协议处理插件添加到现有 Pulsar 集群后,用户无需修改代码,便能将现有的 RocketMQ 应用程序和服务迁移到 Pulsar,同时还能使用 Pulsar 的强大功能,例如:微信
•计算与存储分离架构
•多租户app
•跨地域复制框架
•分层分片分布式
•轻量化计算框架 -- Pulsar Functions函数
•...
为何开发RoP
Apache Pulsar 是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体。自 2016 年开源以来,Pulsar 已被普遍采用,并于 2018 年被指定为 Apache 顶级项目。
RocketMQ 是一款强大的开源分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。
Pulsar 和 RocketMQ 拥有普遍的用户群体和强劲的开发支持,全球许多头部公司都在使用这两种消息服务。同时,咱们也收到了用户的需求,但愿能在 Pulsar 与 RocketMQ 之间传输数据,并充分利用这两种消息系统的优点。
Apache Pulsar 经过对 Consumer 层的抽象,提供了队列和流两种消费模型的统一抽象。在 Client 与 Broker 的交互中,Pulsar 基于 Protobuf 的二进制协议,提供更高的性能和更低的延迟。除此以外,经过 Protobuf 协议,Pulsar 能够更容易地支持并实现多语言的客户端,好比:Java、CPP、Python 和 Go 语言等客户端。
可是,对于使用其余消息传输协议编写的应用程序(例如,RocketMQ),因为使用的消息处理协议和 Pulsar 不一样,若是 Pulsar 想要兼容 RocketMQ 协议,为了将 RocketMQ 的协议适配到 Pulsar 的消息协议层中,用户须要重写整个协议层,这给用户的迁移和切换带来了很大的成本。
为了解决这个问题,最直观的处理方式是使用相似 Pulsar Connector 的形式,将用户在 RocketMQ 中的现存数据经过 RocketMQ Wrapper 的方式导入到 Pulsar 集群,可是这须要业务端更改本身的业务代码逻辑,同时须要确保两边的数据可以保证一致,这给使用 RocketMQ 的用户带来了很大的技术挑战。因此,可否给用户提供一个开箱即用的迁移策略和方案而且用户无需作任何代码修改呢?这即是 RoP 诞生的最初目的。
Apache Pulsar 在 PIP-41中介绍了一种全新的接入方式。经过在 Broker 端暴露 Protocol Handler 插件,将 Netty 的 channel 和 Pulsar 的 Broker Service) 对象暴露给用户。这容许用户直接操做和调用 Pulsar 中比较低阶的 API(例如:PersistentTopic 和 ManagerLedger)。基于这个协议,用户无需更改代码,只需将服务请求转发到 RoP 中,RoP 利用 Protocol Handler 的插件将用户的请求转发到 Pulsar 中便可。
怎么开发RoP
RoP架构
经过对比 Pulsar 和 RocketMQ 之间的协议能够发现,两者在消息处理的思路上有很多类似之处,好比这两种协议都包含以下操做:
Topic Lookup: 全部 Clients 与任意 Broker 创建链接以前,会先去查找当前 Topic 的 Owner Broker。获取到对应的 metadata 以后,Clients 会与 Owner Broker 之间创建 TCP 链接进行数据的交互。
- Produce: Clients 与 Topic 所在的全部 Owner Broker 之间进行通讯并将消息 append 到对应的分布式日志中。
- Consume: Clients 与 Topic 所在的全部 Owner Broker 之间进行通讯并从分布式日志中读取指定的消息。
- Offset: Producer 生产到 topic 中的消息会分配一个惟一的 offset,Pulsar 中使用 MessageID 来标识 offset。消费者能够经过 offset 去日志中获取指定位置的消息。
Apache Pulsar 的存储层使用了 Apache BookKeeper,Pulsar 至关于 BookKeeper 的 Client,经过调用 ManagerLedger 对象可以很容易的达到为分布式日志操做的目的。基于此,RoP 能够很好的将 RocketMQ 中对 commitLog 和 queueLog 的操做映射到 BookKeeper 中来。
RoP概念
Offset 和 MessageID
在 RocketMQ 中,使用 offset 来标识消息的位置,当消息被生产到指定的 Topic 以后,会为每个消息分配一个惟一的 offset;在 Pulsar 中,使用 MessageID 来惟一标识每条消息,每个 MessageID 由三部分组成,ledgerID、entryID 和 partitionID。咱们经过合理的划分将 messageID 和 offset 进行映射,来惟一标识 Topic 中的每一条消息。
Message
对于一条消息,RocketMQ 和 Pulsar 都包含消息的 headers 和 payload 等字段,经过对消息协议的解析,咱们能够轻松的将 RocketMQ message 转换为 Pulsar 的 message 格式。为了更好的兼容 Tag 消息的功能,在消息协议的处理方面增长了 8 字节的特殊字段,用来区分该消息是否属于 Tag 消息。
Topic Lookup
在 Pulsar 中,client 与 broker 创建链接以前,会根据当前传入的 Topic 执行 Lookup 操做,在 Broker 集群中寻找当前 Topic 所在的 Owner Broker,而后将该 Owner Broker 的地址返回并与 client 创建 TCP 链接,再进行数据交互。在 RocketMQ 中,client 与 broker 创建链接以前,会先处理 GET_ROUTEINTO_BY_TOPIC 命令,获取 topic 所在的路由信息后,创建对应的 TCP 链接,再进行数据交互。
如何使用RoP
目前,RoP 发布了 0.1.0 版本,你能够用过如下任一方式参与该项目:
想上手试试?
可在如下网址下载 RoP 和查阅用户指南。不管是快速启动 standalone RoP 或在现有 Pulsar 集群中部署 RoP,均可轻松实现。
另外,为了方便快速使用并验证 RoP,咱们提供了 RocketMQ 的常见使用场景和用例,你能够直接使用这些代码示例验证服务:https://github.com/streamnative/rop/tree/master/examples/src/main/java/org/streamnative/rocketmq/example。
想解决问题?
若有任何问题,能够在 RoP GitHub repo 中 建立 issue 或加入 RoP 微信群进行讨论。不管哪一种方式,RoP 资深专家都随时在线:https://github.com/streamnative/rop/issues/new。
想参与贡献?
RoP 源码开放并托管在 GitHub 上:https://github.com/streamnative/rop。 如需改进功能或修复 bug,欢迎提交 PR。
特别鸣谢
在此特别鸣谢腾讯云中间件团队张勇华、冉小龙、韩明泽、夏子承等同窗的支持,以及StreamNative 在架构设计以及方案的良好建议,共同推动了 RoP 项目的顺利落地。从此双方将继续携手并进、砥砺前行,为消息服务贡献更多力量!