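1. Hardware and system requirements: the RocketMQ machine should preferably have at least 8 GB of memory, run Linux, and already have a JDK installed.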
2. Download URL for the installation package:
http://mirror.bit.edu.cn/apache/incubator/rocketmq/4.0.0-incubating/rocketmq-all-4.0.0-incubating-bin-release.zip
3. Download the RocketMQ installation package, upload it to the server, then install and start it as follows.
Unzip the package:
unzip rocketmq-all-4.0.0-incubating-bin-release.zip
Change into the bin directory under the extracted directory.
Start the NameServer:
nohup sh mqnamesrv &
Start the broker:
nohup sh mqbroker -n localhost:9876 &
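Running the jps command should now show two Java processes: the NameServer and the broker.
To let other machines connect for debugging, you can temporarily stop the firewall: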
service firewalld stop
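Once the firewall is stopped, it can help to confirm from another machine that the NameServer port actually accepts connections before running any client code. The following is a minimal sketch using only the JDK; the class name NameServerPortCheck and the 2000 ms timeout are assumptions made for illustration, the default host is the address used in the demo code in this post, and 9876 is the NameServer port from the broker start command above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of RocketMQ: checks plain TCP reachability only.
final class NameServerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults: replace with the address of your own NameServer.
        String host = args.length > 0 ? args[0] : "192.168.133.141";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9876;
        boolean ok = isReachable(host, port, 2000);
        System.out.println(host + ":" + port + (ok ? " is reachable" : " is NOT reachable"));
    }
}
```

A successful connection only shows that the port is open; it does not prove that the process behind it is a healthy NameServer.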
4. Java demo program
Maven dependency:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.0.0-incubating</version>
</dependency>
Message producer demo code:
package com.classtest.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) {
        // "Producer" is the producer group name
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        // Address of the NameServer started in step 3
        producer.setNamesrvAddr("192.168.133.141:9876");
        try {
            producer.start();

            // Message arguments: topic, tag, key, body
            Message msg = new Message("PushTopic", "push", "1", "Just for test1.".getBytes());
            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

            msg = new Message("PushTopic", "push", "2", "Just for test2.".getBytes());
            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());

            msg = new Message("PushTopic", "push", "1", "Just for test3.".getBytes());
            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}
Message consumer demo code:
package com.classtest.rocketmq;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) {
        // "PushConsumer" is the consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("192.168.133.141:9876");
        try {
            // Subscribe to messages in PushTopic whose tag is "push"
            consumer.subscribe("PushTopic", "push");
            // On the first startup, consume from the head of the message queue
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> list, ConsumeConcurrentlyContext context) {
                    // Only the first message of the delivered batch is printed
                    Message msg = list.get(0);
                    System.out.println(msg.toString());
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
You can run the consumer demo first; when you then run the producer demo, the messages are printed in the consumer demo's console window.
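One detail worth noting in the demos: the producer builds each body with getBytes() and no charset argument, which uses the platform default charset, and the consumer prints the whole message object rather than the decoded text. If producer and consumer run on machines with different default charsets, non-ASCII text could be garbled. The sketch below shows one way to encode and decode bodies with an explicit charset; BodyCodec is a hypothetical helper name, not part of RocketMQ, and UTF-8 is an assumed choice.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: keeps message body encoding identical on both sides.
final class BodyCodec {

    /** Encodes message text to bytes with an explicit charset (UTF-8). */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a message body back to text using the same charset. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Round trip of one of the demo message bodies
        byte[] body = encode("Just for test1.");
        System.out.println(body.length + " bytes -> " + decode(body));
    }
}
```

In the demos, the producer could pass BodyCodec.encode("Just for test1.") as the message body, and the consumer could print BodyCodec.decode(msg.getBody()) to see the text itself.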
References: