最近在本身本地搭建rocketMQ,过程当中遇到一些问题,如今总结一下,便于之后查看.html
首先打开rocket官网:http://rocketmq.apache.org, 点击latest realease,有source版本和binary版本供下载,点击quikstart能够看到source版本的构建步骤:java
source版本须要先安装了如下软件:apache
我系统环境是OS 10.13.6,下载的是binary版本,省去了构建成为二进制文件的步骤.bash
将rocketmq-all-4.4.0-bin-release.zip文件解压后获得rocketmq-all-4.4.0-bin-release文件夹,服务器
配置环境变量ROCKETMQ_HOME=xxx/rocketmq-all-4.4.0-bin-release,其中xxx表示你的文件夹父路径,异步
例如/Users/david/Downloads/packages/rocketmq-all-4.4.0-bin-release.async
再将ROCKETMQ_HOME加入到PATH中如 $ROCKETMQ_HOME/bin:$PATHmaven
打开teiminal,输入sh mqnamesrv启动名称服务器,以下图:ide
能够看到”Java HotSpot(TM) 64-Bit Server VM warning“警告信息,不要怕,工具
它是说CMS垃圾收集器已通过时,在将来的JVM版本中会被其余收集器替代,能够不用管他.
namesrv已经正常启动了
一样,再开一个terminal,输入sh mqbroker 启动broker.
若是在启动过程当中遇到:
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
不要慌,这是说你的JAVA_HOME环境变量设置有问题,请检查环境变量.
若是检查后发现环境变量没问题,但仍是报以上错误,这多是系统问题吧,
记得OS系统有好几个文件能够配置环境变量,最多见的是用户目录下的.bash_profile,对当前用户生效,还有一个全局的,在etc目录下的profile文件
能够再确认一下,若是两个文件都配置了依然报错,那只好用最戳的方式解决:
打开mq的安装目录,找到bin文件夹下的runserver.sh和runbroker.sh,
在文件中主动加入你的JAVA_HOME,以下图:
一切顺利解决,至此namesrv和broker都已经成功启动,下面来写几个Javademo
首先加入maven依赖包:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
使用GRADLE构建的话加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
使用GRADLE构建的话加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
使用RocketMQ有如下3种发送消息的方式,分别适用不一样场景:
咱们选第一种发送方式写一个demo:
package com.example.demo.common.mq.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
经过循环发送100条消息,运行代码,结果抛出以下错误:
该错误表示找不到这个topic的路由,关于这个错误,网上也有一些对策,无外乎如下几个缘由:
1.broker没有连上namesrv
2.producer没有连上namesrv
3.namesrv没有建立并维护该topic信息
4.netty包版本冲突
5.防火墙问题
前面3条能够经过查找日志肯定问题,
1).broker.log里面若是查到register broker to name server localhost:9876 OK 这样的信息表示broker已经连上namesrv了.
若是没有,那么你能够从新启动broker: sh mqbroker -n localhost:9876
2).namesrv.log里面若是可以查到 new topic registered, TopicTest QueueData这样的信息,表示你的topic也已经被成功建立.
若是没有,那么你能够经过mq的admin工具主动生成该topic: sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t TopicTest
3).producer有没有连上namesrv,只需检查一下代码中namesrv的值对不对就好了
4).至于4和5那就须要你本身去跟踪源码发现问题了.
如今再来运行生产者代码,成功发送100条消息:
消费者也来写个demo:
package com.example.demo.common.mq.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
成功消费100条消息:
另外,关于rocketMQ的一些基本概念请参考另外一篇: http://www.javashuo.com/article/p-cyjwsvap-hm.html
下次来看看本地伪集群怎么搭建: http://www.javashuo.com/article/p-gxrmdamh-hp.html