消息中间件avitiveMQ,RabbitMQ, KafKa, ZeroMQ层出不穷, 做为如今必备的 一项技术,博客中一次也没有说起,今天来一发RocketMQ。apache
RocketMQ 火箭,顾名思义速度很是快。选择RocketMQ的理由:app
1.国产开源项目,支持国产。maven
2.虽然吞吐量不及KafKa,相比其它MQ,性能优。性能
3.稳定性很是好,经受了双十一的考验,荣誉满满。学习
4. 功能强大,易用性。测试
废话说了一堆,开始搞起。须要环境: jdk, maven, RocketMQ.ui
下载一个RocketMQ, 本人 3.5.8版本 ,而后解压url
环境变量 设置:spa
NAMESRV_ADDRserver
127.0.0.1:9876
运行maven命令:
mvn -Dmaven.test.skip=true clean packageinstall assembly:assembly –U
启动调度管理器: mqnamesrv
{RocketMQ目录}/bin/mqnamesrv
启动mqbroker:
{RocketMQ目录}/bin/mqbroker
写producer的测试代码:
public class Producer { private void startProduce(){ DefaultMQProducer defaultMQProducer= new DefaultMQProducer("Producer"); //nameserver服务,多个以;分开 defaultMQProducer.setNamesrvAddr("127.0.0.1:9876"); try { defaultMQProducer.start(); for(int j=0;j<200;j++) { for (int i = 0; i < 5; i++) { String str = String.valueOf(i); Message message = new Message("PushTopic", "push", str, new String("content" + str).getBytes()); defaultMQProducer.send(message); System.out.println("send OK!:" + str); Thread.sleep(1000); } } }catch (Exception e){ e.printStackTrace(); } finally { defaultMQProducer.shutdown(); } } public static void main(String[] args) { Producer producer=new Producer(); producer .startProduce(); } }
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.dennis.rocketmqroot</groupId> <artifactId>productor</artifactId> <packaging>war</packaging> <version>1.0-SNAPSHOT</version> <name>productor Maven Webapp</name> <url>http://maven.apache.org</url> <dependencies> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.4.6</version> </dependency> </dependencies> <build> <finalName>productor</finalName> </build> </project>
consumer端的代码:
/** * Created by Dennis on 2017/8/6. */ public class Consumer1 { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); try { //订阅PushTopic下Tag为push的消息 consumer.subscribe("PushTopic", "push"); //程序第一次启动从消息队列头取数据 consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> list, ConsumeConcurrentlyContext Context) { for(Message msg:list){ System.out.print("topic:"+msg.getTopic()+",tag:"+msg.getTags()+",key:"+msg.getKeys()); byte[] b=msg.getBody(); System.out.print(",body:"); for(byte b1:b){ System.out.print((char)b1); } System.out.println(); System.out.println("*******************************************************"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } ); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
pom.xml 和producer 一致
致此测试代码写完,运行producer 和 consumer, consumer 启动2个类。
Server端打印:
consumer1 的打印:
consumer2 的打印:
至此一个简单的 生产消费结束,RocketMQ很优秀,也很精深,你们一块儿来学习吧.