MQ:Message Queue,消息队列,是一种应用程序对应用程序的通讯方法;应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。html
rabbitMQ官网:http://www.rabbitmq.com/git
Erlang官网:http://www.erlang.org/github
RabbitMQ是一个由Erlang开发的AMQP(AdvancedMessage Queue )的开源实现,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。web
借用网络中一个rabbitMQ的系统架构图:redis
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不一样产品,不一样的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。--百度百科算法
Message Broker与AMQP简介spring
Message Broker是一种消息验证、传输、路由的架构模式,其设计目标主要应用于下面这些场景:编程
消息路由到一个或多个目的地windows
消息转化为其余的表现方式缓存
执行消息的汇集、消息的分解,并将结果发送到他们的目的地,而后从新组合相应返回给消息用户
调用Web服务来检索数据
响应事件或错误
使用发布-订阅模式来提供内容或基于主题的消息路由
AMQP是Advanced Message QueuingProtocol的简称,它是一个面向消息中间件的开放式标准应用层协议。AMQP定义了这些特性:
消息方向
消息队列
消息路由(包括:点到点和发布-订阅模式)
可靠性
安全性
RabbitMQ就是以AMQP协议实现的一种中间件产品,它能够支持多种操做系统,多种编程语言,几乎能够覆盖全部主流的企业级技术平台。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
简单介绍AMQP的协议栈,AMQP协议自己包含三层,以下:
Model Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端能够经过这些命令实现本身的业务逻辑,例如,客户端能够经过queue declare声明一个队列,利用consume命令获取队列的消息。
Session Layer,主要负责将客户端命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通讯提供可靠性、同步机制和错误处理。
Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。
这种分层架构相似于OSI网络协议,可替换各层实现而不影响与其它层的交互。AMQP定义了合适的服务器端域模型,用于规范服务器的行为(AMQP服务器端可称为broker)。在这里Model层决定这些基本域模型所产生的行为,这种行为在AMQP中用command表示。Session层定义客户端与broker之间的通讯(通讯双方都是一个peer,可互称作partner),为command的可靠传输提供保障。Transport层专一于数据传送,并与Session保持交互,接受上层的数据,组装成二进制流,传送到receiver后再解析数据,交付给Session层。Session层须要Transport层完成网络异常状况的汇报,顺序传送command等工做。
接下来了解下AMQP当中的一些概念。
Broker(Server):接受客户端链接,实现AMQP消息队列和路由功能的进程。
Virtual Host:实际上是一个虚拟概念,相似于权限控制组,一个Virtual Host里面能够有若干个Exchange和Queue,可是权限控制的最小粒度是Virtual Host。
Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不一样类型的Exchange路由的行为是不同的。
Message Queue:消息队列,用于存储还未被消费者消费的消息。
Message:由Header和Body组成,Header是由生产者添加的各类属性的集合,包括Message是否被持久化、由哪一个Message Queue接受、优先级是多少等。而Body是真正须要传输的APP数据。
Binding:Binding联系了Exchange与MessageQueue。Exchange在与多个MessageQueue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header获得Routing Key,Exchange根据Routing Key与ExchangeType将Message路由到MessageQueue。Binding Key由Consumer在Binding Exchange与MessageQueue时指定,而Routing Key由Producer发送Message时指定,二者的匹配方式由ExchangeType决定。
Connection:链接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP链接。
Channel:信道,仅仅建立了客户端到Broker之间的链接后,客户端仍是不能发送消息的。须要为每个Connection建立Channel,AMQP协议规定只有经过Channel才能执行AMQP的命令。一个Connection能够包含多个Channel。之因此须要Channel,是由于TCP链接的创建和释放都是十分昂贵的,若是一个客户端每个线程都须要与Broker交互,若是每个线程都创建一个TCP链接,暂且不考虑TCP链接是否浪费,就算操做系统也没法承受每秒创建如此多的TCP链接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,可是建议尽可能共用Connection。
Command:AMQP的命令,客户端经过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端能够经过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。
消息中间件的主要功能是消息的路由(Routing)和缓存(Buffering)。在AMQP中提供相似功能的两种域模型:Exchange 和 Messagequeue。
Exchange接收消息生产者(MessageProducer)发送的消息根据不一样的路由算法将消息发送往Message queue。Messagequeue会在消息不能被正常消费时缓存这些消息,具体的缓存策略由实现者决定,当message queue与消息消费者(Messageconsumer)之间的链接通畅时,Message queue有将消息转发到consumer的责任。
一个Message的处理流程相似于下图:
Message是当前模型中所操纵的基本单位,它由Producer产生,通过Broker被Consumer所消费。它的基本结构有两部分: Header和Body。Header是由Producer添加上的各类属性的集合,这些属性有控制Message是否可被缓存,接收的queue是哪一个,优先级是多少等。Body是真正须要传送的数据,它是对Broker不可见的二进制数据流,在传输过程当中不该该受到影响。
一个broker中会存在多个Message queue,Exchange怎样知道它要把消息发送到哪一个Message queue中去呢? 这就是上图中所展现Binding的做用。Messagequeue的建立是由client application控制的,在建立Message queue后须要肯定它来接收并保存哪一个Exchange路由的结果。Binding是用来关联Exchange与Message queue的域模型。Clientapplication控制Exchange与某个特定Messagequeue关联,并将这个queue接受哪一种消息的条件绑定到Exchange,这个条件也叫Binding key或是 Criteria。
在与多个Messagequeue关联后,Exchange中就会存在一个路由表,这个表中存储着每一个Message queue所须要消息的限制条件。Exchange就会检查它接受到的每一个Message的Header及Body信息,来决定将Message路由到哪一个queue中去。Message的Header中应该有个属性叫Routing Key,它由Message发送者产生,提供给Exchange路由这条Message的标准。Exchange根据不一样路由算法有不一样有ExchangeType。好比有Direct相似,须要Bindingkey等于Routing key;也有Bindingkey与Routing key符合一个模式关系;也有根据Message包含的某些属性来判断。一些基础的路由算法由AMQP所提供,clientapplication也能够自定义各类本身的扩展路由算法。
在AMQP中,Client application想要与Broker沟通,就须要创建起与Broker的connection,这种connection实际上是与Virtual Host相关联的,也就是说,connection是创建在client与Virtual Host之间。能够在一个connection上并发运行多个channel,每一个channel执行与Broker的通讯,咱们前面提供的session就是依附于channel上的。
这里的Session能够有多种定义,既能够表示AMQP内部提供的command分发机制,也能够说是在宏观上区别与域模型的接口。正常理解就是咱们平时所说的交互context,主要做用就是在网络上可靠地传递每个command。在AMQP的设计中,应当是借鉴了TCP的各类设计,用于保证这种可靠性。
在Session层,为上层所须要交互的每一个command分配一个唯一标识符(能够是一个UUID),是为了在传输过程当中能够对command作校验和重传。Command发送端也须要记录每一个发送出去的command到ReplayBuffer,以期获得接收方的回馈,保证这个command被接收方明确地接收或是已执行这个command。对于超时没有收到反馈的command,发送方再次重传。若是接收方已明确地回馈信息想要告知command发送方但这条信息在中途丢失或是其它问题发送方没有收到,那么发送方不断重传会对接收方产生影响,为了下降这种影响,command接收方设置一个过滤器IdempotencyBarrier,来拦截那些已接收过的command。关于这种重传及确认机制,能够参考下TCP的相关设计。
Erlang([':l])是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab开发,目的是创造一种能够应对大规模并发活动的编程语言和运行环境。Erlang问世于1987年,通过十年的发展,于1998年发布开源版本。Erlang是运行于虚拟机的解释性语言,可是如今也包含有乌普萨拉大学高性能Erlang计划(HiPE)开发的本地代码编译器,自R11B-4版本开始,Erlang也开始支持脚本式解释器。在编程范型上,Erlang属于多重范型编程语言,涵盖函数式、并发式及分布式。顺序执行的Erlang是一个及早求值, 单次赋值和动态类型的函数式编程语言。
Erlang是一个结构化,动态类型编程语言,内建并行计算支持。最初是由爱立信专门为通讯应用设计的,好比控制交换机或者变换协议等,所以很是适合于构建分布式,实时软并行计算系统。使用Erlang编写出的应用运行时一般由成千上万个轻量级进程组成,并经过消息传递相互通信。进程间上下文切换对于Erlang来讲仅仅只是一两个环节,比起C程序的线程切换要高效得多得多了。
使用Erlang来编写分布式应用要简单的多,由于它的分布式机制是透明的:对于程序来讲并不知道本身是在分布式运行。Erlang运行时环境是一个虚拟机,有点像Java虚拟机,这样代码一经编译,一样能够随处运行。它的运行时系统甚至容许代码在不被中断的状况下更新。另外若是须要更高效的话,字节代码也能够编译成本地代码运行。
来自:百度百科
其余MQ
首页,下拉至:download下载,Tutorials教程
下载windows版本:
rabbitmq-server-3.6.12.exe
教程RabbitMQ Tutorials:
http://www.erlang.org/访问比较慢,建议你们也能够网上找资源下载。
spring-boot-rabbitMQ项目源码:https://git.oschina.net/wyait/springboot1.5.4.git
config配置类:
@Autowired
private ConnectionFactoryconnectionFactory;
@Autowired
private Queue3Listenerqueue3Listener;
@Bean
@Primary
public RabbitTemplaterabbitTemplate() {
RabbitTemplate rabbitTemplate = newRabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
@Primary
publicSimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactorysimpleRabbitListenerContainerFactory = newSimpleRabbitListenerContainerFactory();
simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
simpleRabbitListenerContainerFactory.setMessageConverter(newJackson2JsonMessageConverter());
returnsimpleRabbitListenerContainerFactory;
}
@Bean
publicSimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container =new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue3());
container.setMessageListener(queue3Listener);
return container;
}
@Bean
public DirectExchangedirectExchange() {
return new DirectExchange(EX_CHANGE_NAME1);
}
@Bean
public Queue queue1() {
return new Queue(QUEUE1, true);
}
@Bean
public Queue queue2() {
return new Queue(QUEUE2, true);
}
@Bean
public Queue queue3() {
return new Queue(QUEUE3, true);
}
@Bean
public Binding binding1() {
returnBindingBuilder.bind(queue1()).to(directExchange()).with(ROUTING_KEY1);
}
@Bean
public Binding binding2() {
returnBindingBuilder.bind(queue2()).to(directExchange()).with(ROUTING_KEY2);
}
@Bean
public Binding binding3() {
return BindingBuilder.bind(queue3()).to(directExchange()).with(ROUTING_KEY3);
}
基本概念:
ConnectionFactory、Connection、Channel
connection为socket链接的封装,connectionFqactory是connection的生产工程,channel是通讯的信道,实际进行数据交流的管道,由于创建connection的开销明显要比创建channel要大不少,因此数据传输真实发生在channel内
Exchange,Queue
exchange是能够理解成一条特殊的传输通道,他会把消息投递到绑定的消息池内。
queue就是消息池了,使用前须要绑定exchange,以及本身的标志。
exchange_key,routing_key
exchange_key决定了publisher的消息投递到哪条通道,routing_key决定了将消息放到哪一个池子里
绑定
queue要接受消息必须与exchange进行绑定,并在绑定的时候给本身与exchange的绑定设置一个标记routing_key,之后用来匹配消息接收
exchange与queue是一对多的关系,根据exchange不一样类型,分别投递到不一样的消息池
下面来看看exchange的类型
1. fanout
直接将消息发送到与该exchange绑定的全部queue内
2. direct
对routing_key进行严格匹配,当消息来到的时候,只有exchange与某queue绑定的routing_key彻底匹配才将消息投递到该queue
3. topic
用特殊符号进行匹配,知足条件的queue都能收到消息,这里的routing_key以"."分隔,*匹配一个单词,#匹配多个单词,若是知足多个条件也不会投递屡次
4. headers
不依赖routing_key匹配,根据消息体内的headers属性匹配,绑定的时候能够制定键值对
接下来来看看配置文件
1.@Bean统一注入到容器中,咱们声明了connectionfactory,他会自动根据application里面的属性进行组装,这个链接对于后面的容器都是要用到的,这里要注意converter的设置,由于咱们要将pojo类型进行传输,通常程序外的传输都是创建在字节流的基础上,converter就会自动转换
2.接下来咱们声明queue,true属性设置为持久型的池子,当链接断开时,消息会呗保留,而后声明exchange,这里咱们使用的是directexchange,接下来将二者绑定起来
5. 声明SimpleMessageListenerContainer,SimpleRabbitListenerContainerFactory注意这里声明两个是由于这是消息监听的两种方式
首先讲讲SimpleMessageListenerContainer,这个须要设置确认方式,有较多属性克设置,有兴趣可自行设置,这里我只是简单的设置了一下,而后要设置listener,
listener须要实现ChannelAwareMessageListener里面有
public void onMessage(Message message,Channel channel) 的重载方法须要实现,消息体在Message的body内,相对来讲信息比较完备
接下来看看SimpleRabbitListenerContainerFactory,这个有几个注意点,须要再次设置converter由于,一个是发消息的时候解析成二进制,这个则是将二进制解析成具体的类,回调相对简单一点
@Component
@RabbitListener(queues =RabbitMQConfig.QUEUE1, containerFactory ="simpleRabbitListenerContainerFactory")
public class Queue1Listener {
private static Logger logger =LoggerFactory.getLogger(Queue1Listener.class);
@RabbitHandler
public void receive(@Payload String s) {
logger.info("listener1 info: " +s);
}
}
记得须要containerFactory具体写出来
在接收消息的方法上写@RabbitHandler,消息体打上@payload久能够接受消息了。
其实还有个方法就是指定一个MessageAdapter,而后在container里面就能够指定接收的方法名,不是很推荐,明文反射总感受容易出问题
固然publisher也是有消息的回调的
RabbitTemplate下有ConfirmCallback实现confirm方法就行了
安装步骤:
1. 安装Erland,经过官方下载页面http://www.erlang.org/downloads获取exe安装包,直接打开并完成安装。
2. 安装RabbitMQ,经过官方下载页面https://www.rabbitmq.com/download.html获取exe安装包。
3. 下载完成后,能够直接运行安装程序,或配置环境变量后运行rabbitMQ-server安装程序。
4. RabbitMQ Server安装完成以后,会自动的注册为服务,并以默认配置启动起来。
安装过程请百度
安装成功:访问:http://127.0.0.1:15672 用户密码:guest/guest
咱们能够看到一些基本概念,好比:Connections、Channels、Exchanges、Queue等。第一次使用,能够都点开看看都有些什么内容,熟悉一下RabbitMQ Server的服务端。
点击Admin标签,在这里能够进行用户的管理。
点击admin,添加用户:wyait/wyait并受权。
点击all users表单中的用户名“wyait”进行受权:
程序中和rabbitMQ交互的端口是:5672,AMQP协议端口
项目源码,
码云地址:https://git.oschina.net/wyait/springboot1.5.4.git
github地址:https://github.com/wyait/spring-boot-1.5.4.git
spring boot整合rabbitMQ项目博客连接:spring boot 1.5.4 整合rabbitMQ(十七)
官网:rabbitMQTutorials
前提,rabbitMQ服务已经启动;测试过程:
1,spring Boot项目先启动,监听队列;
2,启动测试类发送消息到队列中;、
3,消费者消费消息。
The simplest thingthat does something 简单的消息队列:
P:消息的生产者;
C:消息的消费者;
红色框:消息队列;
demo参考1.2.2章节。
Distributing tasksamong workers (the competing consumers pattern)
一个生产者对应一个消息队列MQ,MQ能够对应多个消费者,可是同一个消息只能被一个客户端生产者所获取;
同一个消息只能被一个客户端所获取。可是对于不一样的消费者,接受消息,处理的效率不一样,因此会有不合理的地方。
在RabbitMqConfig中定义一个队列workQueues:
@Bean
public Queue workQueue() {
return new Queue("workQueues");
}
消息生产者WorkSender:
@Component
public class WorkSender {
@Autowired
private AmqpTemplate rabbitMQTemplate;
/**
*
* @描述:work模式
* @建立人:wyait
* @建立时间:2017年9月14日下午5:51:20
*/
public void workSend(String msg) {
String context = msg + new Date();
System.out.println("workSender : " + context);
this.rabbitMQTemplate.convertAndSend("workQueues",context);
}
}
消息消费者1 WorkReceiver:
@Component
@RabbitListener(queues ="workQueues")
public class WorkReceiver {
@RabbitHandler
// handler注解来指定对消息的处理方法
public void process(String hello) {
System.out.println("workReceiver:" + hello);
}
}
消息消费者2 WorkReceiverTwo:
@Component
@RabbitListener(queues ="workQueues")
public class WorkReceiverTwo {
@RabbitHandler
// handler注解来指定对消息的处理方法
public void process(String hello) {
System.out.println("workReceiverTwo:" + hello);
}
}
平均分配消息原则(你一条,我一条)。
可经过更改channel设置,改变分配策略。
Sending messagesto many consumers at once.
一个生产者将同一条消息message发送到交换机exchange,经过exchange发送到多个队列中,而对应的消费者都能获取到该消息。
注意:
问题1:消息是发送到交换机而不是队列?答:消息能够发送到队列,也能够发送到交换机。
问题2:消费者的消息来源只能是队列;
问题3:若是将消息发送到没有绑定队列的交换机上,消息会去哪?答:消息丢失。
总结:消息只能存放于队列,不能存放在交换机;交换机只能用于消息的传递,消息通道。
Fanout Exchange:
不处理路由键(routingKey)。你只须要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的全部队列上。很像子网广播,每台子网内的主机都得到了一份复制的消息。Fanout交换机转发消息是最快的。
Fanout 就是咱们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的全部队列都收到这个消息。
这里使用三个队列来测试(也就是在Application类中建立和绑定的fanout.A、fanout.B、fanout.C)这三个队列都和Application中建立的fanoutExchange转发器绑定。
新增subscribe订阅模式配置:
// ******************subscribe订阅模式**********Start****************
/**
*
* @描述:subscribe订阅模式的队列A;
* @建立人:wyait
* @建立时间:2017年9月15日下午3:24:31
* @return
*/
@Bean
public Queue subscribeQueueA() {
return new Queue("fanout.A");
}
/**
*
* @描述:subscribe订阅模式的队列B;
* @建立人:wyait
* @建立时间:2017年9月15日下午3:24:31
* @return
*/
@Bean
public Queue subscribeQueueB() {
return new Queue("fanout.B");
}
/**
*
* @描述:subscribe订阅模式的队列C;
* @建立人:wyait
* @建立时间:2017年9月15日下午3:24:31
* @return
*/
@Bean
public Queue subscribeQueueC() {
return new Queue("fanout.C");
}
/**
*
* @描述:fanoutExchange交换机
* @建立人:wyait
* @建立时间:2017年9月15日下午3:34:41
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
/**
*
* @描述:subscribeQueue绑定fanoutExchange交换机
* @建立人:wyait
* @建立时间:2017年9月15日下午3:41:10
* @param subscribeQueue
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeA(Queue subscribeQueueA,
FanoutExchange fanoutExchange) {
// 绑定队列A到fanoutExchange交换机,也可使用:bind(subscribeQueueA())进行绑定;
return BindingBuilder.bind(subscribeQueueA).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue subscribeQueueB,
FanoutExchange fanoutExchange) {
return BindingBuilder.bind(subscribeQueueB).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue subscribeQueueC,
FanoutExchange fanoutExchange) {
return BindingBuilder.bind(subscribeQueueC).to(fanoutExchange);
}
// ******************subscribe订阅模式**********End****************
消息生产者SubscribeSender指定交换机:
@Component
public class SubscribeSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
String sendMsg = msg + new Date();
System.out.println("---SubscribeSender : " +sendMsg);
// convertAndSend(String exchange, String routingKey, Objectmessage)
this.rabbitTemplate.convertAndSend("fanoutExchange","aaa", sendMsg);
}
}
消息消费者SubscribeReveicerA、B、C监听队列fanout.A/B/C:
@Component
@RabbitListener(queues ="fanout.A")
public class SubscribeReceiver{
@RabbitHandler
public void precess(String msg) {
System.out.println("SubscribeReceiverA : " + msg);
}
}
测试test类:
@Autowired
private SubscribeSender subSend;
@Test
public void subscribeTest() {
System.out.println("==========subscribe发送消息!");
for (int i = 0; i < 50; i++) {
String msg = "==========msg_" + i;
subSend.send(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
三个消费者都接收到了每一条信息。
注意:subscribe订阅模式和work模式的区别。
一、 work模式将消息发送到队列
二、 订阅模式将消息发送到交换机
三、 work模式是1个队列N个消费者,订阅模式是N个队列N个消费者(N>0)
路由模式:基于订阅模式,
能够在队列绑定到交换机时指定一个规则,根据不一样的消息规则,选择是否接受该消息。
处理路由键(routingKey)。须要将一个队列绑定到交换机上,要求该消息与一个特定的路由键routingKey彻底匹配。
基于Subscribe订阅模式,配置类中添加队列、DirectExchange交换机并进行绑定:
/**
*
* @描述:routing路由模式的队列A;
* @建立人:wyait
* @建立时间:2017年9月15日下午3:24:31
* @return
*/
@Bean
public Queue routingQueueA() {
return new Queue("routing.A");
}
/**
*
* @描述:routing路由模式的队列B;
* @建立人:wyait
* @建立时间:2017年9月15日下午3:24:31
* @return
*/
@Bean
public Queue routingQueueB() {
return new Queue("routing.B");
}
/**
*
* @描述:DirectExchange交换机
* @建立人:wyait
* @建立时间:2017年9月15日下午3:34:41
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
/**
*
* @描述:routingQueue绑定directExchange交换机
* @建立人:wyait
* @建立时间:2017年9月15日下午3:41:10
* @param routingQueue
* @param directExchange
* @return
*/
@Bean
Binding bindingDirectExchangeA(Queue routingQueueA,
DirectExchange directExchange) {
// 绑定routing队列A到directExchange交换机,并指定routing路由规则;
return BindingBuilder.bind(routingQueueA()).to(directExchange())
.with("info");
}
@Bean
Binding bindingDirectExchangeB(Queue routingQueueB,
DirectExchange directExchange) {
// 绑定routing队列A到directExchange交换机,并指定routing路由规则;
returnBindingBuilder.bind(routingQueueB()).to(directExchange())
.with("error");
}
消息生产者:
@Component
public class RoutingSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
String sendMsg = msg + new Date();
System.out.println("---RoutingSender : " + sendMsg);
this.rabbitTemplate.convertAndSend("directExchange","info", sendMsg);
}
public void sendTwo(String msg) {
String sendMsg = msg + new Date();
System.out.println("---RoutingSender TWO: " +sendMsg);
this.rabbitTemplate
.convertAndSend("directExchange", "infoTwo",sendMsg);
}
public void sendError(String msg) {
String sendMsg = msg + new Date();
System.out.println("---RoutingSender Error: " +sendMsg);
this.rabbitTemplate.convertAndSend("directExchange","error", sendMsg);
}
public void sendErrorTwo(String msg) {
String sendMsg = msg + new Date();
System.out.println("---RoutingSender ErrorTwo: " +sendMsg);
this.rabbitTemplate.convertAndSend("directExchange","errorTwo",
sendMsg);
}
}
消息消费者A:
@Component
@RabbitListener(queues ="routing.A")
public class RoutingReceiver {
@RabbitHandler
public void precess(String msg) {
System.out.println("RoutingReceiverA === : " + msg);
}
}
测试类:
@Autowired
private RoutingSender routSend;
@Test
public void routingTest() {
System.out.println("==========routing发送消息!");
routSend.send("==========msg_info ");
routSend.sendTwo("==========msg_infoTwo ");
routSend.sendError("==========msg_error ");
routSend.sendErrorTwo("==========msg_ErrorTwo ");
System.out.println("==========routing发送消息 结束!");
}
运行:
MqApplication控制台:
由此能够看出,routingKey符合规则的消息,会被消费方接收并消费。
Receiving messagesbased on a pattern (topics)
基于路由模式,使用通配符匹配队列,发送消息
将路由键和某模式进行匹配。
任何发送到Topic Exchange的消息都会被转发到全部关心RouteKey中指定话题的Queue上
1. 这种模式须要RouteKey,要提早绑定Exchange与Queue。
2. 若是Exchange没有发现可以与RouteKey匹配的Queue,则会抛弃此消息。
3. 在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心全部涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
4. “#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,没法与“log.warn.timeout”匹配;可是“log.#”能与上述二者匹配。
通配符#和*的区别;
#:表明匹配一个或多个词;
*:表明只匹配一个词.
配置类中新增队列绑定TopicExchange交换机,并指定routingKey和匹配模式:
@Bean
public Queue topicQueueA() {
return new Queue("topic.queueA", true); // true表示持久化该队列
}
@Bean
public Queue topicQueueB() {
return new Queue("topic.queueB", true);
}
// 声明交互器
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
// 绑定
@Bean
public Binding bindingA() {
return BindingBuilder.bind(topicQueueA()).to(topicExchange())
.with("topic.message");
}
@Bean
public Binding bindingB() {
return BindingBuilder.bind(topicQueueB()).to(topicExchange())
.with("topic.#");
}
消息生产者:
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
String sendMsg = msg + new Date();
System.out.println("---TopicSender : " + sendMsg);
this.rabbitTemplate.convertAndSend("topicExchange","topic.message",
sendMsg);
}
public void sendTwo(String msg) {
String sendMsg = msg + new Date();
System.out.println("---TopicSender messages: " +sendMsg);
this.rabbitTemplate.convertAndSend("topicExchange","topic.messages",
sendMsg);
}
}
消息消费者:
@Component
@RabbitListener(queues ="topic.queueA")
public class TopicReceiver {
@RabbitHandler
public void precess(String msg) {
System.out.println("TopicReceiverA : " + msg);
}
}
test测试类:
@Autowired
private TopicSender topicSend;
@Test
public void topicTest() {
System.out.println("==========topic发送消息!");
topicSend.send("==========msg_info ");
topicSend.sendTwo("==========msg_infoTwo ");
System.out.println("==========topic发送消息 结束!");
}
重启MqApplication,运行test类:结果:
根据路由规则,接收不一样生产者的消息。
RPC模式能够百度查资料去了解!
FanoutExchange: 将消息分发到全部的绑定队列,无routingkey的概念
HeadersExchange :经过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
进入Exchanges交换机界面,能够看到全部的AMQP默认的交换机和定义的Exchange:
选择topicExchange:
能够对队列进行添加和解绑操做!
RabbitMQ的队列有2种,一种是内存队列,一种是持久化队列
一、 内存队列
优势:速度快,效率高
缺点:宕机,消息丢失
二、 持久化队列
优势:消息能够持久化保存,宕机或断电后消息不丢失
缺点:比内存存储速度慢,性能差
设置方法:
@Bean
public Queue topicQueueA() {
return new Queue("topic.queueA", true); // true表示持久化该队列
}
管理界面查看是否持久化:
D 持久化
项目源码,
码云地址:https://git.oschina.net/wyait/springboot1.5.4.git
github地址:https://github.com/wyait/spring-boot-1.5.4.git
spring boot系列文章:
spring boot 1.5.4 集成devTools(五)
spring boot 1.5.4 集成JdbcTemplate(六)
spring boot 1.5.4 集成spring-Data-JPA(七)
spring boot 1.5.4 定时任务和异步调用(十)
spring boot 1.5.4 整合log4j2(十一)
spring boot 1.5.4 整合 mybatis(十二)
spring boot 1.5.4 整合 druid(十三)
spring boot 1.5.4 之监控Actuator(十四)
spring boot 1.5.4 整合webService(十五)