- 主从模式环境能够保障消息的即时性与可靠性
- 投递一条消息后,关闭主节点
- 从节点继续能够提供消费者数据进行消费,可是不能接收消息
- 主节点从新上线后会自动进行消费进度offset的同步
准备两台机器,一主一从:java
机器IP | hostname | 角色 |
---|---|---|
192.168.243.169 | rocketmq01 | master |
192.168.243.170 | rocketmq02 | slave |
我这里事先在两台机器上安装好了RocketMQ,关于RocketMQ的安装能够参考以下文章:apache
接下来,咱们开始搭建RocketMQ主从集群。首先,配置两台机器的hosts
:vim
$ vim /etc/hosts 192.168.243.169 rocketmq-nameserver1 rocketmq-master1 192.168.243.170 rocketmq-nameserver2 rocketmq-slave1
修改master节点的配置文件:bash
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a.properties [root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a.properties #节点所属的集群名称 brokerClusterName=rocketmq-cluster #broker 名字,注意此处不一样的配置文件填写的不同 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=0 #nameServer 地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动建立服务器不存在的 topic,默认建立的队列数 defaultTopicQueueNums=4 #是否容许 Broker 自动建立 Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否容许 Broker 自动建立订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4 点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog 每一个文件的大小默认 1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue 每一个文件默认存 30W 条,根据业务状况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/rocketmq-4.7.1/store #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq-4.7.1/store/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq-4.7.1/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制 Master #- SYNC_MASTER 同步双写 Master #- SLAVE brokerRole=ASYNC_MASTER #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
修改slave节点的配置文件:服务器
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a-s.properties [root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a-s.properties #节点所属的集群名称 brokerClusterName=rocketmq-cluster #broker 名字,注意此处不一样的配置文件填写的不同 brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=1 #nameServer 地址,分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动建立服务器不存在的 topic,默认建立的队列数 defaultTopicQueueNums=4 #是否容许 Broker 自动建立 Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否容许 Broker 自动建立订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #删除文件时间点,默认凌晨 4 点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 #commitLog 每一个文件的大小默认 1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue 每一个文件默认存 30W 条,根据业务状况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 diskMaxUsedSpaceRatio=88 #存储路径 storePathRootDir=/usr/local/rocketmq-4.7.1/store #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq-4.7.1/store/index #checkpoint 文件存储路径 storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq-4.7.1/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 异步复制 Master #- SYNC_MASTER 同步双写 Master #- SLAVE brokerRole=SLAVE #刷盘方式 #- ASYNC_FLUSH 异步刷盘 #- SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #发消息线程池数量 #sendMessageThreadPoolNums=128 #拉消息线程池数量 #pullMessageThreadPoolNums=128
而后将这两个配置文件拷贝到Slave节点上:app
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties [root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a-s.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties
完成配置后,就能够启动RocketMQ了,在master节点上执行以下命令:dom
[root@rocketmq01 ~]# nohup sh mqnamesrv & [root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
在slave节点上执行以下命令:异步
[root@rocketmq02 ~]# nohup sh mqnamesrv & [root@rocketmq02 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
启动完成后,分别在两个节点上检查下服务的进程和端口是否正常:async
[root@rocketmq01 ~]# jps 1942 Jps 1739 NamesrvStartup 1775 BrokerStartup [root@rocketmq01 ~]# netstat -lntp |grep java tcp6 0 0 :::10909 :::* LISTEN 1775/java tcp6 0 0 :::10911 :::* LISTEN 1775/java tcp6 0 0 :::10912 :::* LISTEN 1775/java tcp6 0 0 :::9876 :::* LISTEN 1739/java [root@rocketmq01 ~]#
修改RocketMQ的管控台配置,并启动:tcp
[root@rocketmq01 ~]# cd /usr/local/src/rocketmq-externals/rocketmq-console/ [root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# vim src/main/resources/application.properties # 增长nameserver的地址 rocketmq.config.namesrvAddr=192.168.243.169:9876;192.168.243.170:9876 [root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# java -jar target/rocketmq-console-ng-2.0.0.jar
此时在管控台中能够看到有两个节点了:
在Dashboard中也能够看到有两个Broker:
主从集群模式下的高可用机制故障演练
建立一个普通的Maven项目,pom
文件添加rocketmq-client
依赖以下:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
生产者代码示例:
package com.zj.rocketmq.learn.quickstart; import com.zj.rocketmq.learn.constant.Constants; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.util.UUID; /** * rocketmq - 生产者 * * @author 01 * @date 2020-11-30 **/ public class Producer { public static void main(String[] args) throws Exception { // 在rocketmq中生产者必须在一个生产者组内 String producerGroup = "quickstart_producer_group"; DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 设置nameserver的地址 producer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESSES); // 启动生产者 producer.start(); // 消息投递的目标主题 String topic = "quickstart_topic"; // 给消息打一个标签,标签的主要做用是用来过滤的 String tag = "quickstart_tag"; // 给消息设置一个key,是消息的惟一标识 String key = UUID.randomUUID().toString(); // 消息体,即具体的消息内容 String body = "this is quickstart message!"; Message message = new Message(topic, tag, key, body.getBytes()); // 发送消息 SendResult sendResult = producer.send(message); System.out.println("消息发送结果:" + sendResult); producer.shutdown(); } }
常量类代码:
public class Constants { public static final String NAME_SERVER_ADDRESSES = "192.168.243.169:9876;192.168.243.170:9876"; }
运行生产者代码发送一条消息,控制台输出以下:
消息发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A8010B36502437C6DC998FAEE00000, offsetMsgId=C0A8F3A900002A9F0000000000033234, messageQueue=MessageQueue [topic=quickstart_topic, brokerName=broker-a, queueId=1], queueOffset=0]
此时将主节点给停掉,模拟宕机:
[root@rocketmq01 ~]# mqshutdown broker
而后编写消费者端,代码以下:
package com.zj.rocketmq.learn.quickstart; import com.zj.rocketmq.learn.constant.Constants; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; /** * rocketmq - 消费者 * * @author 01 * @date 2020-11-30 **/ public class Consumer { public static void main(String[] args) throws Exception { // 定义消费者组 String consumerGroup = "quickstart_consumer_group"; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); // 设置nameserver的地址 consumer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESS); // 设置从哪一个位置开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 从哪一个主题消费数据 String topic = "quickstart_topic"; // 用于匹配消息标签的表达式 String subExpression = "*"; // 订阅主题 consumer.subscribe(topic, subExpression); // 注册消息监听器,在监听器中实现消息的处理逻辑 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { System.out.println("------------- 接收到消息,开始进行业务处理 -------------"); for (MessageExt msg : msgs) { try { System.out.printf("topic: %s, tags: %s, keys: %s, body: %s%n", msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody())); if ("0".equals(msg.getKeys())) { throw new RuntimeException("模拟业务处理发生异常"); } } catch (Exception e) { e.printStackTrace(); int reconsumeTimes = msg.getReconsumeTimes(); System.err.println("reconsumeTimes: " + reconsumeTimes); if (reconsumeTimes == 3) { // TODO 重试次数达到阈值,放弃重试,记录日志后续作补偿... System.out.println("重试次数达到阈值,放弃重试!"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // 消息处理失败时返回,因为Broker的重试机制,会从新消费该消息 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // 消息处理成功时返回 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动消费者 consumer.start(); System.out.println("consumer started..."); } }
运行消费者,正常状况下,该消费者依旧可以消费到数据:
从新启动master节点,让其从新加入集群:
[root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
在此过程注意查看消费者的控制台,正常状况下,master从新加入集群,消费者也不会重复消费,由于master会和slave同步offset进度。