RabbitMQ详解

消息队列:RabbitMQ

全名为:Message Queuejava

消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。就是一个先进先出的队列,只是队列中存放的是message而已,由于消息的生产和消费都是异步的,并且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。git

常见的MQ产品

ActiveMQ:基于JMSspring

RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好apache

RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会vim

Kafka:分布式消息系统,高吞吐量centos

  • 如上所说的JMS和AMQP:数组

    • MQ是消息通讯的模型,并发具体实现。如今实现MQ的有两种主流方式:AMQP、JMS,具体百度浏览器

    • JMS是定义了统一的接口,来对消息操做进行统一;AMQP是经过规定协议来统一数据交互的格式缓存

    • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,所以是跨语言的。并发

    • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

RabbitMQ环境搭建及相关设置

安装Erlang

yum install esl-erlang_17.3-1~centos~6_amd64.rpm
yum install esl-erlang-compat-R14B-1.el6.noarch.rpm

安装RabbitMQ

首先安装包下载并上传:连接:https://pan.baidu.com/s/1XM24RprcaXMAFHPdctkEIw 提取码:1490

我是上传到 /usr/local/rabbitmq/ ,大家随意;

进入到安装包上传目录:

rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

修改配置文件

#将默认的配置文件模版 复制到 etc目录下
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
#编辑配置问价
vim /etc/rabbitmq/rabbitmq.config

  

注意:打开注解,删掉末尾的逗号,保存退出便可;

chkconfig rabbitmq-server on   #设置为开机启动
service rabbitmq-server start   #启动服务
service rabbitmq-server stop    #关闭服务
service rabbitmq-server restart #服务重启

开启Web管理页面

rabbitmq-plugins enable rabbitmq_management   #经过命令开启
service rabbitmq-server restart              # 服务重启,配置生效

端口是15672,自行开放,我是直接关闭了防火墙的;

下面咱们咱们既能够王文Web管理页面:帐号密码默认为:guest

浏览器没有弹出翻译页面,咱们自翻译

  

  • connections:不管生产者仍是消费者,都须要与RabbitMQ创建链接后才能够完成消息的生产和消费,在这里能够查看链接状况

  • channels:通道,创建链接后,会造成通道,消息的投递获取依赖通道。

  • Exchanges:交换机,用来实现消息的路由

  • Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。

  

用户的添加

  

用户的角色指定,对应不一样权限:

  • 超级管理员(administrator)

    可登录管理控制台,可查看全部的信息,而且能够对用户,策略(policy)进行操做。

  • 监控者(monitoring)

    可登录管理控制台,同时能够查看rabbitmq节点的相关信息(进程数,内存使用状况,磁盘使用状况等)

  • 策略制定者(policymaker)

    可登录管理控制台, 同时能够对policy进行管理。但没法查看节点的相关信息(上图红框标识的部分)。

  • 普通管理者(management)

    仅可登录管理控制台,没法看到节点信息,也没法对策略进行管理。

  • 其余

    没法登录管理控制台,一般就是普通的生产者和消费者。

建立虚拟主机

RabbitMQ为了实现每一个用户互不干扰,经过虚拟主机的方式,不一样用户使用不一样的路径,各自有各自的队列、交换机

  

虚拟机就建立好了,而后咱们能够给用户分配权限:

消息模型—基本模型

RabbitMQ提供了6种消息模型,可是第6种实际上是RPC,并非MQ,咱们就说说前面五种消息模型

基本的消息模型:

 

P:消息生产者

C:消息消费者

queue:消息队列,消费者投递消息,消费者取出消息并消费

  •  <!--RabbitMQ-->
     <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
           <version>2.1.4.RELEASE</version>
     </dependency>
  • java的链接MQ工具类:

  • public class ConnectionUtil { //创建与RabbitMQ的链接
     public static Connection getConnection() throws Exception { //定义链接工厂
         ConnectionFactory factory = new ConnectionFactory(); //设置服务地址
         factory.setHost("192.168.159.159"); //端口
         factory.setPort(5672); //设置帐号信息,用户名、密码、vhost
         factory.setVirtualHost("/new1"); factory.setUsername("/admin"); factory.setPassword("admin"); // 经过工程获取链接
         Connection connection = factory.newConnection(); return connection; } }

生产者发送消息

package com.mq.start; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /**生产者**/
public class send { //肯定队列的标识
    private final static String QUEUE_NAME = "simple_queue"; ​ public static void main(String[] argv) throws Exception { // 获取到链接
        Connection connection = connectionUtils.getConnection(); // 从链接中建立通道,使用通道才能完成消息相关的操做
        Channel channel = connection.createChannel(); // 声明(建立)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息内容
        String message = "Hello World!"; // 向指定的队列中发送消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); ​ System.out.println(" [服务提供者] Send '" + message + "'"); ​ //关闭通道和链接
 channel.close(); connection.close(); } }

这个时候咱们切换到刚刚建立的用户上 /admin 上查看信息:

  

消费者获取消息:

package com.mq.start; ​ import com.rabbitmq.client.*; import java.io.IOException; ​ public class get { //队列name 要达成通讯 必须和发送的队列name 一致
    private final static String QUEUE_NAME = "simple_queue"; ​ public static void main(String[] argv) throws Exception { // 获取到链接
        Connection connection = connectionUtils.getConnection(); // 建立通道
        Channel channel = connection.createChannel(); // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,而且处理,这个方法相似事件监听,若是有消息的时候,会被自动调用
 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { // body 即消息体
                String msg = new String(body); System.out.println(" [服务消费者] get : " + msg + "!"); } }; // 监听队列,第二个参数:是否自动进行消息确认。
        channel.basicConsume(QUEUE_NAME, true, consumer); } }

控制台打印:由于我发送了两次

 

再次查看Web管理页面,没有消息了:

消费者的消息确认机制

通过刚刚的小Demo,我能发现一旦消息从队列中被消费者拉取消费后,队列中的消息就会删除,

这里就涉及到一个MQ是经过消息确认机制知道消息什么时候被消费,当消费者获取到信息后,回想MQ返回一个ACK回执告知已被接受,能够删除。不过ACK回执分问两种状况:

  • 手动ACK:消息接收后,通常在消费者消费掉该消息后手动发送ACK

  • 自动ACK:消息接受后当即就会自动发送ACK

至于如何选择:根据信息的重要程度区分

  • 消息不过重要,即便丢失影响也不大,自动ACK比较巴适

  • 消息很重要,不容许丢失,那就等咱们消费者消费完这个信息后手动发送回执

java实现:部分实现

DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException { String msg = new String(body); System.out.println(" [服务消费者] get1 : " + msg + "!"); //在消息消费完后,手动发送ACK回执给MQ
                channel.basicAck(envelope.getDeliveryTag(), false); } }; // 监听队列,第二个参数:是否自动进行消息确认。
        channel.basicConsume(QUEUE_NAME, false, consumer); }

消息模型—work消息模型 [ 任务模型 ]

当消息处理比较耗时的时候,可呢个生产消息的速度回远远大于消息的消费速度,随着时间的推移,队列中的消息就会堆积如山没法及时的处理,此时work模型横空出世,让多个消费者绑定到一个队列上,共同消费同一个队列中的消息

  • 一个生产者,一个队列,两个或者更多的消费者

消息的生产者:连续发送50个消息去队列

package com.mq.start.work模型; ​ import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ​ public class send {
private final static String QUEUE_NAME = "test_work_queue"; public static void main(String[] argv) throws Exception { // 获取到链接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 循环发布任务 for (int i = 0; i < 50; i++) { // 消息内容 String message = "task .. " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); ​ Thread.sleep(i * 2); } // 关闭通道和链接 channel.close(); connection.close(); } }

两个消费者:一次只能处理接收一个消息处理:

package com.mq.start.work模型; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * get1消费者有Thread.sleep(1000),模拟更耗时 */
public class get1 { private final static String QUEUE_NAME = "test_work_queue"; ​ public static void main(String[] argv) throws Exception { // 获取到链接
        Connection connection = connectionUtils.getConnection(); // 获取通道
        final Channel channe1 = connection.createChannel(); // 声明队列
        channe1.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channe1) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体
                String msg = new String(body); System.out.println(" [消费者1] get : " + msg + "!"); // 手动ACK
                channe1.basicAck(envelope.getDeliveryTag(), false); try { //模拟这个消费者消费一个消息很耗时
                    Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; // 监听队列。
        channe1.basicConsume(QUEUE_NAME, false, consumer); }
package com.mq.start.work模型; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; /** * get2处理比价快 */
public class get2 { private final static String QUEUE_NAME = "test_work_queue"; ​ public static void main(String[] argv) throws Exception { // 获取到链接
        Connection connection = connectionUtils.getConnection(); // 获取通道
        final Channel channe2 = connection.createChannel(); // 声明队列
        channe2.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channe2) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体
                String msg = new String(body); System.out.println(" [消费者2] get : " + msg + "!"); // 手动ACK
                channe2.basicAck(envelope.getDeliveryTag(), false); } }; // 监听队列。
        channe2.basicConsume(QUEUE_NAME, false, consumer); } }

舒适提示:优先启动两个消费者,随后再启动消息发布者

而后咱们看下面的控制台:get1慢吞吞的在消费,get2快速的消费完便在休息了,一人消费一半

  • 在上面这种状况下,消费者get1的消费效率是要比消费者get2的效率要低的

  • 但是两个消费者最终的消费消费的信息数量确实同样的,是任务均分的;

  • 消费者get1一直在忙碌于消费,消费者get2处理完分配的一半后便处于空闲状态

能者多劳

消费者同一时间只会接受一条消息,在处理完以前不会接新的消息,让处理快的人接受更多的消息:

两个消费者都修改设置以下:

// 设置每一个消费者同时只能处理一条消息
  channel.basicQos(1);

让咱们看看效果如何:

消息模型—订阅模型

示意图:

  

  • P:生产者,发送消息给X(交换机)

  • Exchange:交换机,图中的X。接收生产者发送的消息。知道如何处理消息,例如递交给某个特别队列、递交给全部队列、或是将消息丢弃。到底如何操做,取决于Exchange的类型。Exchange有如下3种类型:

    • Fanout:广播,将消息交给全部绑定到交换机的队列

    • Direct:定向,把消息交给符合指定routing key 的队列

    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

  • Queue:消息队列,接收消息、缓存消息。

  • C:消费者,消息的消费者,会一直等待消息到来。

注意:交换机只负责转发消息,不具有消息储存的能力,若是没有队列与其进行对接,消息会丢失

消息模型—订阅模型—广播 [ Fanout ]

流程图:

  

在广播模式下,消息发送流程是这样的:

  • 1) 能够有多个消费者

  • 2) 每一个消费者有本身对接的queue(队列)

  • 3) 每一个队列都要对接到Exchange(交换机)

  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪一个队列,生产者没法决定。

  • 5) 交换机把消息发送给绑定过的全部队列

  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

生产者:

package com.mq.start.订阅模型_广播; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ​ public class send { private final static String EXCHANGE_NAME = "fanout_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到链接
        Connection connection = connectionUtils.getConnection(); // 获取通道
        Channel channel = connection.createChannel(); ​ // 声明exchange,指定类型为fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); ​ // 消息内容
        String message = "四川新闻广播电视台为你播报:今天..."; // 发布消息到Exchange
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [生产者] Send '" + message + "'"); ​ channel.close(); connection.close(); } }

消费者1:

package com.mq.start.订阅模型_广播; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; ​ public class get1 { private final static String QUEUE_NAME = "fanout_queue_1"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到链接
        Connection connection = connectionUtils.getConnection(); // 获取通道
        Channel channel = connection.createChannel(); // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); ​ // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,而且处理,这个方法相似事件监听,若是有消息的时候,会被自动调用
 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体
                String msg = new String(body); System.out.println(" [消费者1] received : " + msg + "!"); } }; // 监听队列,自动返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer); } }

消费者2:

package com.mq.start.订阅模型_广播; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; ​ public class get2 { private final static String QUEUE_NAME = "fanout_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到链接
        Connection connection = connectionUtils.getConnection(); // 获取通道
        Channel channel = connection.createChannel(); // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); ​ // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,而且处理,这个方法相似事件监听,若是有消息的时候,会被自动调用
 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体
                String msg = new String(body); System.out.println(" [消费者2] received : " + msg + "!"); } }; // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, true, consumer); } }

而后查看控制台:一条消息被全部订阅的队列都消费

消息模型—订阅模型— [ Direct ]

广播是一条消息被全部与交换机对接的队列都消费,但有时候,咱们想不一样的信息被不一样的队列说消费,这是就要用到Direct类型的交换机

  

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

  • X:Exchange(交换机),接收生产者的消息,而后把消息递交给 与routing key彻底匹配的队列

  • C1:消费者,其所在队列指定了须要routing key 为 error 的消息

  • C2:消费者,其所在队列指定了须要routing key 为 info、error、warning 的消息

在Direct模式下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange再也不把消息交给每个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key彻底一致,才会接收到消息

消息生产者:

package com.mq.start.订阅模型_Direct; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 咱们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete */
public class send { private final static String EXCHANGE_NAME = "direct_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到链接
        Connection connection = connectionUtils.getConnection(); // 获取通道
        Channel channel = connection.createChannel(); // 声明exchange,指定类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息内容
        String message = "商品新增, id = 1001"; // 发送消息,而且指定routing key 为:insert ,表明新增商品
        channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes()); System.out.println(" [商品服务:] Send '" + message + "'"); channel.close(); connection.close(); } }

记住咱们的roting key 是insert噢!

消息消费者1:get1 ,他能接受routing key 为 "update"、"delete"的消息

package com.mq.start.订阅模型_Direct; ​import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; ​import java.io.IOException; ​ public class get1 {
private final static String QUEUE_NAME = "direct_exchange_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到链接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 绑定队列到交换机,同时指定须要订阅的routing key。假设此处须要update和delete消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); ​ // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,而且处理,这个方法相似事件监听,若是有消息的时候,会被自动调用 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [消费者1] git : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }

消息消费者2:get2,他能接受routing key 为 "insert"、"update"、"delete" 的消息

package com.mq.start.订阅模型_Direct; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; ​ public class get2 { private final static String QUEUE_NAME = "direct_exchange_queue_2"; private final static String EXCHANGE_NAME = "direct_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到链接
        Connection connection = connectionUtils.getConnection(); // 获取通道
        Channel channel = connection.createChannel(); // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 绑定队列到交换机,同时指定须要订阅的routing key。订阅 insert、update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); ​ // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,而且处理,这个方法相似事件监听,若是有消息的时候,会被自动调用
 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体
                String msg = new String(body); System.out.println(" [消费者2] get : " + msg + "!"); } }; // 监听队列,自动ACK
        channel.basicConsume(QUEUE_NAME, true, consumer); } }

咱们分别设置routing key 为 insert、update、delete,逐一测试:

其余咱们就执行测试吧,我就测了routing key为insert,最终被get2所消费

消息模型—订阅模型— [ Topic ]

Topic类型yuDirect相比,其实差很少的,都是根据rounting key 把消息路由到不一样的队列,就是Topic类型的交换机支在匹配的时候支持rounting key的通配符

通配符规则:

#:匹配一个或多个词

*:匹配很少很多刚好1个词

举例:

audit.#:可以匹配audit.irs.corporate或者 audit.irs

audit.*:只能匹配audit.irs

消息生产者:Rounting key 为: item.insert / update / delete

package com.mq.start.订阅模型_Topic; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; ​ public class send {
private final static String EXCHANGE_NAME = "topic_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到链接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明exchange,指定类型为topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息内容 String message = "新增商品 : id = 1001"; // 发送消息,而且指定routing key 为:insert ,表明新增商品 channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes()); System.out.println(" [商品服务:] Send '" + message + "'"); ​ channel.close(); connection.close(); } }

消息消费者1:get1,匹配的Rounting Key为 item.update / delete

package com.mq.start.订阅模型_Topic; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; import java.io.IOException; ​ public class get1 { private final static String QUEUE_NAME = "topic_exchange_queue_1"; private final static String EXCHANGE_NAME = "topic_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到链接
        Connection connection = connectionUtils.getConnection(); // 获取通道
        Channel channel = connection.createChannel(); // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 绑定队列到交换机,同时指定须要订阅的routing key。须要 update、delete
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); ​ // 定义队列的消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,而且处理,这个方法相似事件监听,若是有消息的时候,会被自动调用
 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体
                String msg = new String(body); System.out.println(" [Get1] get : " + msg + "!"); } }; // 监听队列,自动ACK
        channel.basicConsume(QUEUE_NAME, true, consumer); } }

消息消费者2:get2 ,经过通配符的方式,消费全部item打头,后拼一个单词的全部消息

package com.mq.start.订阅模型_Topic; import com.mq.start.utils.connectionUtils; import com.rabbitmq.client.*; ​import java.io.IOException; ​ public class get2 {
private final static String QUEUE_NAME = "topic_exchange_queue_2"; private final static String EXCHANGE_NAME = "topic_exchange_test"; ​ public static void main(String[] argv) throws Exception { // 获取到链接 Connection connection = connectionUtils.getConnection(); // 获取通道 Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); ​ // 绑定队列到交换机,同时指定须要订阅的routing key。订阅 insert、update、delete channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*"); ​ // 定义队列的消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 获取消息,而且处理,这个方法相似事件监听,若是有消息的时候,会被自动调用 ​ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 即消息体 String msg = new String(body); System.out.println(" [get2] get : " + msg + "!"); } }; // 监听队列,自动ACK channel.basicConsume(QUEUE_NAME, true, consumer); } }

自测,效果一模一样

如何避免消息丢失

手动ACK

消费者的手动ACK机制,可有效的避免消息的丢失

消息持久化

若想支持消息持久化,队列和交换机都得持久化

交换机的持久化:

// 声明exchange,指定类型为topic,其后跟一个true参数目标是开启交换机的持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);

队列的持久化:

// 声明队列,第二个参数表示是否开启队列持久化
channe1.queueDeclare(QUEUE_NAME, false, false, false, null);

消息持久化:

// 发送消息,而且指定routing key 为:insert ,第三个参数表示开启信息持久化
channel.basicPublish(EXCHANGE_NAME, "item.update",MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Spring AMQP

Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的惟一实现。底层使用的就是RabbitMQ。

依赖和配置:

pom.xml

 <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

application.yml

spring:
  rabbitmq:
    host: 192.168.159.159
    username: /admin
    password: admin
    virtual-host: /new1
    template:                           #有关Template的配置
      retry:                            #失败重试
        enabled: true                   #失败重试_开启失败重试
        initial-interval: 10000ms       #失败重试_第一次重试的间隔时长
        max-interval: 300000ms          #失败重试_最长重试间隔
        multiplier: 2                   #失败重试_下次重试间隔的倍速
      exchange: spring.test.exchange    #指定交换机,发送消息若不指定交换机就使用配置的交换机
    publisher-confirms: true            #生产者确认机制,确保消息正确发送,发送失败会有错误回执

服务的监听者:在SpringAMQP中,普通方法 + 注解,就能够成为一个消费者。

package com.mq.start.SpringAMQP; ​import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; ​ /** * 这是一个消费者 / 监听者 */ @Component @RabbitListener(queues = "spring.test.queue" ) public class Listener { ​ /** * - @Componet`:类上的注解,注册到Spring容器 * - `@RabbitListener`:方法上的注解,声明这个方法是一个消费者方法,须要指定下面的属性: * - `bindings`:指定绑定关系,能够有多个。值是`@QueueBinding`的数组。`@QueueBinding`包含下面属性: * - `value`:这个消费者关联的队列。值是`@Queue`,表明一个队列 * - `exchange`:队列所绑定的交换机,值是`@Exchange`类型 * - `key`:队列和交换机绑定的`RoutingKey` */ ​ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "spring.test.queue", durable = "true"), exchange = @Exchange( value = "spring.test.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC ), key = {"#.#"})) public void listen(String msg){ System.out.println("接收到消息:" + msg); } }

消息的发送者:AmqpTemplate

Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,很是方便的发送消息,其发送方法:

package com.mq.start.SpringAMQP; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; ​ @RunWith(SpringRunner.class) @SpringBootTest public class mqSend { ​ @Autowired private AmqpTemplate amqpTemplate; ​ @Test public void testSend() throws InterruptedException { String msg = "hello, Spring boot amqp"; this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg); // 等待10秒后再结束
        Thread.sleep(10000); } }

外加一个SpringBoot项目的启动类:

package com.mq.start; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; ​ @SpringBootApplication public class Run { public static void main(String[] args) { SpringApplication.run(Run.class, args); } }

内容就这么多,整合就算完成,咱们首先启动SpringBoot项目,而后启动测试类生产消息,消息的监听者自会监听到消息后处理:

相关文章
相关标签/搜索