本文主要介绍如何将 RocketMQ 集群从原先的主从同步升级到主从切换。java
首先先介绍与 DLedger 多副本即 RocketMQ 主从切换相关的核心配置属性,而后尝试搭建一个主从同步集群,再从原先的 RocketMQ 集群平滑升级到 DLedger 集群的示例,并简单测试一下主从切换功能。服务器
其主要的配置参数以下所示:架构
首先先搭建一个传统意义上的主从同步架构,往集群中灌必定量的数据,而后升级到 DLedger 集群。app
在 Linux 服务器上搭建一个 rocketmq 主从同步集群我想不是一件很难的事情,故本文就不会详细介绍按照过程,只贴出相关配置。源码分析
实验环境的部署结构采起 一主一次,其部署图以下: 测试
下面我就重点贴一下 broker 的配置文件。 220 上的 broker 配置文件以下:设计
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1=192.168.0.220 brokerIP2=192.168.0.220 namesrvAddr=192.168.0.221:9876;192.168.0.220:9876 storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog autoCreateTopicEnable=false autoCreateSubscriptionGroup=false
221 上 broker 的配置文件以下:3d
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 1 deleteWhen = 04 fileReservedTime = 48 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH brokerIP1=192.168.0.221 brokerIP2=192.168.0.221 namesrvAddr=192.168.0.221:9876;192.168.0.220:9876 storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog autoCreateTopicEnable=false autoCreateSubscriptionGroup=false
相关的启动命令以下:日志
nohup bin/mqnamesrv /dev/null 2>&1 & nohup bin/mqbroker -c conf/broker.conf /dev/null 2>&1 &
安装后的集群信息如图所示: code
DLedger 集群至少须要3台机器,故搭建 DLedger 还须要再引入一台机器,其部署结构图以下:
从主从同步集群升级到 DLedger 集群,用户最关心的仍是升级后的集群是否可以兼容原先的数据,即原先存储在消息可否能被消息消费者消费端,甚至于可否查询到。 为了方便后续验证,首先我使用下述程序向 mq 集群中添加了一篇方便查询的消息(设置消息的key)。
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test"); producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876"); producer.start(); for(int i =600000; i < 600100; i ++) { try { Message msg = new Message("topic_dw_test_by_order_01",null , "m" + i,("Hello RocketMQ" + i ).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); //System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); System.out.println("end"); } }
消息的查询结果示例以下:
Step1:将 192.168.0.220 的 rocketmq 拷贝到 192.168.0.222,可使用以下命令进行操做。在 192.168.0.220 上敲以下命令:
scp -r rocketmq-all-4.5.2-bin-release/ root@192.168.0.222:/opt/application/rocketmq-all-4.5.2-bin-release
> 舒适提示:示例中因为版本是同样,实际过程当中,版本须要升级,故需先下载最新的版本,而后将老集群中的 store 目录完整的拷贝到新集群的 store 目录。
Step2:依次在三台服务器的 broker.conf 配置文件中添加与 dledger 相关的配置属性。
192.168.0.220 broker配置文件以下:
brokerClusterName = DefaultCluster brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1=192.168.0.220 brokerIP2=192.168.0.220 namesrvAddr=192.168.0.221:9876;192.168.0.220:9876 storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog autoCreateTopicEnable=false autoCreateSubscriptionGroup=false # 与 dledger 相关的属性 enableDLegerCommitLog=true storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store dLegerGroup=broker-a dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911 dLegerSelfId=n0
192.168.0.221 broker配置文件以下:
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 1 deleteWhen = 04 fileReservedTime = 48 brokerRole = SLAVE flushDiskType = ASYNC_FLUSH brokerIP1=192.168.0.221 brokerIP2=192.168.0.221 namesrvAddr=192.168.0.221:9876;192.168.0.220:9876 storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog autoCreateTopicEnable=false autoCreateSubscriptionGroup=false # 与dledger 相关的配置属性 enableDLegerCommitLog=true storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store dLegerGroup=broker-a dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911 dLegerSelfId=n1
192.168.0.222 broker配置文件以下:
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH brokerIP1=192.168.0.222 brokerIP2=192.168.0.222 namesrvAddr=192.168.0.221:9876;192.168.0.220:9876 storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store storePathCommitLog=/opt/application/rocketmq-all-4.5.2-bin-release/store/commitlog autoCreateTopicEnable=false autoCreateSubscriptionGroup=false # 与 dledger 相关的配置 enableDLegerCommitLog=true storePathRootDir=/opt/application/rocketmq-all-4.5.2-bin-release/store/dledger_store dLegerGroup=broker-a dLegerPeers=n0-192.168.0.220:40911;n1-192.168.0.221:40911;n2-192.168.0.222:40911 dLegerSelfId=n2
> 舒适提示:legerSelfId 分别为 n0、n一、n2。在真实的生产环境中,broker配置文件中的 storePathRootDir、storePathCommitLog 尽可能使用单独的根目录,这样判断其磁盘使用率时才不会相互影响。
Step3:将 store/config 下的 全部文件拷贝到 dledger store 的 congfig 目录下。
cd /opt/application/rocketmq-all-4.5.2-bin-release/store/ cp config/* dledger_store/config/
> 舒适提示:该步骤按照各自按照时配置的目录进行复制便可。
Step4:依次启动三台 broker。
nohup bin/mqbroker -c conf/broker.conf /dev/null 2>&1 &
若是启动成功,则在 rocketmq-console 中看到的集群信息以下:
首先咱们先验证升级以前的消息是否能查询到,那咱们仍是查找key 为 m600000 的消息,查找结果如图所示:
而后咱们来测试一下消息发送。测试代码以下:
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_dw_test"); producer.setNamesrvAddr("192.168.0.220:9876;192.168.0.221:9876"); producer.start(); for(int i =600200; i < 600300; i ++) { try { Message msg = new Message("topic_dw_test_by_order_01",null , "m" + i,("Hello RocketMQ" + i ).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); System.out.println("end"); } }
执行结果以下:
再去控制台查询一下消息,其结果也代表新的消息也能查询到。
最后咱们再来验证一下主节点宕机,消息发送是否会受影响。
在消息发送的过程当中,去关闭主节点,其截图以下:
再来看一下集群的状态:
等待该复制组从新完成主服务器选举后,便可继续处理消息发送。
> 舒适提示:因为本示例是一主一从,故在选举期间,消息不可用,但在真实的生产环境上,其部署架构是多主主从,即一个复制组在 leader 选举期间,其余复制组能够接替该复制组完成消息的发送,实现消息服务的高可用。
与 DLedger 相关的日志,默认存储在 broker_default.log 文件中。
本文就介绍到这里了,若是以为文章对您有帮助的话,还但愿帮忙点个赞,谢谢。
推荐阅读:源码分析 RocketMQ DLedger 多副本即主从切换系列文章:
三、源码分析 RocketMQ DLedger 多副本存储实现
四、源码分析 RocketMQ DLedger(多副本) 之日志追加流程
五、源码分析 RocketMQ DLedger(多副本) 之日志复制-上篇
六、源码分析 RocketMQ DLedger(多副本) 之日志复制-下篇
七、基于 raft 协议的 RocketMQ DLedger 多副本日志复制设计原理
八、RocketMQ 整合 DLedger(多副本)即主从切换实现平滑升级的设计技巧
九、源码分析 RocketMQ DLedger 多副本即主从切换实现原理
> 做者简介:《RocketMQ技术内幕》做者,RocketMQ 社区布道师,维护公众号:中间件兴趣圈,可扫描以下二维码与做者进行互动。