学习RocketMQ
的第一天,应该从官网的QuickStart
案例开始,这一节就来介绍一下如何部署单机RocketMQ
以及进行消息的收发。java
使用RocketMQ
须要有以下的硬件要求:git
了解版本说明以后,咱们就能够开始进行实战了。github
Ps: RocketMQ
版本为Release.4.7.0
。apache
打开RocketMQ
在Github
上的主页,获取仓库地址。而后在本地电脑上克隆本仓库。服务器
git clone https://github.com/apache/rocketmq.git
打开项目后,第一步要作的是启动nameserver
,这是RocketMQ
的路由中心,它提供轻量级服务发现和路由,主要的做用是存储路由信息,管理broker
节点,包括路由的查找、注册和删除。app
在RocketMQ
工程的namesrv
包中找到入口类org.apache.rocketmq.namesrv.NamesrvStartup
,运行这个类的main
函数,发现报错了。ide
Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation
这个报错是由于在为nameserver
设置相关配置时没有设置成功。函数
if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); }
ROCKETMQ_HOME
环境变量主要用于设置nameserver
的配置,只须要将包含conf
配置目录的这个路径赋值给环境变量ROCKETMQ_HOME
便可,以下图。学习
再次运行main
函数,就会发现启动成功。ui
The Name Server boot success. serializeType=JSON
接下来要启动的是broker
,它主要用于消息存储,接收和发送。
一样在RocketMQ
工程的broker
包中找到入口类org.apache.rocketmq.broker.BrokerStartup
,可是与启动nameserver
不一样的是,启动broker
时须要指定注册的nameserver
地址,在启动命令中输入-n 127.0.0.1:9876
便可。
运行main
函数,若是发现与以前同样的报错,从新设置该Application
环境变量便可,运行成功的输出以下。
The broker[daxiongMac.local, 192.168.31.126:10911] boot success. serializeType=JSON and name server is namesrvAddr=127.0.0.1:9876
至此,RocketMQ
的路由中心和接收发消息的服务器就启动成功了,咱们能够经过nameserver
和broker
来进行消息传递了。
找到example
包的org.apache.rocketmq.example.quickstart.Producer
类,这是一个最简单的消息生产者,咱们来看一下它的源码。
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 1000; i++) { try { Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
DefaultMQProducer
类来建立生产者实例,并指定消息组Group
和路由中心地址。Topic
和Tag
(用于区分消息的类别)。21:22:35.450 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0). RocketMQLog:WARN Please initialize the logger system properly. SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E4E0000, offsetMsgId=C0A81F7E00002A9F0000000000068BA2, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=1], queueOffset=500] SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E810001, offsetMsgId=C0A81F7E00002A9F0000000000068C54, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=2], queueOffset=500] SendResult [sendStatus=SEND_OK, msgId=C0A81F7E8D39330BEDB41E560E850002, offsetMsgId=C0A81F7E00002A9F0000000000068D06, messageQueue=MessageQueue [topic=TopicTest, brokerName=daxiongMac.local, queueId=3], queueOffset=500] ......
这样就实现了RocketMQ
的发送消息。
找到example
包的org.apache.rocketmq.example.quickstart.Consumer
类,这是一个最简单的消息消费者,咱们来看一下它的源码。
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
DefaultMQPushConsumer
类来建立消费者实例,并指定消息组Group
、路由中心地址、消费模式、消息类别。21:24:03.482 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework Consumer Started. ConsumeMessageThread_6 Receive New Messages: [MessageExt [brokerName=daxiongMac.local, queueId=2, storeSize=178, queueOffset=502, sysFlag=0, bornTimestamp=1591449756319, bornHost=/192.168.31.126:50803, storeTimestamp=1591449756321, storeHost=/192.168.31.126:10911, msgId=C0A81F7E00002A9F00000000000691E4, commitLogOffset=430564, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1591449844575, UNIQ_KEY=C0A81F7E8D39330BEDB41E560E9F0009, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57], transactionId='null'}]] ConsumeMessageThread_5 Receive New Messages: [MessageExt [brokerName=daxiongMac.local, queueId=1, storeSize=178, queueOffset=502, sysFlag=0, bornTimestamp=1591449756316, bornHost=/192.168.31.126:50803, storeTimestamp=1591449756317, storeHost=/192.168.31.126:10911, msgId=C0A81F7E00002A9F0000000000069132, commitLogOffset=430386, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1591449844576, UNIQ_KEY=C0A81F7E8D39330BEDB41E560E9C0008, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56], transactionId='null'}]] ......
至此,咱们就完成了RocketMQ
的快速入门,启动nameserver
和broker
,建立生产者发送消息,建立消费者接收消息。
版权声明:本文为 Planeswalker23所创,转载请带上原文连接,感谢。