基于springboot整合的rabbitmq

技术:springboot1.5.2 + maven3.0.5 + rabbitmq3.7.13 + jdk1.8
 

概述

RabbitMQ是对高级消息队列协议(Advanced Message Queueing Protocol, AMQP)的实现,RabbitMQ是消息传输的中间者,能够把它当作是一个消息代理,你把消息传送给它,它再把消息发送给具体的接收人。 这就像是邮局同样,你把邮件放入邮箱当中,邮件员会把邮件发送给你的收件人。不一样的是RabbitMQ是接受,存储和转发二进制数据块——消息。

详细

RabbitMQ官方解释:

消息系统容许软件、应用相互链接和扩展。这些应用能够相互连接起来组成一个更大的应用,或者将用户设备和数据html

 

进行链接。消息系统经过将消息的发送和接收分离来实现应用程序的异步和解偶。
咱们白话文的理解就是:是一个消息代理 - 一个消息系统的媒介。它能够为你的应用提供一个通用的消息发送和接收平台,而且保证消息在传输过程当中的安全。java

 


1、RabbitMQ模型简介
spring

AMQP 的工做过程以下图:消息(message)被发布者(publisher)发送给交换机(exchange),交换机经常被比喻成邮局或者邮箱。而后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。数据库

image.png

2、RabbitMQ 交换机:json

Name Default pre-declared names
Direct exchange (Empty string) and amq.direct
Fanout exchange amq.fanout
Topic exchange amq.topic
Headers exchange amq.match (and amq.headers in RabbitMQ)

1. 默认交换机:windows

default exchange其实是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每一个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
如:当你声明了一个名为"search-indexing-online"的队列,AMQP代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为"search-indexing-online"。所以,当携带着名为"search-indexing-online"的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为"search-indexing-online"的队列中。换句话说,默认交换机看起来貌似可以直接将消息投递给队列,尽管技术上并无作相关的操做。数组

2.Direct 直连交换机:安全

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也能够处理多播路由)。springboot


image.png

 

 

3.fanout扇形交换机:服务器

扇型交换机(funout exchange)将消息路由给绑定到它身上的全部队列,而不理会绑定的路由键。若是N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这全部的N个队列。

image.png

 

4.topic 主题交换机:

主题交换机(topic exchanges)经过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机常常用来实现各类分发/订阅模式及其变种。主题交换机一般用来实现消息的多播路由(multicast routing)。

5.head交换机:

有时消息的路由操做会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键创建路由规则。经过判断消息头的值可否与指定的绑定相匹配来确立路由规则。

2、队列

AMQP中的队列(queue)跟其余消息队列或任务队列中的队列是很类似的:它们存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,可是队列也有一些另外的属性。

  • Name

  • Durable(消息代理重启后,队列依旧存在)

  • Exclusive(只被一个链接(connection)使用,并且当链接关闭后队列即被删除)

  • Auto-delete(当最后一个消费者退订后即被删除)

  • Arguments(一些消息代理用他来完成相似与TTL的某些额外功能)

队列在声明(declare)后才能被使用。若是一个队列尚不存在,声明一个队列会建立它。若是声明的队列已经存在,而且属性彻底相同,那么这次声明不会对原有队列产生任何影响。若是声明中的属性与已存在队列的属性有差别,那么一个错误代码为406的通道级异常就会被抛出。

1.队列名称

队列的名字能够由应用(application)来取,也可让消息代理(broker)直接生成一个。队列的名字能够是最多255字节的一个utf-8字符串。若但愿AMQP消息代理生成队列名,须要给队列的name参数赋值一个空字符串:在同一个通道(channel)的后续的方法(method)中,咱们可使用空字符串来表示以前生成的队列名称。之因此以后的方法能够获取正确的队列名是由于通道能够默默地记住消息代理最后一次生成的队列名称。

以"amq."开始的队列名称被预留作消息代理内部使用。若是试图在队列声明时打破这一规则的话,一个通道级的403 (ACCESS_REFUSED)错误会被抛出。

 

2.队列持久化

持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称做暂存队列(Transient queues)。并非全部的场景和案例都须要将队列持久化。

持久化的队列并不会使得路由到它的消息也具备持久性。假若消息代理挂掉了,从新启动,那么在重启的过程当中持久化队列会被从新声明,不管怎样,只有通过持久化的消息才能被从新恢复。

3.绑定

绑定(Binding)是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。若是要指示交换机“E”将消息路由给队列“Q”,那么“Q”就须要与“E”进行绑定。绑定操做须要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。

打个比方:

  • 队列(queue)是咱们想要去的位于纽约的目的地

  • 交换机(exchange)是JFK机场

  • 绑定(binding)就是JFK机场到目的地的路线。可以到达目的地的路线能够是一条或者多条

拥有了交换机这个中间层,不少由发布者直接到队列难以实现的路由方案可以得以实现,而且避免了应用开发者的许多重复劳动。

若是AMQP的消息没法路由到队列(例如,发送到的交换机没有绑定队列),消息会被就地销毁或者返还给发布者。如何处理取决于发布者设置的消息属性。

 

4.消费者

消息若是只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才可以体现。在AMQP 模型中,有两种途径能够达到此目的:

  • 将消息投递给应用 ("push API")

  • 应用根据须要主动获取消息 ("pull API")

使用push API,应用(application)须要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,咱们能够说应用注册了一个消费者,或者说订阅了一个队列。一个队列能够注册多个消费者,也能够注册一个独享的消费者(当独享消费者存在时,其余消费者即被排除在外)。

每一个消费者(订阅者)都有一个叫作消费者标签的标识符。它能够被用来退订消息。消费者标签其实是一个字符串。

5.消息确认

消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。并且网络缘由也有可能引发各类问题。这就给咱们出了个难题,AMQP代理在何时删除消息才是正确的?AMQP 0-9-1 规范给咱们两种建议:

  • 当消息代理(broker)将消息发送给应用后当即删除。(使用AMQP方法:basic.deliver或basic.get-ok)

  • 待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用AMQP方法:basic.ack)

前者被称做自动确认模式(automatic acknowledgement model),后者被称做显式确认模式(explicit acknowledgement model)。在显式模式下,由消费者应用来选择何时发送确认回执(acknowledgement)。应用能够在收到消息后当即发送,或将未处理的消息存储后发送,或等到消息被处理完毕后再发送确认回执(例如,成功获取一个网页内容并将其存储以后)。

若是一个消费者在还没有发送确认回执的状况下挂掉了,那AMQP代理会将消息从新投递给另外一个消费者。若是当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,而后再次尝试投递。

6.拒绝消息

当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用能够向消息代理代表,本条消息因为“拒绝消息(Rejecting Messages)”的缘由处理失败了(或者未能在此时完成)。当拒绝某条消息时,应用能够告诉消息代理如何处理这条消息——销毁它或者从新放入队列。当此队列只有一个消费者时,请确认不要因为拒绝消息而且选择了从新放入队列的行为而引发消息在同一个消费者身上无限循环的状况发生。

Negative Acknowledgements

在AMQP中,basic.reject方法用来执行拒绝消息的操做。但basic.reject有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。可是若是你使用的是RabbitMQ,那么你可使用被称做negative acknowledgements(也叫nacks)的AMQP 0-9-1扩展来解决这个问题。更多的信息请参考帮助页面

7.预取消息

在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每一个消费者一次能够接受多少条消息是很是有用的。这能够在试图批量发布消息的时候起到简单的负载均衡和提升消息吞吐量的做用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,若是生产应用每分钟才发送一条消息,这说明处理工做尚在运行。)

注意,RabbitMQ只支持通道级的预取计数,而不是链接级的或者基于大小的预取。

8.消息属性和有效载荷(消息主体)

AMQP模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以致于AMQP明确的定义了它们,而且应用开发者们无需费心思思考这些属性名字所表明的具体含义。例如:

  • Content type(内容类型)

  • Content encoding(内容编码)

  • Routing key(路由键)

  • Delivery mode (persistent or not)
    投递模式(持久化 或 非持久化)

  • Message priority(消息优先权)

  • Message publishing timestamp(消息发布的时间戳)

  • Expiration period(消息有效期)

  • Publisher application id(发布应用的ID)

有些属性是被AMQP代理所使用的,可是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称做消息头(headers)。他们跟HTTP协议的X-Headers很类似。消息属性须要在消息被发布的时候定义。

AMQP的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被AMQP代理看成不透明的字节数组来对待。消息代理不会检查或者修改有效载荷。消息能够只包含属性而不携带有效载荷。它一般会使用相似JSON这种序列化的格式数据,为了节省,协议缓冲器和MessagePack将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP及其同行者们一般使用"content-type" 和 "content-encoding" 这两个字段来与消息沟通进行有效载荷的辨识工做,但这仅仅是基于约定而已。

消息可以以持久化的方式发布,AMQP代理会将此消息存储在磁盘上。若是服务器重启,系统会确认收到的持久化消息未丢失。简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具备持久化性质:它彻底取决与消息自己的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能形成必定的影响(就像数据库操做同样,健壮性的存在一定形成一些性能牺牲)。

9.消息确认

因为网络的不肯定性和应用失败的可能性,处理确认回执(acknowledgement)就变的十分重要。有时咱们确认消费者收到消息就能够了,有时确认回执意味着消息已被验证而且处理完毕,例如对某些数据已经验证完毕而且进行了数据存储或者索引操做。

这种情形很常见,因此 AMQP 内置了一个功能叫作 消息确认(message acknowledgements),消费者用它来确认消息已经被接收或者处理。若是一个应用崩溃掉(此时链接会断掉,因此AMQP代理亦会得知),并且消息的确认回执功能已经被开启,可是消息代理还没有得到确认回执,那么消息会被重新放入队列(而且在还有还有其余消费者存在于此队列的前提下,当即投递给另一个消费者)。

协议内置的消息确认功能将帮助开发者创建强大的软件。

 

3、准备工做(windows10环境下的RabbitMQ安装步骤)

 

第一步:下载并安装erlang

  • 缘由:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。

  • 下载地址:http://www.erlang.org/downloads

image.png

 

  • 安装完事儿后要记得配置一下系统的环境变量。

此电脑-->鼠标右键“属性”-->高级系统设置-->环境变量-->“新建”系统环境变量

image.png

 

变量名:ERLANG_HOME

变量值就是刚才erlang的安装地址,点击肯定。

而后双击系统变量path

image.png

 

点击“新建”,将%ERLANG_HOME%\bin加入到path中。

最后windows键+R键,输入cmd,再输入erl,看到版本号就说明erlang安装成功了。

 

image.png

 

第二步:下载并安装RabbitMQ

下载地址:http://www.rabbitmq.com/download.html

image.png

 

双击下载后的.exe文件,安装过程与erlang的安装过程相同。

RabbitMQ安装好后接下来安装RabbitMQ-Plugins。打开命令行cd,输入RabbitMQ的sbin目录。

个人目录是:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.3\sbin

而后在后面输入rabbitmq-plugins enable rabbitmq_management命令进行安装

 

image.png

 

打开sbin目录,双击rabbitmq-server.bat


image.png

等几秒钟看到这个界面后,访问http://localhost:15672

而后能够看到以下界面

image.png

默认用户名和密码都是guest,登录便可。

4、程序实现

1.建立rabbitmqconfig配置文件类:

package com.zxh.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@EnableRabbit
@Configuration
public class RabbitMqConfig {
    public static final String EXCHANGE = "spring.boot.direct";
    public static final String ROUTINGKEY_FAIL = "spring.boot.routingKey.failure";
    public static final String ROUTINGKEY = "spring.boot.routingKey";
    public static final String QUEUE_NAME = "spring.demo";
    public static final String QUEUE_NAME_FAIL = "spring.demo.failure";

    //RabbitMQ的配置信息
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private Integer port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;


    //创建一个链接容器,类型数据库的链接池
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);// 确认机制
//        connectionFactory.setPublisherReturns(true);
        //发布确认,template要求CachingConnectionFactory的publisherConfirms属性设置为true
        return connectionFactory;
    }

    // RabbitMQ的使用入口
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
        template.setMessageConverter(this.jsonMessageConverter());
        template.setMandatory(true);
        return template;
    }

    /**
     * 交换机
     * 针对消费者配置
     * FanoutExchange: 将消息分发到全部的绑定队列,无routingkey的概念
     * HeadersExchange :经过添加属性key-value匹配
     * DirectExchange:按照routingkey分发到指定队列
     * DirectExchange:多关键字匹配
     */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE);
    }

    /**
     * 队列
     *
     * @return
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true); //队列持久

    }
    @Bean
    public Queue queueFail() {
        return new Queue(QUEUE_NAME_FAIL, true); //队列持久

    }


    /**
     * 绑定
     *
     * @return
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue()).to(exchange()).with(RabbitMqConfig.ROUTINGKEY);
    }
    @Bean
    public Binding bindingFail(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queueFail()).to(exchange()).with(RabbitMqConfig.ROUTINGKEY_FAIL);
    }


    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

//    @Bean
//    public CharacterEncodingFilter characterEncodingFilter() {
//        CharacterEncodingFilter filter = new CharacterEncodingFilter();
//        filter.setEncoding("UTF-8");
//        filter.setForceEncoding(true);
//        return filter;
//    }

}

 

2.生产者推送消息

package com.zxh.service;

import com.zxh.config.RabbitMqConfig;
import com.zxh.pojo.User;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

import java.util.List;
import java.util.UUID;

@Service
public class UserService {
	
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private RabbitTemplate template;

    /**
     * 增长用户
     *
     */
    public boolean addPerson(User user) throws Exception {
        Assert.notNull(user, "添加对象信息不能为空");

        Assert.hasText(user.getUserId(), "添加对象信息用户编号不能为空");
        Assert.notNull(user.getAge(), "添加对象信息年龄不能为空");

        template.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY, user.toString());
//        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//            @Override
//            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//                if (!ack) {
//                    logger.info("send message failed: " + cause); //+ correlationData.toString());
//                    throw new RuntimeException("send error " + cause);
//                } else {
//                    logger.info("send to broke ok" + correlationData.getId());
//                }
//            }
//        });

        return true;
    }

    private Message buildMessage(User user) throws Exception {
        Message message = MessageBuilder.withBody(user.toString().getBytes())
                .setMessageId(UUID.randomUUID().toString()).setContentType("application/json").build();
        return message;
    }


}

 

3.消费者订阅消息

package com.zxh.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class UserTopicRecive {

	@RabbitListener(queues="spring.demo")
	public void process(String user) throws InterruptedException {
		System.out.println("TopicRecive1接受的消息: "+user);
	}
}

 

5、程序演示

image.png

image.png

6、项目结构图

 

image.png

 

7、小结 - RabbitMQ的工做流程介绍

 

一、创建信息。Publisher定义须要发送消息的结构和内容。

 

二、创建Conection和Channel。由Publisher和Consumer建立链接,链接到Broker的物理节点上,同时创建Channel。Channel是创建在Connection之上的,一个Connection能够创建多个Channel。Publisher链接Virtual Host 创建Channel,Consumer链接到相应的Queue上创建Channel。

 

三、声明交换机和队列。声明一个消息交换机(Exchange)和队列(Queue),并设置相关属性。

 

四、发送消息。由Publisher发送消息到Broker中的Exchange中

 

五、路由转发。RabbitMQ收到消息后,根据消息指定的Exchange(交换机) 来查找Binding(绑定) 而后根据规则(Routing Key)分发到不一样的Queue。这里就是说使用Routing Key在消息交换机(Exchange)和消息队列(Queue)中创建好绑定关系,而后将消息发送到绑定的队列中去。

 

六、消息接收。Consumer监听相应的Queue,一旦Queue中有能够消费的消息,Queue就将消息发送给Consumer端。

 

七、消息确认。当Consumer完成某一条消息的处理以后,须要发送一条ACK消息给对应的Queue。

 

Consumer收到消息时须要显式的向RabbitMQ Broker发送basic.ack消息或者Consumer订阅消息时设置auto_ack参数为true。

在通讯过程当中,队列对ACK的处理有如下几种状况:

 

若是Consumer接收了消息,发送ack,RabbitMQ会删除队列中这个消息,发送另外一条消息给Consumer。

若是Consumer接收了消息, 但在发送ack以前断开Channel,RabbitMQ会认为这条消息没有被deliver(递送),若是有其余的Channel,会该消息将被发送给另外的Channel。若是没有,当在Consumer再次链接的时候,这条消息会被redeliver(从新递送)。

若是consumer接收了消息,可是忘记了ack,RabbitMQ不会重复发送消息。

新版RabbitMQ还支持Consumer reject某条(类)消息,能够经过设置requeue参数中的reject为true达到目地,那么Consumer将会把消息发送给下一个注册的Consumer。

八、关闭消息通道(channel)以及和服务器的链接。

 



 

 

注:本文著做权归做者,由demo大师发表,拒绝转载,转载须要做者受权

相关文章
相关标签/搜索