java使用rocketMq

RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件,号称消息中间件中的最强者,支持高并发,亿级的消息堆积能力,在高并发的电商,金融等业务场景中多有使用。其具备如下特性:java

支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
支持拉(pull)和推(push)两种消息模式
单一队列百万消息的堆积能力
支持多种消息协议,如 JMS、MQTT 等
分布式高可用的部署架构,知足至少一次消息传递语义
提供 docker 镜像用于隔离测试和云集群部署
提供配置、指标和监控等功能丰富的 Dashboard
对于这些特性描述,你们简单过一眼就便可,深刻学习以后天然就明白了。mysql

专业术语git

Producer
消息生产者,生产者的做用就是将消息发送到 MQ,生产者自己既能够产生消息,如读取文本信息等。也能够对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。github

Producer Group
生产者组,简单来讲就是多个发送同一类消息的生产者称之为一个生产者组。在这里能够不用关心,只要知道有这么一个概念便可。web

Consumer
消息消费者,简单来讲,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,仍是直接存储到数据库等取决于业务须要。spring

Consumer Group
消费者组,和生产者相似,消费同一类消息的多个 consumer 实例组成一个消费者组。sql

Topic
Topic 是一种消息的逻辑分类,好比说你有订单类的消息,也有库存类的消息,那么就须要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。docker

Message
Message 是消息的载体。一个 Message 必须指定 topic,至关于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端能够基于 tag 进行过滤消息。也能够添加额外的键值对,例如你须要一个业务 key 来查找 broker 上的消息,方便在开发过程当中诊断问题。数据库

Tag
标签能够被认为是对 Topic 进一步细化。通常在相同业务模块中经过引入标签来标记不一样用途的消息。apache

Broker
Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求作好准备。

Name Server
Name Server 为 producer 和 consumer 提供路由信息。

这里写图片描述

由这张图能够看到有四个集群,分别是 NameServer 集群、Broker 集群、Producer 集群和 Consumer 集群:

NameServer: 提供轻量级的服务发现和路由。 每一个 NameServer 记录完整的路由信息,提供等效的读写服务,并支持快速存储扩展。
Broker: 经过提供轻量级的 Topic 和 Queue 机制来处理消息存储,同时支持推(push)和拉(pull)模式以及主从结构的容错机制。
Producer:生产者,产生消息的实例,拥有相同 Producer Group 的 Producer 组成一个集群。
Consumer:消费者,接收消息进行消费的实例,拥有相同 Consumer Group 的
Consumer 组成一个集群。
简单说明一下图中箭头含义,从 Broker 开始,Broker Master1 和 Broker Slave1 是主从结构,它们之间会进行数据同步,即 Date Sync。同时每一个 Broker 与
NameServer 集群中的全部节
点创建长链接,定时注册 Topic 信息到全部 NameServer 中。

Producer 与 NameServer 集群中的其中一个节点(随机选择)创建长链接,按期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 创建长链接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master,可是 Consumer 则不同,它同时和提供 Topic 服务的 Master 和 Slave
创建长链接,既能够从 Broker Master 订阅消息,也能够从 Broker Slave 订阅消息。

RocketMQ 集群部署模式

单 master 模式
也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用,适合我的学习使用。

多 master 模式
多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。
优势:全部模式中性能最高
缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复以前不可用,消息的实时性就受到影响。
注意:使用同步刷盘能够保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个节点,而不是只在某各节点上,不然,该节点宕机会对订阅该 topic 的应用形成影响。

多 master 多 slave 异步复制模式
在多 master 模式的基础上,每一个 master 节点都有至少一个对应的 slave。master
节点可读可写,可是 slave 只能读不能写,相似于 mysql 的主备模式。
优势: 在 master 宕机时,消费者能够从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 同样。
缺点:使用异步复制的同步方式有可能会有消息丢失的问题。

多 master 多 slave 同步双写模式
同多 master 多 slave 异步复制模式相似,区别在于 master 和 slave 之间的数据同步方式。
优势:同步双写的同步模式能保证数据不丢失。
缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。
刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步仍是异步存储)
同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)
注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其余方式低。

因为本篇属于入门演示篇,为使用方便,这里在centos7.0下搭建了单节点的rocketmq,rocketmq的搭建过程不作具体的演示,比较简单,若是须要深刻探究其中的内容,能够参考官方文档,针对每一处的配置文件能够进行深刻研究,那就上升到运维的高度了,

首先启动rocketmq,rocketmq启动分为两步,下面罗列主要的流程:

一、安装前提条件(推荐)
64bit OS, Linux/Unix/Mac
64bit JDK 1.8+;
二、快速开始 http://rocketmq.apache.org/docs/quick-start/
下载安装包:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip

三、解压压缩包

1)进入bin目录,启动namesrv  nohup sh mqnamesrv & 

    2) 查看日志 tail -f nohup.out
    结尾:The Name Server boot success. serializeType=JSON 表示启动成功

    3)、启动broker   
        nohup sh mqbroker -n 192.168.2.203:9876 &
        nohup sh mqbroker -n 192.168.2.203:9876 autoCreateTopicEnable=true &

    4)、关闭nameserver broker执行的命令
        sh mqshutdown namesrv
        sh mqshutdown broker

启动完毕,查看进程信息:
这里写图片描述

NameServer 和 Broker都已经成功启动了

为了运维和学习方便,最好部署一下管控台,管控台的部署流程可参考官网流程,下面罗列我部署的主要流程:
一、下载 https://github.com/apache/rocketmq-externals
二、编译打包 mvn clean package -Dmaven.test.skip=true
三、target目录 经过java -jar的方式运行
四、没法链接获取broker信息
1)修改配置文件,名称路由地址为 namesrvAddr,例如我本机为
2)src/main/resources/application.properties
rocketmq.config.namesrvAddr=192.168.2.203:9876
五、默认端口 localhost:8080

这里写图片描述

看到这个界面,表面管控台部署成功了,新版的管控台4.X以上的版本,看起来比3.X的是否是要炫酷不少?一下子程序代码中运行之后,监控的相关信息会在管控台上看出来;

准备工做到此结束,接下来是代码的编写和使用啦;

1)建立一个maven工程:
这里写图片描述

pom依赖的jar包,

org.springframework.boot
spring-boot-starter-parent
2.0.1.RELEASE

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <rocketmq.version>4.1.0-incubating</rocketmq.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>1.3.2</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>

    <!-- 若是配置线程池则加入 -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-pool</artifactId>
    </dependency>

    <!-- 整合RocketMq -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-common</artifactId>
        <version>${rocketmq.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.1.0-incubating</version>
    </dependency>

     <!--简化javabean相关操做-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>

    <!-- rabbitmq -->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.3.0</version>
    </dependency>

    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.5</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

</dependencies>

不须要的可根据本身的须要进行选择,

2)生产者代码:
public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

    //须要一个producer group名字做为构造方法的参数,这里为producer1
    DefaultMQProducer producer = new DefaultMQProducer("producer1");

    //设置NameServer地址,此处应改成实际NameServer地址,多个地址之间用;分隔
    //NameServer的地址必须有,可是也能够经过环境变量的方式设置,不必定非得写死在代码里
    producer.setNamesrvAddr("192.168.2.203:9876");
    producer.setVipChannelEnabled(false);

    //为避免程序启动的时候报错,添加此代码,可让rocketMq自动建立topickey
    producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
    producer.start();

    for(int i=0;i<10;i++){
        try {
            Message message = new Message("TopicTest", "Tag1", 
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult sendResult = producer.send(message);

            System.out.println("发送的消息ID:" + sendResult.getMsgId() +"--- 发送消息的状态:" + sendResult.getSendStatus());
        } catch (Exception e) {
             e.printStackTrace();
             Thread.sleep(1000);
        }
    }

    producer.shutdown();

}

}

3)消费者代码:

public class Consumer {

private static final String ADDR = "192.168.2.203:9876";

public static void main(String[] args) throws MQClientException {
    //设置消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_LRW_DEV_SUBS");

    consumer.setVipChannelEnabled(false);
    consumer.setNamesrvAddr(ADDR);
    //设置消费者端消息拉取策略,表示从哪里开始消费
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

    //设置消费者拉取消息的策略,*表示消费该topic下的全部消息,也能够指定tag进行消息过滤
    consumer.subscribe("TopicTest", "*");

    //消费者端启动消息监听,一旦生产者发送消息被监听到,就打印消息,和rabbitmq中的handlerDelivery相似
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                String topic = messageExt.getTopic();
                String tag = messageExt.getTags();
                String msg = new String(messageExt.getBody());
                System.out.println("*********************************");
                System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ",  msgBody : " + msg + ", tag:" + tag + ", topic:" + topic);
                System.out.println("*********************************");
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

    //调用start()方法启动consumer
    consumer.start();
    System.out.println("Consumer Started....");
}

}

而后,首先启动消费者,
这里写图片描述

再启动生产者,模拟发送10条消息:

这里写图片描述

能够看到控制台下的消息,显示生产者发送消息成功,这时候再看消费者:

这里写图片描述

能够看到消费者端也成功收到了这10条消息,每条消息都会存在一个msgId,所以实际业务中,能够据此回溯查到每一条消息,保证消息的较低的丢失率;

到此,rocketMqd的基本用法就结束了,实际业务场景远比这个复杂,但基本的思路能够进行迁移和使用,里面涉及到的具体细节能够在此基础上继续拓展,好比消息的确认机制,消息失败的重试策略的设置等,欢迎观看。