每章一点正能量:自我控制是最强者的本能。——萧伯纳php
最近在学习消息中间件——RocketMQ,打算把这个学习过程记录下来。此章主要介绍环境搭建。这次主要是单机搭建(条件有限),包括在Windows、Linux环境下的搭建,以及console监控平台搭建,最后加一demo验证一下。html
在搭建RocketMQ以前,请先确保以下环境已经搭建完毕java
没有搭建的同窗走传送门:linux
JDK环境搭建: JAVA8环境搭建 Maven环境搭建: Windows环境下使用Nexus 3.X 搭建Maven私服及使用介绍 Git环境搭建:Git环境搭建及配置git
官方网站:rocketmq.apache.org/github
以上操做完毕以后,进入目录bin目录,我这里是 H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin
。 找到runserver.cmd
和runbroker.cmd
中的JAVA_OPT。 web
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
复制代码
将 Xms Xmx 这两个值改小一些,改成1g,如:spring
set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
复制代码
本身根据虚拟机内存大小设置,超出内存大小可能会报错。apache
上述步骤执行完毕后,咱们须要将RocketMQ安装目录的bin目录配置到环境变量中。编程
以上配置都完成,接下来就是启动过程。中间有点坑,请务必按步骤安装。
在RocketMQ安装目录的bin目录下,执行命令cmd:
个人目录:
H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin
复制代码
能够经过shift+鼠标右击 触发cmd窗口选项。也能够经过win+R 在窗口输入cmd,进入cmd窗口后移动到bin目录下。
成功后会弹出提示框,此框勿关闭。
注意:假如弹出提示框提示‘错误: 找不到或没法加载主类 xxxxxx’。打开runbroker.cmd,而后将‘%CLASSPATH%’加上英文双引号。
runbroker.cmd
进行修改 原:
set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"
复制代码
修改后:
set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
复制代码
下载完后如图所示:选择——>rocketmq-console
下载完成以后,进入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。
进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。中间有个比较慢的下载过程须要等待。
访问地址:localhost:8082
下载JDK:www.oracle.com/technetwork…
下载须要的版本:
上传到建立的目录/usr/java
解压命令
tar -zxvf jdk-8u181-linux-x64.tar.gz
复制代码
配置环境变量命令
vim /etc/profile
JAVA_HOME=/usr/java/jdk1.8.0_161
JRE_HOME=/usr/java/jdk1.8.0_161/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH
source /etc/profile
复制代码
验证是否成功命令
java -version
复制代码
按照以上操做,完成JDK的安装。接下来安装Maven环境。
wget http://mirror.bit.edu.cn/apache/maven/binaries/apache-maven-3.2.2-bin.tar.gz
复制代码
tar -zxvf apache-maven-3.2.2-bin.tar.gz
复制代码
vim /etc/profile
#配置maven环境变量
export MAVEN_HOME=/usr/maven/apache-maven-3.5.4
export MAVEN_HOME
export PATH=$PATH:$MAVEN_HOME/bin
source /etc/profile
复制代码
mvn -v
复制代码
wget http://mirrors.hust.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-source-release.zip
复制代码
unzip rocketmq-all-4.4.0-source-release.zip
复制代码
进入解压后的文件目录。
mvn -Prelease-all -DskipTests clean install -U
复制代码
同Windows环境同样,修改JVM配置。 移动到目录 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq/bin
中。编辑bin目录下runserver.sh
与 runbroker.sh
文件。
根据我的虚拟机大小进行修改
vim runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
复制代码
分别执行以下命令:
#修改环境变量
vim /etc/profile
export ROCKETMQ=/home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq
export PATH=$PATH:$ROCKETMQ/bin
#更新配置
source /etc/profile
复制代码
依然在以前的目录 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq
##启动命令
nohup sh bin/mqnamesrv >/dev/null 2>&1 &
##查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
复制代码
能够看图已经成功了!
##启动命令
nohup sh bin/mqbroker -n localhost:9876 &
##查看日志
tail -f ~/logs/rocketmqlogs/broker.log
复制代码
注意防火墙,若是端口链接失败,注意开通。
sh bin/mqshutdown broker //中止 broker
sh bin/mqshutdown namesrv //中止 nameserver
复制代码
同Windows平台搭建
我这里直接将Windows平台打包好的jar包直接丢到了Linux系统中
java -jar rocketmq-console-ng-1.0.1.jar
复制代码
访问地址:http://192.168.220.72:8082
这里不作过多介绍,能够参考如下文章
其余博客地址:guozh.net/rocketmqzhi…
案例整合环境:SpringBoot环境 案例来源于网络
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.coderprogramming.rocketmq</groupId>
<artifactId>rocketmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
复制代码
**
 * @Description: 生产者
 * @author Coder编程
 * @date 2019/5/8 17:08
 */
@Component
public class Producer {
/** * 生产者的组名 */
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
/** * NameServer 地址 */
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
public void orderedProducer() throws MQClientException, InterruptedException {
/** * 一个应用建立一个Producer,由应用来维护此对象,能够设置为全局对象或者单例 * 注意:ProducerGroupName须要由应用来保证惟一 * ProducerGroup这个概念发送普通的消息时,做用不大,可是发送分布式事务消息时,比较关键, * 由于服务器会回查这个Group下的任意一个Producer */
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
/** * Producer对象在使用以前必需要调用start初始化,初始化一次便可 注意:切记不能够在每次发送消息时,都调用start方法 */
producer.start();
/** * 下面这段代码代表一个Producer对象能够发送多个topic,多个tag的消息。 * 注意:send方法是同步调用,只要不抛异常就标识成功。可是发送成功也可会有多种状态 * 例如消息写入Master成功,可是Slave不成功,这种状况消息属于成功,可是对于个别应用若是对消息可靠性要求极高, * 须要对这种状况作处理。另外,消息可能会存在发送失败的状况,失败重试由应用来处理。 */
try {
for (int i = 0; i < 10; i++) {
Message msg = new Message("Topic1",// topic
"TagA",// tag
"001",// key
("Send Msg:Hello MetaQ1").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
Message msg2 = new Message("Topic2",// topic
"TagB",// tag
"002",// key
("Send Msg:Hello MetaQ2").getBytes());// body
SendResult sendResult2 = producer.send(msg2);
System.out.println(sendResult2);
Message msg3 = new Message("Topic3",// topic
"TagC",// tag
"003",// key
("Send Msg:Hello MetaQ3").getBytes());// body
SendResult sendResult3 = producer.send(msg3);
System.out.println(sendResult3);
}
} catch (Exception e) {
e.printStackTrace();
}
/** * 应用退出时,要调用shutdown来清理资源,关闭网络链接,从MetaQ服务器上注销本身 * 注意:咱们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法 */
producer.shutdown();
}
}
复制代码
/**  * @Description: 消费者  * @author Coder编程  * @date 2019/5/8 17:08  */
@Component
public class Consumer {
/** * 生产者的组名 */
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
/** * NameServer 地址 */
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
/** * 当前例子是PushConsumer用法,使用方式给用户感受是消息从RocketMQ服务器推到了应用客户端。 * 可是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,而后再回调用户Listener方法 */
public void orderedConsumer() throws InterruptedException,MQClientException {
/** * 一个应用建立一个Consumer,由应用来维护此对象,能够设置为全局对象或者单例 * 注意:ConsumerGroupName须要由应用来保证惟一 */
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);
// consumer.setNamesrvAddr("10.10.0.102:9876");
consumer.setNamesrvAddr(namesrvAddr);
/** * 订阅指定topic下tags分别等于TagA或TagC或TagD */
consumer.subscribe("Topic1", "TagA || TagC || TagD");
/** * 订阅指定topic下全部消息<br> * 注意:一个consumer对象能够订阅多个topic */
consumer.subscribe("Topic2", "*");
consumer.subscribe("Topic3", "*");
/** * 设置Consumer第一次启动是从队列头部开始消费仍是队列尾部开始消费 若是非第一次启动,那么按照上次消费的位置继续消费 */
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
/** * 默认msgs里只有一条消息,能够经过设置consumeMessageBatchMaxSize参数来批量接收消息 */
@Override
public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("Topic1")) {
if (null != msg.getTags()) {
// 执行Topic1的消费逻辑
if (msg.getTags().equals("TagA")) {
// 执行TagA的消费
System.out.println("TagA开始。");
} else if (msg.getTags().equals("TagC")) {
System.out.println("TagC开始。");
// 执行TagC的消费
} else if (msg.getTags().equals("TagD")) {
// 执行TagD的消费
System.out.println("TagD开始。");
}
}
} else if (msg.getTopic().equals("Topic2")) {
// 执行Topic2的消费逻辑
System.out.println("Topic2");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/** * Consumer对象在使用以前必需要调用start初始化,初始化一次便可 */
consumer.start();
System.out.println("Consumer Started.");
}
}
复制代码
# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=192.168.220.72:9876
# 设置应用端口
server.port=8089
复制代码
/** * @author Coder编程 * @Title: HelloWord * @ProjectName rocketmq * @Description: Hello World * @date 2019/5/814:14 */
@RestController
public class Test {
@Autowired
private Producer producer;
@Autowired
private Consumer consumer;
@RequestMapping("/test")
public String testMQ2() {
try {
System.out.println("-----------------开始生产-----------------");
producer.orderedProducer();
System.out.println("-----------------开始消费-----------------");
consumer.orderedConsumer();
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}
}
复制代码
以上安装jar包和案例测试源码已经上传至GitHub/Gitee
欢迎关注公众号:Coder编程 获取最新原创技术文章和相关免费学习资料,随时随地学习技术知识!