rocketMQ 本地环境搭建

  最近在本身本地搭建rocketMQ,过程当中遇到一些问题,如今总结一下,便于之后查看.html

  首先打开rocket官网:http://rocketmq.apache.org, 点击latest realease,有source版本和binary版本供下载,点击quikstart能够看到source版本的构建步骤:java

      source版本须要先安装了如下软件:apache

  1. 建议使用64位操做系统,Linux / Unix / Mac;
  2. 64位JDK 1.8+;
  3. Maven 3.2.x;
  4. Git;
  5. 4g +免费磁盘用于Broker服务器

  

  我系统环境是OS 10.13.6,下载的是binary版本,省去了构建成为二进制文件的步骤.bash

将rocketmq-all-4.4.0-bin-release.zip文件解压后获得rocketmq-all-4.4.0-bin-release文件夹,服务器

配置环境变量ROCKETMQ_HOME=xxx/rocketmq-all-4.4.0-bin-release,其中xxx表示你的文件夹父路径,异步

例如/Users/david/Downloads/packages/rocketmq-all-4.4.0-bin-release.async

再将ROCKETMQ_HOME加入到PATH中如 $ROCKETMQ_HOME/bin:$PATHmaven

 

打开teiminal,输入sh mqnamesrv启动名称服务器,以下图:ide

能够看到Java HotSpot(TM) 64-Bit Server VM warning“警告信息,不要怕,工具

它是说CMS垃圾收集器已通过时,在将来的JVM版本中会被其余收集器替代,能够不用管他.

namesrv已经正常启动了

一样,再开一个terminal,输入sh mqbroker 启动broker.

若是在启动过程当中遇到:

ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

 

不要慌,这是说你的JAVA_HOME环境变量设置有问题,请检查环境变量.

若是检查后发现环境变量没问题,但仍是报以上错误,这多是系统问题吧,

记得OS系统有好几个文件能够配置环境变量,最多见的是用户目录下的.bash_profile,对当前用户生效,还有一个全局的,在etc目录下的profile文件

能够再确认一下,若是两个文件都配置了依然报错,那只好用最戳的方式解决:

打开mq的安装目录,找到bin文件夹下的runserver.sh和runbroker.sh,

在文件中主动加入你的JAVA_HOME,以下图:

一切顺利解决,至此namesrv和broker都已经成功启动,下面来写几个Javademo

首先加入maven依赖包:

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>

使用GRADLE构建的话加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'


使用GRADLE构建的话加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

使用RocketMQ有如下3种发送消息的方式,分别适用不一样场景:

1.同步发送消息:可靠的同步传输用于普遍的场景,如重要的通知消息,短信通知,短信营销系统等。

2.异步发送消息:异步传输一般用于响应时间敏感的业务场景。

3.以单向模式发送消息:单向传输用于须要中等可靠性的状况,例如日志收集。

咱们选第一种发送方式写一个demo:

package com.example.demo.common.mq.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

经过循环发送100条消息,运行代码,结果抛出以下错误:

该错误表示找不到这个topic的路由,关于这个错误,网上也有一些对策,无外乎如下几个缘由:

1.broker没有连上namesrv

2.producer没有连上namesrv

3.namesrv没有建立并维护该topic信息

4.netty包版本冲突

5.防火墙问题

前面3条能够经过查找日志肯定问题,

  1).broker.log里面若是查到register broker to name server localhost:9876 OK 这样的信息表示broker已经连上namesrv了.

若是没有,那么你能够从新启动broker: sh mqbroker -n localhost:9876

  2).namesrv.log里面若是可以查到 new topic registered, TopicTest QueueData这样的信息,表示你的topic也已经被成功建立.

若是没有,那么你能够经过mq的admin工具主动生成该topic: sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t TopicTest

  3).producer有没有连上namesrv,只需检查一下代码中namesrv的值对不对就好了

  4).至于4和5那就须要你本身去跟踪源码发现问题了.

 

如今再来运行生产者代码,成功发送100条消息:

 

 消费者也来写个demo:

package com.example.demo.common.mq.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
public static void main(String[] args) throws MQClientException {

// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");

// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

});

//Launch the consumer instance.
consumer.start();

System.out.printf("Consumer Started.%n");
}
}

 成功消费100条消息:

 

 另外,关于rocketMQ的一些基本概念请参考另外一篇: http://www.javashuo.com/article/p-cyjwsvap-hm.html

 下次来看看本地伪集群怎么搭建: http://www.javashuo.com/article/p-gxrmdamh-hp.html