rabbitMQ消息队列原理

MQ:Message Queue,消息队列,是一种应用程序对应用程序的通讯方法;应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。html

 

1      rabbitMQ入门及原理

rabbitMQ官网:http://www.rabbitmq.com/git

Erlang官网:http://www.erlang.org/github

1.1  rabbitMQ概述

RabbitMQ是一个由Erlang开发的AMQPAdvancedMessage Queue )的开源实现,支持多种客户端,如:PythonRuby.NETJavaJMSCPHPActionScriptXMPPSTOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。web

借用网络中一个rabbitMQ的系统架构图:redis

d77e4cfd3d9dc6bf328c7db4b60fbc95.jpg

49e36a7d3c1523e781dac3fa753337d0.png

 

1.1.1     AMQP简介

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不一样产品,不一样的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。--百度百科算法

Message BrokerAMQP简介spring

Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景:编程

  • 消息路由到一个或多个目的地windows

  • 消息转化为其余的表现方式缓存

  • 执行消息的汇集、消息的分解,并将结果发送到他们的目的地,而后从新组合相应返回给消息用户

  • 调用Web服务来检索数据

  • 响应事件或错误

  • 使用发布-订阅模式来提供内容或基于主题的消息路由

AMQP是Advanced Message QueuingProtocol的简称,它是一个面向消息中间件的开放式标准应用层协议。AMQP定义了这些特性:

  • 消息方向

  • 消息队列

  • 消息路由(包括:点到点和发布-订阅模式)

  • 可靠性

  • 安全性

RabbitMQ就是以AMQP协议实现的一种中间件产品,它能够支持多种操做系统,多种编程语言,几乎能够覆盖全部主流的企业级技术平台。

1.1.1.1             AMQP理论

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

简单介绍AMQP的协议栈,AMQP协议自己包含三层,以下:

79c3f2ad461558298780c4cbe3bfdda6.png

Model Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端能够经过这些命令实现本身的业务逻辑,例如,客户端能够经过queue declare声明一个队列,利用consume命令获取队列的消息。

Session Layer,主要负责将客户端命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通讯提供可靠性、同步机制和错误处理。

Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。

这种分层架构相似于OSI网络协议,可替换各层实现而不影响与其它层的交互。AMQP定义了合适的服务器端域模型,用于规范服务器的行为(AMQP服务器端可称为broker)。在这里Model层决定这些基本域模型所产生的行为,这种行为在AMQP中用command表示。Session层定义客户端与broker之间的通讯(通讯双方都是一个peer,可互称作partner),为command的可靠传输提供保障。Transport层专一于数据传送,并与Session保持交互,接受上层的数据,组装成二进制流,传送到receiver后再解析数据,交付给Session层。Session层须要Transport层完成网络异常状况的汇报,顺序传送command等工做。

接下来了解下AMQP当中的一些概念。

BrokerServer):接受客户端链接,实现AMQP消息队列和路由功能的进程。

Virtual Host:实际上是一个虚拟概念,相似于权限控制组,一个Virtual Host里面能够有若干个ExchangeQueue,可是权限控制的最小粒度是Virtual Host

Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeTypedirectFanoutTopic三种,不一样类型的Exchange路由的行为是不同的。

Message Queue:消息队列,用于存储还未被消费者消费的消息。

Message:由HeaderBody组成,Header是由生产者添加的各类属性的集合,包括Message是否被持久化、由哪一个Message Queue接受、优先级是多少等。而Body是真正须要传输的APP数据。

BindingBinding联系了ExchangeMessageQueueExchange在与多个MessageQueue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header获得Routing KeyExchange根据Routing KeyExchangeTypeMessage路由到MessageQueueBinding KeyConsumerBinding ExchangeMessageQueue时指定,而Routing KeyProducer发送Message时指定,二者的匹配方式由ExchangeType决定。

Connection:链接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP链接。

Channel:信道,仅仅建立了客户端到Broker之间的链接后,客户端仍是不能发送消息的。须要为每个Connection建立ChannelAMQP协议规定只有经过Channel才能执行AMQP的命令。一个Connection能够包含多个Channel。之因此须要Channel,是由于TCP链接的创建和释放都是十分昂贵的,若是一个客户端每个线程都须要与Broker交互,若是每个线程都创建一个TCP链接,暂且不考虑TCP链接是否浪费,就算操做系统也没法承受每秒创建如此多的TCP链接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,可是建议尽可能共用Connection

CommandAMQP的命令,客户端经过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端能够经过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。

消息中间件的主要功能是消息的路由(Routing)和缓存(Buffering)。在AMQP中提供相似功能的两种域模型:Exchange Messagequeue

c9b25308f76d038e2de2bee39b2d0762.jpg

Exchange接收消息生产者(MessageProducer)发送的消息根据不一样的路由算法将消息发送往Message queueMessagequeue会在消息不能被正常消费时缓存这些消息,具体的缓存策略由实现者决定,当message queue与消息消费者(Messageconsumer)之间的链接通畅时,Message queue有将消息转发到consumer的责任。

一个Message的处理流程相似于下图:

a789e83e1de4d4dfa2233f9d87158ceb.jpg

Message是当前模型中所操纵的基本单位,它由Producer产生,通过BrokerConsumer所消费。它的基本结构有两部分: HeaderBodyHeader是由Producer添加上的各类属性的集合,这些属性有控制Message是否可被缓存,接收的queue是哪一个,优先级是多少等。Body是真正须要传送的数据,它是对Broker不可见的二进制数据流,在传输过程当中不该该受到影响。

一个broker中会存在多个Message queueExchange怎样知道它要把消息发送到哪一个Message queue中去呢? 这就是上图中所展现Binding的做用。Messagequeue的建立是由client application控制的,在建立Message queue后须要肯定它来接收并保存哪一个Exchange路由的结果。Binding是用来关联ExchangeMessage queue的域模型。Clientapplication控制Exchange与某个特定Messagequeue关联,并将这个queue接受哪一种消息的条件绑定到Exchange,这个条件也叫Binding key或是 Criteria

在与多个Messagequeue关联后,Exchange中就会存在一个路由表,这个表中存储着每一个Message queue所须要消息的限制条件。Exchange就会检查它接受到的每一个MessageHeaderBody信息,来决定将Message路由到哪一个queue中去。MessageHeader中应该有个属性叫Routing Key,它由Message发送者产生,提供给Exchange路由这条Message的标准。Exchange根据不一样路由算法有不一样有ExchangeType。好比有Direct相似,须要Bindingkey等于Routing key;也有BindingkeyRouting key符合一个模式关系;也有根据Message包含的某些属性来判断。一些基础的路由算法由AMQP所提供,clientapplication也能够自定义各类本身的扩展路由算法。

AMQP中,Client application想要与Broker沟通,就须要创建起与Brokerconnection,这种connection实际上是与Virtual Host相关联的,也就是说,connection是创建在clientVirtual Host之间。能够在一个connection上并发运行多个channel,每一个channel执行与Broker的通讯,咱们前面提供的session就是依附于channel上的。

这里的Session能够有多种定义,既能够表示AMQP内部提供的command分发机制,也能够说是在宏观上区别与域模型的接口。正常理解就是咱们平时所说的交互context,主要做用就是在网络上可靠地传递每个command。在AMQP的设计中,应当是借鉴了TCP的各类设计,用于保证这种可靠性。

Session层,为上层所须要交互的每一个command分配一个唯一标识符(能够是一个UUID),是为了在传输过程当中能够对command作校验和重传。Command发送端也须要记录每一个发送出去的commandReplayBuffer,以期获得接收方的回馈,保证这个command被接收方明确地接收或是已执行这个command。对于超时没有收到反馈的command,发送方再次重传。若是接收方已明确地回馈信息想要告知command发送方但这条信息在中途丢失或是其它问题发送方没有收到,那么发送方不断重传会对接收方产生影响,为了下降这种影响,command接收方设置一个过滤器IdempotencyBarrier,来拦截那些已接收过的command关于这种重传及确认机制,能够参考下TCP的相关设计。

 

1.1.2     Erlang简介

 

Erlang([':l])是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab开发,目的是创造一种能够应对大规模并发活动的编程语言和运行环境。Erlang问世于1987年,通过十年的发展,于1998年发布开源版本。Erlang是运行于虚拟机的解释性语言,可是如今也包含有乌普萨拉大学高性能Erlang计划(HiPE)开发的本地代码编译器,自R11B-4版本开始,Erlang也开始支持脚本式解释器。在编程范型上,Erlang属于多重范型编程语言,涵盖函数式、并发式及分布式。顺序执行的Erlang是一个及早求值, 单次赋值和动态类型的函数式编程语言。

Erlang是一个结构化,动态类型编程语言,内建并行计算支持。最初是由爱立信专门为通讯应用设计的,好比控制交换机或者变换协议等,所以很是适合于构建分布式,实时软并行计算系统。使用Erlang编写出的应用运行时一般由成千上万个轻量级进程组成,并经过消息传递相互通信。进程间上下文切换对于Erlang来讲仅仅只是一两个环节,比起C程序的线程切换要高效得多得多了。

使用Erlang来编写分布式应用要简单的多,由于它的分布式机制是透明的:对于程序来讲并不知道本身是在分布式运行。Erlang运行时环境是一个虚拟机,有点像Java虚拟机,这样代码一经编译,一样能够随处运行。它的运行时系统甚至容许代码在不被中断的状况下更新。另外若是须要更高效的话,字节代码也能够编译成本地代码运行。

 

来自:百度百科

 

其余MQ

 f97ab5699444eadf310e07173f50f83f.png

1.1.3     下载rabbitMQErlang

 ec3645273847c31033ff96f5c4d49d4f.png

首页,下拉至:download下载,Tutorials教程

 5e996097280f3fbe68b2156b1eff7f64.png

下载windows版本:

rabbitmq-server-3.6.12.exe

 

教程RabbitMQ Tutorials

http://www.erlang.org/访问比较慢,建议你们也能够网上找资源下载。

0d2d1abea7347d9af2481204d39e4426.png

 

1.1.4     rabbitMQ基本概念

spring-boot-rabbitMQ项目源码https://git.oschina.net/wyait/springboot1.5.4.git

 

config配置类:

@Autowired

private ConnectionFactoryconnectionFactory;

@Autowired

private Queue3Listenerqueue3Listener;

 

@Bean

@Primary

public RabbitTemplaterabbitTemplate() {

    RabbitTemplate rabbitTemplate = newRabbitTemplate(connectionFactory);

    rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());

    return rabbitTemplate;

}

 

@Bean

@Primary

publicSimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {

    SimpleRabbitListenerContainerFactorysimpleRabbitListenerContainerFactory = newSimpleRabbitListenerContainerFactory();

   simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);

    simpleRabbitListenerContainerFactory.setMessageConverter(newJackson2JsonMessageConverter());

    returnsimpleRabbitListenerContainerFactory;

}

 

@Bean

publicSimpleMessageListenerContainer simpleMessageListenerContainer() {

    SimpleMessageListenerContainer container =new SimpleMessageListenerContainer(connectionFactory);

    container.setQueues(queue3());

   container.setMessageListener(queue3Listener);

    return container;

}

 

@Bean

public DirectExchangedirectExchange() {

    return new DirectExchange(EX_CHANGE_NAME1);

}

 

@Bean

public Queue queue1() {

    return new Queue(QUEUE1, true);

}

 

@Bean

public Queue queue2() {

    return new Queue(QUEUE2, true);

}

 

@Bean

public Queue queue3() {

    return new Queue(QUEUE3, true);

}

 

@Bean

public Binding binding1() {

    returnBindingBuilder.bind(queue1()).to(directExchange()).with(ROUTING_KEY1);

}

 

@Bean

public Binding binding2() {

    returnBindingBuilder.bind(queue2()).to(directExchange()).with(ROUTING_KEY2);

 

}

 

@Bean

public Binding binding3() {

    return BindingBuilder.bind(queue3()).to(directExchange()).with(ROUTING_KEY3);

}

 

基本概念:

  • ConnectionFactoryConnectionChannel

    connectionsocket链接的封装,connectionFqactoryconnection的生产工程,channel是通讯的信道,实际进行数据交流的管道,由于创建connection的开销明显要比创建channel要大不少,因此数据传输真实发生在channel

  • Exchange,Queue

    exchange是能够理解成一条特殊的传输通道,他会把消息投递到绑定的消息池内。

    queue就是消息池了,使用前须要绑定exchange,以及本身的标志。

  • exchange_key,routing_key

    exchange_key决定了publisher的消息投递到哪条通道,routing_key决定了将消息放到哪一个池子里

  • 绑定

    queue要接受消息必须与exchange进行绑定,并在绑定的时候给本身与exchange的绑定设置一个标记routing_key,之后用来匹配消息接收

    exchangequeue是一对多的关系,根据exchange不一样类型,分别投递到不一样的消息池

 

下面来看看exchange的类型

 

  1. 1.       fanout

    直接将消息发送到与该exchange绑定的全部queue

  1. 2.       direct

    routing_key进行严格匹配,当消息来到的时候,只有exchange与某queue绑定的routing_key彻底匹配才将消息投递到该queue

  1. 3.       topic

    用特殊符号进行匹配,知足条件的queue都能收到消息,这里的routing_key"."分隔,*匹配一个单词,#匹配多个单词,若是知足多个条件也不会投递屡次

  1. 4.       headers

    不依赖routing_key匹配,根据消息体内的headers属性匹配,绑定的时候能够制定键值对

    接下来来看看配置文件

          1.@Bean统一注入到容器中,咱们声明了connectionfactory,他会自动根据application里面的属性进行组装,这个链接对于后面的容器都是要用到的,这里要注意converter的设置,由于咱们要将pojo类型进行传输,通常程序外的传输都是创建在字节流的基础上,converter就会自动转换

          2.接下来咱们声明queuetrue属性设置为持久型的池子,当链接断开时,消息会呗保留,而后声明exchange,这里咱们使用的是directexchange,接下来将二者绑定起来

  1. 5.       声明SimpleMessageListenerContainerSimpleRabbitListenerContainerFactory注意这里声明两个是由于这是消息监听的两种方式

    首先讲讲SimpleMessageListenerContainer,这个须要设置确认方式,有较多属性克设置,有兴趣可自行设置,这里我只是简单的设置了一下,而后要设置listener

    listener须要实现ChannelAwareMessageListener里面有

    public void onMessage(Message message,Channel channel) 的重载方法须要实现,消息体在Messagebody内,相对来讲信息比较完备

    接下来看看SimpleRabbitListenerContainerFactory,这个有几个注意点,须要再次设置converter由于,一个是发消息的时候解析成二进制,这个则是将二进制解析成具体的类,回调相对简单一点

 

    @Component

    @RabbitListener(queues =RabbitMQConfig.QUEUE1, containerFactory ="simpleRabbitListenerContainerFactory")

    public class Queue1Listener  {

     private static Logger logger =LoggerFactory.getLogger(Queue1Listener.class);

     @RabbitHandler

     public void receive(@Payload String s) {

 

     logger.info("listener1 info: " +s);

 

     }

    }

 

    记得须要containerFactory具体写出来

    在接收消息的方法上写@RabbitHandler,消息体打上@payload久能够接受消息了。

    其实还有个方法就是指定一个MessageAdapter,而后在container里面就能够指定接收的方法名,不是很推荐,明文反射总感受容易出问题

    固然publisher也是有消息的回调的

    RabbitTemplate下有ConfirmCallback实现confirm方法就行了

 

1.2  rabbitMQ入门

1.2.1     安装rabbitMQwindows

安装步骤:

1.    安装Erland,经过官方下载页面http://www.erlang.org/downloads获取exe安装包,直接打开并完成安装。

2.    安装RabbitMQ,经过官方下载页面https://www.rabbitmq.com/download.html获取exe安装包。

3.    下载完成后,能够直接运行安装程序,或配置环境变量后运行rabbitMQ-server安装程序。

4.    RabbitMQ Server安装完成以后,会自动的注册为服务,并以默认配置启动起来。

 

安装过程请百度

 

安装成功:访问:http://127.0.0.1:15672   用户密码:guest/guest

19f5a378734d5d5164c22f58228dee5f.png

咱们能够看到一些基本概念,好比:ConnectionsChannelsExchangesQueue等。第一次使用,能够都点开看看都有些什么内容,熟悉一下RabbitMQ Server的服务端。

 

点击Admin标签,在这里能够进行用户的管理。

4cb5295ff0954532b14efa40468bc720.png

点击admin,添加用户:wyait/wyait并受权。

9ecaf7b7d635a42a1442d6a7f7828590.png

点击all users表单中的用户名“wyait”进行受权:

 1ae05b2095255154c1e6dd23f9e33b4f.png

1.2.1.1             Virtual Hosts设置界面:

160f8f3478f8a7e0a8237bbad7acb3d0.png

程序中和rabbitMQ交互的端口是:5672AMQP协议端口

 

1.2.2     建立spring-boot-MQ工程

项目源码,

码云地址:https://git.oschina.net/wyait/springboot1.5.4.git

github地址:https://github.com/wyait/spring-boot-1.5.4.git

spring boot整合rabbitMQ项目博客连接:spring boot 1.5.4 整合rabbitMQ(十七)

 

1.2.3     消息队列

官网:rabbitMQTutorials

04d7e56a4dd5d8b50c30711d96b72f2b.png

前提,rabbitMQ服务已经启动;测试过程:

1spring Boot项目先启动,监听队列;

2,启动测试类发送消息到队列中;、

3,消费者消费消息。

 

1.2.3.1             hello world

The simplest thingthat does something 简单的消息队列:

ae97423ac713c3dac0e57e861f95bd2b.png

P:消息的生产者;

C:消息的消费者;

红色框:消息队列;

 

demo参考1.2.2章节。

 

1.2.3.2             Work Queues

Distributing tasksamong workers (the competing consumers pattern)

9ef016c90da8e91d24f217825d6dff74.png

一个生产者对应一个消息队列MQMQ能够对应多个消费者,可是同一个消息只能被一个客户端生产者所获取;

 

同一个消息只能被一个客户端所获取。可是对于不一样的消费者,接受消息,处理的效率不一样,因此会有不合理的地方。

 

RabbitMqConfig中定义一个队列workQueues

@Bean

   public Queue workQueue() {

      return new Queue("workQueues");

   }

 

消息生产者WorkSender:

@Component

public class WorkSender {

   @Autowired

   private AmqpTemplate rabbitMQTemplate;

 

   /**

    *

    * @描述:work模式

    * @建立人:wyait

    * @建立时间:2017914下午5:51:20

    */

   public void workSend(String msg) {

      String context = msg + new Date();

      System.out.println("workSender : " + context);

      this.rabbitMQTemplate.convertAndSend("workQueues",context);

   }

}

 

消息消费者1 WorkReceiver:

@Component

@RabbitListener(queues ="workQueues")

public class WorkReceiver {

 

   @RabbitHandler

   // handler注解来指定对消息的处理方法

   public void process(String hello) {

      System.out.println("workReceiver:" + hello);

   }

}

 

消息消费者2 WorkReceiverTwo:

@Component

@RabbitListener(queues ="workQueues")

public class WorkReceiverTwo {

 

   @RabbitHandler

   // handler注解来指定对消息的处理方法

   public void process(String hello) {

      System.out.println("workReceiverTwo:" + hello);

   }

}

 

测试消费消息结果:
503a2221941bc7043dad04eb1bcc8e89.png

平均分配消息原则(你一条,我一条)。

可经过更改channel设置,改变分配策略。

 

1.2.3.3             Publish/Subscribe订阅模式

Sending messagesto many consumers at once.

一个生产者将同一条消息message发送到交换机exchange,经过exchange发送到多个队列中,而对应的消费者都能获取到该消息。

30edb6a834b95aabead6a9659c5caceb.png

注意:

问题1:消息是发送到交换机而不是队列?答:消息能够发送到队列,也能够发送到交换机。

问题2:消费者的消息来源只能是队列;

问题3:若是将消息发送到没有绑定队列的交换机上,消息会去哪?答:消息丢失。

总结:消息只能存放于队列,不能存放在交换机;交换机只能用于消息的传递,消息通道。

 

Fanout Exchange:

46015481703c70a380c08c87f758e852.png

不处理路由键(routingKey)。你只须要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的全部队列上。很像子网广播,每台子网内的主机都得到了一份复制的消息。Fanout交换机转发消息是最快的。

 

Fanout 就是咱们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的全部队列都收到这个消息。

 

这里使用三个队列来测试(也就是在Application类中建立和绑定的fanout.Afanout.Bfanout.C)这三个队列都和Application中建立的fanoutExchange转发器绑定。

 

新增subscribe订阅模式配置:

// ******************subscribe订阅模式**********Start****************

   /**

    *

    * @描述:subscribe订阅模式的队列A;

    * @建立人:wyait

    * @建立时间:2017915下午3:24:31

    * @return

    */

   @Bean

   public Queue subscribeQueueA() {

      return new Queue("fanout.A");

   }

 

   /**

    *

    * @描述:subscribe订阅模式的队列B;

    * @建立人:wyait

    * @建立时间:2017915下午3:24:31

    * @return

    */

   @Bean

   public Queue subscribeQueueB() {

      return new Queue("fanout.B");

   }

 

   /**

    *

    * @描述:subscribe订阅模式的队列C;

    * @建立人:wyait

    * @建立时间:2017915下午3:24:31

    * @return

    */

   @Bean

   public Queue subscribeQueueC() {

      return new Queue("fanout.C");

   }

 

   /**

    *

    * @描述:fanoutExchange交换机

    * @建立人:wyait

    * @建立时间:2017915下午3:34:41

    * @return

    */

   @Bean

   public FanoutExchange fanoutExchange() {

      return new FanoutExchange("fanoutExchange");

   }

 

   /**

    *

    * @描述:subscribeQueue绑定fanoutExchange交换机

    * @建立人:wyait

    * @建立时间:2017915下午3:41:10

    * @param subscribeQueue

    * @param fanoutExchange

    * @return

    */

   @Bean

   Binding bindingExchangeA(Queue subscribeQueueA,

        FanoutExchange fanoutExchange) {

      // 绑定队列AfanoutExchange交换机,也可使用:bind(subscribeQueueA())进行绑定;

      return BindingBuilder.bind(subscribeQueueA).to(fanoutExchange);

   }

 

   @Bean

   Binding bindingExchangeB(Queue subscribeQueueB,

        FanoutExchange fanoutExchange) {

      return BindingBuilder.bind(subscribeQueueB).to(fanoutExchange);

   }

 

   @Bean

   Binding bindingExchangeC(Queue subscribeQueueC,

        FanoutExchange fanoutExchange) {

      return BindingBuilder.bind(subscribeQueueC).to(fanoutExchange);

   }

 

   // ******************subscribe订阅模式**********End****************

 

消息生产者SubscribeSender指定交换机:

@Component

public class SubscribeSender {

   @Autowired

   private AmqpTemplate rabbitTemplate;

 

   public void send(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---SubscribeSender : " +sendMsg);

      // convertAndSend(String exchange, String routingKey, Objectmessage)

      this.rabbitTemplate.convertAndSend("fanoutExchange","aaa", sendMsg);

   }

 

}

 

消息消费者SubscribeReveicerABC监听队列fanout.A/B/C:

@Component

@RabbitListener(queues ="fanout.A")

public class SubscribeReceiver{

   @RabbitHandler

   public void precess(String msg) {

      System.out.println("SubscribeReceiverA  : " + msg);

   }

}

 

测试test类:

@Autowired

   private SubscribeSender subSend;

 

   @Test

   public void subscribeTest() {

      System.out.println("==========subscribe发送消息!");

      for (int i = 0; i < 50; i++) {

        String msg = "==========msg_" + i;

        subSend.send(msg);

        try {

           Thread.sleep(100);

        } catch (InterruptedException e) {

           e.printStackTrace();

        }

      }

   }

 

三个消费者都接收到了每一条信息。

851514323debff33e0affe3d7a010e66.png

注意:subscribe订阅模式和work模式的区别。

一、  work模式将消息发送到队列

二、  订阅模式将消息发送到交换机

三、  work模式是1个队列N个消费者,订阅模式是N个队列N个消费者(N>0)

 

1.2.3.4             Routing路由模式

e1eec04aaba9c5ee0ec7c4c151e98eb4.png

路由模式:基于订阅模式,

能够在队列绑定到交换机时指定一个规则,根据不一样的消息规则,选择是否接受该消息。

0f3bcce82aacc2088184af47292c1a86.png

处理路由键(routingKey)。须要将一个队列绑定到交换机上,要求该消息与一个特定的路由键routingKey彻底匹配。

 

基于Subscribe订阅模式,配置类中添加队列、DirectExchange交换机并进行绑定:

/**

    *

    * @描述:routing路由模式的队列A;

    * @建立人:wyait

    * @建立时间:2017915下午3:24:31

    * @return

    */

   @Bean

   public Queue routingQueueA() {

      return new Queue("routing.A");

   }

 

   /**

    *

    * @描述:routing路由模式的队列B;

    * @建立人:wyait

    * @建立时间:2017915下午3:24:31

    * @return

    */

   @Bean

   public Queue routingQueueB() {

      return new Queue("routing.B");

   }

 

   /**

    *

    * @描述:DirectExchange交换机

    * @建立人:wyait

    * @建立时间:2017915下午3:34:41

    * @return

    */

   @Bean

   public DirectExchange directExchange() {

      return new DirectExchange("directExchange");

   }

 

   /**

    *

    * @描述:routingQueue绑定directExchange交换机

    * @建立人:wyait

    * @建立时间:2017915下午3:41:10

    * @param routingQueue

    * @param directExchange

    * @return

    */

   @Bean

   Binding bindingDirectExchangeA(Queue routingQueueA,

        DirectExchange directExchange) {

      // 绑定routing队列AdirectExchange交换机,并指定routing路由规则;

      return BindingBuilder.bind(routingQueueA()).to(directExchange())

           .with("info");

   }

 

   @Bean

   Binding bindingDirectExchangeB(Queue routingQueueB,

        DirectExchange directExchange) {

      // 绑定routing队列AdirectExchange交换机,并指定routing路由规则;

      returnBindingBuilder.bind(routingQueueB()).to(directExchange())

           .with("error");

   }

 

消息生产者:

@Component

public class RoutingSender {

   @Autowired

   private AmqpTemplate rabbitTemplate;

 

   public void send(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---RoutingSender : " + sendMsg);

      this.rabbitTemplate.convertAndSend("directExchange","info", sendMsg);

   }

 

   public void sendTwo(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---RoutingSender TWO: " +sendMsg);

      this.rabbitTemplate

           .convertAndSend("directExchange", "infoTwo",sendMsg);

   }

 

   public void sendError(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---RoutingSender Error: " +sendMsg);

      this.rabbitTemplate.convertAndSend("directExchange","error", sendMsg);

   }

 

   public void sendErrorTwo(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---RoutingSender ErrorTwo: " +sendMsg);

      this.rabbitTemplate.convertAndSend("directExchange","errorTwo",

           sendMsg);

   }

 

}

 

消息消费者A

@Component

@RabbitListener(queues ="routing.A")

public class RoutingReceiver {

   @RabbitHandler

   public void precess(String msg) {

      System.out.println("RoutingReceiverA === : " + msg);

   }

 

}

 

测试类:

@Autowired

   private RoutingSender routSend;

 

   @Test

   public void routingTest() {

      System.out.println("==========routing发送消息!");

      routSend.send("==========msg_info ");

      routSend.sendTwo("==========msg_infoTwo ");

      routSend.sendError("==========msg_error ");

      routSend.sendErrorTwo("==========msg_ErrorTwo ");

 

      System.out.println("==========routing发送消息   结束!");

   }

 

运行:

e1ea13567a2e8d26b56be095dc8fe33e.png

MqApplication控制台:

33e294ec1cc02f99e7e6b190bd609584.png

由此能够看出,routingKey符合规则的消息,会被消费方接收并消费。

 

1.2.3.5             Topics通配符模式

Receiving messagesbased on a pattern (topics)

9186caf294234d3acd57b894fc72a1ea.png

基于路由模式,使用通配符匹配队列,发送消息

4ea0164e18715288b5c2314834987a09.png

将路由键和某模式进行匹配。

 

任何发送到Topic Exchange的消息都会被转发到全部关心RouteKey中指定话题的Queue

 

1. 这种模式须要RouteKey,要提早绑定ExchangeQueue

 

2. 若是Exchange没有发现可以与RouteKey匹配的Queue,则会抛弃此消息。

 

3. 在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心全部涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)

 

4. #”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,没法与“log.warn.timeout”匹配;可是“log.#”能与上述二者匹配。

通配符#*的区别;

#:表明匹配一个或多个词;

*:表明只匹配一个词.

 

配置类中新增队列绑定TopicExchange交换机,并指定routingKey和匹配模式:

@Bean

   public Queue topicQueueA() {

      return new Queue("topic.queueA", true); // true表示持久化该队列

   }

 

   @Bean

   public Queue topicQueueB() {

      return new Queue("topic.queueB", true);

   }

 

   // 声明交互器

   @Bean

   TopicExchange topicExchange() {

      return new TopicExchange("topicExchange");

   }

 

   // 绑定

   @Bean

   public Binding bindingA() {

      return BindingBuilder.bind(topicQueueA()).to(topicExchange())

           .with("topic.message");

   }

 

   @Bean

   public Binding bindingB() {

      return BindingBuilder.bind(topicQueueB()).to(topicExchange())

           .with("topic.#");

   }

 

消息生产者:

@Component

public class TopicSender {

   @Autowired

   private AmqpTemplate rabbitTemplate;

 

   public void send(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---TopicSender : " + sendMsg);

      this.rabbitTemplate.convertAndSend("topicExchange","topic.message",

           sendMsg);

   }

 

   public void sendTwo(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---TopicSender messages: " +sendMsg);

      this.rabbitTemplate.convertAndSend("topicExchange","topic.messages",

           sendMsg);

   }

 

}

 

消息消费者:

@Component

@RabbitListener(queues ="topic.queueA")

public class TopicReceiver {

   @RabbitHandler

   public void precess(String msg) {

      System.out.println("TopicReceiverA  : " + msg);

   }

}

 

test测试类:

   @Autowired

   private TopicSender topicSend;

 

   @Test

   public void topicTest() {

      System.out.println("==========topic发送消息!");

      topicSend.send("==========msg_info ");

      topicSend.sendTwo("==========msg_infoTwo ");

 

      System.out.println("==========topic发送消息   结束!");

   }

 

重启MqApplication,运行test类:结果:

43a0b8c3dc037057b030a37d6838a34c.png

 92193865ac175fbe027e1cf8d04884fc.png

根据路由规则,接收不一样生产者的消息。

1.2.3.6             交换机总结

RPC模式能够百度查资料去了解!

 

FanoutExchange: 将消息分发到全部的绑定队列,无routingkey的概念 

        HeadersExchange :经过添加属性key-value匹配 

        DirectExchange:按照routingkey分发到指定队列 

        TopicExchange:多关键字匹配 

 

1.2.4     管理界面操做队列和交换机

 

进入Exchanges交换机界面,能够看到全部的AMQP默认的交换机和定义的Exchange:

f3e237c7577f5ebd94d7330a5bd15911.png

选择topicExchange:

5093d779e87925abfbf8fad052723d98.png

 

能够对队列进行添加和解绑操做!

 

1.2.5     队列的持久化

RabbitMQ的队列有2种,一种是内存队列,一种是持久化队列

一、  内存队列

  1. 优势:速度快,效率高

  2. 缺点:宕机,消息丢失

二、  持久化队列

  1. 优势:消息能够持久化保存,宕机或断电后消息不丢失

  2. 缺点:比内存存储速度慢,性能差

设置方法:

@Bean

   public Queue topicQueueA() {

      return new Queue("topic.queueA", true); // true表示持久化该队列

   }

 

管理界面查看是否持久化:

107a956b9ba6c55ce12636ecf6b0bfd7.png

D 持久化

 

项目源码,

码云地址:https://git.oschina.net/wyait/springboot1.5.4.git

github地址:https://github.com/wyait/spring-boot-1.5.4.git



spring boot系列文章:

spring boot 1.5.4 概述(一)

spring boot 1.5.4入门和原理(二)

spring boot 1.5.4 之web开发(三)

spring boot 1.5.4 整合JSP(四)

spring boot 1.5.4 集成devTools(五)

spring boot 1.5.4 集成JdbcTemplate(六)

spring boot 1.5.4 集成spring-Data-JPA(七)

spring boot 1.5.4 配置文件详解(八)

spring boot 1.5.4 统一异常处理(九)

spring boot 1.5.4 定时任务和异步调用(十)

spring boot 1.5.4 整合log4j2(十一)

spring boot 1.5.4 整合 mybatis(十二)

spring boot 1.5.4 整合 druid(十三)

spring boot 1.5.4 之监控Actuator(十四)

spring boot 1.5.4 整合webService(十五)

spring boot 1.5.4 整合redis、拦截器、过滤器、监听器、静态资源配置(十六)

spring boot 1.5.4 整合rabbitMQ(十七)

相关文章
相关标签/搜索