有段时间没写这些技术文章了, 今天抽空写一点,否则本身都快忘记了 这篇文章记录了rocketmq 集群方式搭建的过程, 也是本身半天的成果记录吧! 感兴趣的朋友点个赞在走呗!php
好了,废话很少,下面开搞。html
本文章参考http://www.javashuo.com/article/p-kbmdwwtl-mh.html 这个博客文章编写java
第一步:关闭要搭建的全部机器的防火墙 第二步:每台机器执行下以下步骤linux
[root@ma01 ~]# vim /etc/sysconfig/selinux ...... SELINUX=disabled [root@ma01~]# setenforce 0 [root@ma01~]# getenforce
第三步:全部机器装好jdk, maven , zip , unzip , ssh 免密登陆apache
配置crt链接: https://blog.csdn.net/cmqwan/article/details/61932792 安装maven参考老哥博客: https://www.cnblogs.com/clicli/p/5866390.html 安装zip,unzip参考: http://www.rpmfind.net/linux/rpm2html/search.php?query=zip&submit=Search+...&system=&arch= 安装ssh参考: https://blog.csdn.net/m0_37590135/article/details/74275859 jdk本身百度哈, 不少参考博客的!
第四步: 以下命令是ssh机器之间copy用的命令vim
scp -r /home/administrator/test/ root@192.168.1.100:/root/
第五步: 下载完成后, 解压服务器
unzip rocketmq-all-4.4.0-bin-release.zip
第六步:进入解压后的文件夹rocketmq-bin4.4.0 , 在文件夹里面新建logs , data/store, data2/store 目录 第七步:安装顺序修改bin下面的几个启动文件, 因默认配置内存空间太大,本地启动会报错ssh
1. vim runbroker.sh 对应地方更改成 -server -Xms512m -Xmx512m -Xmn256m 2. vim runserver.sh (一样的道理) -server -Xms512m -Xmx512m -Xmn126m -XX:PermSize=128m -XX:MaxPermSize=320m 3. vim tools.sh -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m
第八步:到rocketmq-bin4.4.0/conf/2m-2s-async 下 修改这四个文件 async
第九步: 130主机器修改以下配置文件, broker-a.properties broker-b-s.properties两个文件 内容分别以下 broker-a.propertiesmaven
brokerClusterName=RocketMQCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH ##Broker 对外服务的监听端口 listenPort=10911 #nameserver地址,分号分割 namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876 #在发送消息时,自动建立服务器不存在的topic,默认建立的队列数 defaultTopicQueueNums=4 #是否容许 Broker 自动建立Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否容许 Broker 自动建立订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true brokerIP1=192.168.175.130 storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog # 消费队列存储路径存储路径 storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue #消息索引存储路径 storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index #checkpoint 文件存储路径 storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint #abort 文件存储路径 abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 # commitLog每一个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每一个文件默认存300W条,根据业务状况调整 mapedFileSizeConsumeQueue=3000000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 #diskMaxUsedSpaceRatio=88
broker-b-s.properties
brokerClusterName=RocketMQCluster brokerName=broker-b brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH listenPort=10921 #nameserver地址,分号分割 namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876 brokerIP1=192.168.175.130 #在发送消息时,自动建立服务器不存在的topic,默认建立的队列数 defaultTopicQueueNums=4 #是否容许 Broker 自动建立Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否容许 Broker 自动建立订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog2 # 消费队列存储路径存储路径 storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue2 #消息索引存储路径 storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index2 #checkpoint 文件存储路径 storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint2 #abort 文件存储路径 abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort2 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 # commitLog每一个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每一个文件默认存300W条,根据业务状况调整 mapedFileSizeConsumeQueue=3000000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 #diskMaxUsedSpaceRatio=88
第十步: 131, 132 机器只修改 broker-b.properties 和broker-a-s.properties 内容分别以下: broker-b.properties
# limitations under the License. brokerClusterName=RocketMQCluster brokerName=broker-b brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH listenPort=10911 #nameserver地址,分号分割 namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876 brokerIP1=192.168.175.131 #在发送消息时,自动建立服务器不存在的topic,默认建立的队列数 defaultTopicQueueNums=4 #是否容许 Broker 自动建立Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否容许 Broker 自动建立订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data/store storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data/store/commitlog # 消费队列存储路径存储路径 storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data/store/consumequeue #消息索引存储路径 storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data/store/index #checkpoint 文件存储路径 storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data/store/checkpoint #abort 文件存储路径 abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 # commitLog每一个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每一个文件默认存300W条,根据业务状况调整 mapedFileSizeConsumeQueue=3000000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 #diskMaxUsedSpaceRatio=88
broker-a-s.properties
# limitations under the License. brokerClusterName=RocketMQCluster brokerName=broker-a brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH listenPort=10921 namesrvAddr=192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876 brokerIP1=192.168.175.131 #在发送消息时,自动建立服务器不存在的topic,默认建立的队列数 defaultTopicQueueNums=4 #是否容许 Broker 自动建立Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true #是否容许 Broker 自动建立订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true storePathRootDir=/opt/local/data/install/rocketmq-bin4.4.0/data2/store storePathCommitLog=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/commitlog # 消费队列存储路径存储路径 storePathConsumerQueue=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/consumequeue #消息索引存储路径 storePathIndex=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/index #checkpoint 文件存储路径 storeCheckpoint=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/checkpoint #abort 文件存储路径 abortFile=/opt/local/data/install/rocketmq-bin4.4.0/data2/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留时间,默认 48 小时 fileReservedTime=120 # commitLog每一个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每一个文件默认存30W条,根据业务状况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间 #diskMaxUsedSpaceRatio=88
第十一步: 启动
三台都执行: nohup sh bin/mqnamesrv > ./logs/namesrvrun.log 2>&1 & 130机器执行: nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a.log 2>&1 & nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b-s.log 2>&1 & 131, 132 机器执行: nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-b.log 2>&1 & nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties -n"192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876" > ./logs/broker-a-s.log 2>&1 &
执行以后,jps结果,有两个brokerstartup就好了, 若是报错的化,看下本身建的logs文件夹日志
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-all</artifactId> <version>3.5.9</version> <type>pom</type> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> </dependencies>
import java.io.UnsupportedEncodingException; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; /** * 消息发送者 * @author LELE * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { // 声明并初始化一个producer // 须要一个producer group名字做为构造方法的参数,这里为producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); producer.setVipChannelEnabled(false); // 设置NameServer地址,此处应改成实际NameServer地址,多个地址之间用;分隔 // NameServer的地址必须有 // producer.setClientIP("xxxx"); // producer.setInstanceName("Producer"); producer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876"); // 调用start()方法启动一个producer实例 producer.start(); // 发送1条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值 try { for(int i=0;i<30000;i++) { // 封装消息 Message msg = new Message("TopicTest", // topic "TagA", // tag ("Hello RocketMQ--------"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); // 调用producer的send()方法发送消息 // 这里调用的是同步的方式,因此会有返回结果 SendResult sendResult = producer.send(msg); // 打印返回结果 System.out.println(sendResult); } } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } // 发送完消息以后,调用shutdown()方法关闭producer System.out.println("send success"); producer.shutdown(); } }
import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; /** * 消息接收者, 须要服务器启动mq服务 * @author LELE * */ public class Consumer { public static void main(String[] args) throws MQClientException { // 声明并初始化一个consumer // 须要一个consumer group名字做为构造方法的参数,这里为consumer1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1"); // consumer.setVipChannelEnabled(false); // 一样也要设置NameServer地址 consumer.setNamesrvAddr("192.168.175.130:9876;192.168.175.131:9876;192.168.175.132:9876"); // 这里设置的是一个consumer的消费策略 // CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息 // CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)所有消费一遍 // CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时之前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 设置consumer所订阅的Topic和Tag,*表明所有的Tag consumer.subscribe("TopicTest", "*"); // 设置一个Listener,主要进行消息的逻辑处理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest")) { // 执行TopicTest1的消费逻辑 System.out.println("TagA:" + new String(msg.getBody())); } System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size() + "----------------------------------------------------------------------------------"); // 返回消费状态 // CONSUME_SUCCESS 消费成功 // RECONSUME_LATER 消费失败,须要稍后从新消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 调用start()方法启动consumer consumer.start(); System.out.println("Consumer Started."); } }
启动开始尽情玩耍吧,少年, 记得点赞哦!