RabbitMQ是一个开源的消息代理软件(面向消息的中间件),它的核心做用就是建立消息队列,异步接收和发送消息,MQ的全程是:Message Queue中文的意思是消息队列。java
削峰填谷:用于应对间歇性流量提高对于系统的“破坏”,好比秒杀活动,能够把请求先发送到消息队列在平滑的交由系统去处理,当访问量大于必定数量的时候,还能够直接屏蔽后续操做,给前台的用户友好的显示;git
延迟处理:能够进行事件后置,好比订单超时业务,用户下单30分钟未支付取消订单;github
系统解耦:消息队列也能够帮开发人员完成业务的解耦,好比用户上传头像的功能,最初的设计是用户上传完以后才能发帖,后面有增长了经验系统,须要在上传头像以后增长经验值,到后来又上线了金币系统,上传头像以后能够增长金币,像这种需求的不断升级,若是在业务代码里面写死每次该业务代码是很不优雅的,这个时候若是使用消息队列,那么只须要增长一个订阅器用于介绍用户上传头像的消息,再执行经验的增长和金币的增长是很是简单的,而且在不改动业务模块业务代码的基础上能够轻松实现,若是后期须要撤销某个模块了,只须要删除订阅器便可,就这样就下降了系统开发的耦合性;spring
如今市面上比较主流的消息队列还有Kafka、RocketMQ、RabbitMQ,它们的介绍和区别以下:docker
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache定级项目。Kafka主要特色是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。数据库
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。安全
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具备高吞吐量、高可用性、适合大规模分布式系统应用的特色。RocketMQ思路起源于Kafka,但并非Kafka的一个Copy,它对消息的可靠传输及事务性作了优化,目前在阿里集团被普遍应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。springboot
简单总结: Kafka的性能最好,适用于对消息吞吐量达,对消息丢失不敏感的系统;RocketMQ借鉴了Kafka并提升了消息的可靠性,修复了Kafka的不足;RabbitMQ性能略低于Kafka,并实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的标准,有很是好的稳定性。bash
支持语言对比服务器
RabbitMQ的特色是易用、扩展性好(集群访问)、高可用,具体以下:
在了解消息通信以前首先要了解3个概念:生产者、消费者和代理。
生产者:消息的建立者,负责建立和推送数据到消息服务器;
消费者:消息的接收方,用于处理数据和确认消息;
代理:就是RabbitMQ自己,用于扮演“快递”的角色,自己不生产消息,只是扮演“快递”的角色。
首先你必须链接到Rabbit才能发布和消费消息,那怎么链接和发送消息的呢?
你的应用程序和Rabbit Server之间会建立一个TCP链接,一旦TCP打开,并经过了认证,认证就是你试图链接Rabbit以前发送的Rabbit服务器链接信息和用户名和密码,有点像程序链接数据库,使用Java有两种链接认证的方式,后面代码会详细介绍,一旦认证经过你的应用程序和Rabbit就建立了一条AMQP信道(Channel)。
信道是建立在“真实”TCP上的虚拟链接,AMQP命令都是经过信道发送出去的,每一个信道都会有一个惟一的ID,不管是发布消息,订阅队列或者接收消息都是经过信道完成的。
对于操做系统来讲建立和销毁TCP会话是很是昂贵的开销,假设高峰期每秒有成千上万条链接,每一个链接都要建立一条TCP会话,这就形成了TCP链接的巨大浪费,并且操做系统每秒能建立的TCP也是有限的,所以很快就会遇到系统瓶颈。
若是咱们每一个请求都使用一条TCP链接,既知足了性能的须要,又能确保每一个链接的私密性,这就是引入信道概念的缘由。
ConnectionFactory(链接管理器): 应用程序与Rabbit之间创建链接的管理器,程序代码中使用;
Channel(信道): 消息推送使用的通道;
Exchange(交换器): 用于接受、分配消息;
Queue(队列): 用于存储生产者的消息;
RoutingKey(路由键): 用于把生成者的数据分配到交换器上;
BindingKey(绑定键): 用于把交换器的消息绑定到队列上;
看到上面的解释,最难理解的路由键和绑定键了,那么他们具体怎么发挥做用的,请看下图:
RabbitMQ的Exchange(交换器)分为四类:
其中headers交换器容许你匹配AMQP消息的header而非路由键,除此以外headers交换器和direct交换器彻底一致,但性能却不好,几乎用不到,因此咱们这里不作解释。
direct为默认的交换器类型,也很是的简单,若是路由键匹配的话,消息就投递到相应的队列,以下图:
fanout有别于direct交换器,fanout是一种发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到全部附加到这个交换器的队列上。
注意: 对于fanout交换器来讲routingKey(路由键)是无效的,这个参数是被忽略的。
topic交换器运行和fanout相似,可是能够更灵活的匹配本身想要订阅的信息,这个时候routingKey路由键就排上用场了,使用路由键进行消息(规则)匹配。
topic路由器的关键在于定义路由键,定义routingKey名称不能超过255字节,使用“.”做为分隔符,例如:com.mq.rabbit.error。
匹配规则
匹配表达式能够用“*”和“#”匹配任何字符,具体规则以下:
例如发布了一个“cn.mq.rabbit.error”的消息:
能匹配上的路由键:
cn.mq.rabbit.*
cn.mq.rabbit.#
#.error
cn.mq.#
#
不能匹配上的路由键:
cn.mq.*
*.error
*
RabbitMQ队列和交换器有一个不可告人的秘密,就是默认状况下重启服务器会致使消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。
当你把消息发送到Rabbit服务器的时候,你须要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须知足3个条件:
持久化工做原理
Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费以后,Rabbit会把这条消息标识为等待垃圾回收。
持久化的缺点
消息持久化的优势显而易见,但缺点也很明显,那就是性能,由于要写入硬盘要比写入内存性能较低不少,从而下降了服务器的吞吐量,尽管使用SSD硬盘可使事情获得缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。
因此使用者要根据本身的状况,选择适合本身的方式。
学习更多RabbitMQ知识,访问:gitbook.cn/gitchat/act…
(1)下载镜像
从镜像的大小也能够很直观的看出来alpine是轻量版。
使用命令:
docker pull rabbitmq:3.7.7-management
下载带management插件的版本。
(2)运行RabbitMQ
使用命令:
docker run -d --hostname myrabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.7-management
正常启动以后,访问:http://localhost:15672/
登陆网页管理页面,用户名密码:guest/guest,登陆成功以下图:
若是用Idea建立新项目,能够直接在建立Spring Boot的时候,点击“Integration”面板,选择RabbitMQ集成,以下图:
若是是老Maven项目,直接在pom.xml添加以下代码:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
在application.properties设置以下信息:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
复制代码
3.3 代码
本节分别来看三种交换器:direct、fanout、topic的实现代码。
建立DirectConfig.java代码以下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
final static String QUEUE_NAME = "direct"; //队列名称
final static String EXCHANGE_NAME = "mydirect"; //交换器名称
@Bean
public Queue queue() {
// 声明队列 参数一:队列名称;参数二:是否持久化
return new Queue(DirectConfig.QUEUE_NAME, false);
}
// 配置默认的交换机,如下部分均可以不配置,不设置使用默认交换器(AMQP default)
@Bean
DirectExchange directExchange() {
// 参数一:交换器名称;参数二:是否持久化;参数三:是否自动删除消息
return new DirectExchange(DirectConfig.EXCHANGE_NAME, false, false);
}
// 绑定“direct”队列到上面配置的“mydirect”路由器
@Bean
Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with(DirectConfig.QUEUE_NAME);
}
}
复制代码
建立Sender.java代码以下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
/** * 消息发送者-生产消息 */
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void driectSend(String message) {
System.out.println("Direct 发送消息:" + message);
//参数一:交换器名称,能够省略(省略存储到AMQP default交换器);参数二:路由键名称(direct模式下路由键=队列名称);参数三:存储消息
this.rabbitTemplate.convertAndSend("direct", message);
}
}
复制代码
注意:
建立Receiver.java代码以下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 消息接收者-消费消息
*/
@Component
@RabbitListener(queues = "direct")
public class Receiver {
@Autowired
private AmqpTemplate rabbitTemplate;
@RabbitHandler
/**
* 监听消费消息
*/
public void process(String message) {
System.out.println("Direct 消费消息:" + message);
}
}
复制代码
使用Spring Boot中的默认测试框架JUnit进行单元测试,不了解JUnit的能够参考个人上一篇文章,建立MQTest.java代码以下:
package com.example.rabbitmq.mq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
import static org.junit.Assert.*;
@RunWith(SpringRunner.class)
@SpringBootTest
public class MQTest {
@Autowired
private Sender sender;
@Test
public void driectTest() {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
sender.driectSend("Driect Data:" + sf.format(new Date()));
}
}
复制代码
执行以后,效果以下图:
表示消息已经被发送并被消费了。
建立FanoutConfig.java代码以下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
final static String QUEUE_NAME = "fanout"; //队列名称
final static String QUEUE_NAME2 = "fanout2"; //队列名称
final static String EXCHANGE_NAME = "myfanout"; //交换器名称
@Bean
public Queue queueFanout() {
return new Queue(FanoutConfig.QUEUE_NAME);
}
@Bean
public Queue queueFanout2() {
return new Queue(FanoutConfig.QUEUE_NAME2);
}
//配置交换器
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(FanoutConfig.EXCHANGE_NAME);
}
// 绑定队列到交换器
@Bean
Binding bindingFanoutExchangeQueue(Queue queueFanout, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueFanout).to(fanoutExchange);
}
// 绑定队列到交换器
@Bean
Binding bindingFanoutExchangeQueue2(Queue queueFanout2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueFanout2).to(fanoutExchange);
}
}
复制代码
建立FanoutSender.java代码以下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String message) {
System.out.println("发送消息:" + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME, message);
}
public void send2(String message) {
System.out.println("发送消息2:" + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME2, message);
}
}
复制代码
建立两个监听类,第一个FanoutReceiver.java代码以下:
package com.example.rabbitmq.mq;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "fanout")
public class FanoutReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("Fanout(FanoutReceiver)消费消息:" + msg);
}
}
复制代码
第二个FanoutReceiver2.java代码以下:
package com.example.rabbitmq.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout2")
public class FanoutReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Fanout(FanoutReceiver2)消费消息:" + message);
}
}
复制代码
建立FanoutTest.java代码以下:
package com.example.rabbitmq.mq;
import com.example.rabbitmq.RabbitmqApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.text.SimpleDateFormat;
import java.util.Date;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class FanoutTest {
@Autowired
private FanoutSender sender;
@Test
public void Test() throws InterruptedException {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
sender.send("Time1 => " + sf.format(new Date()));
sender.send2("Date2 => " + sf.format(new Date()));
}
}
复制代码
运行测试代码,输出结果以下:
发送消息:Time1 => 2018-09-11
发送消息2:Date2 => 2018-09-11
Fanout(FanoutReceiver2)消费消息:Time1 => 2018-09-11
Fanout(FanoutReceiver2)消费消息:Date2 => 2018-09-11
Fanout(FanoutReceiver)消费消息:Time1 => 2018-09-11
Fanout(FanoutReceiver)消费消息:Date2 => 2018-09-11
复制代码
总结: 能够看出fanout会把消息分发到全部订阅到该交换器的队列,fanout模式是忽略路由键的。
@Configuration
public class TopicConfig {
final static String QUEUE_NAME = "log";
final static String QUEUE_NAME2 = "log.all";
final static String QUEUE_NAME3 = "log.all.error";
final static String EXCHANGE_NAME = "topicExchange"; //交换器名称
@Bean
public Queue queuetopic() {
return new Queue(TopicConfig.QUEUE_NAME);
}
@Bean
public Queue queuetopic2() {
return new Queue(TopicConfig.QUEUE_NAME2);
}
@Bean
public Queue queuetopic3() {
return new Queue(TopicConfig.QUEUE_NAME3);
}
// 配置交换器
@Bean
TopicExchange topicExchange() {
return new TopicExchange(TopicConfig.EXCHANGE_NAME);
}
// 绑定队列到交换器,并设置路由键(log.#)
@Bean
Binding bindingtopicExchangeQueue(Queue queuetopic, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic).to(topicExchange).with("log.#");
}
// 绑定队列到交换器,并设置路由键(log.*)
@Bean
Binding bindingtopicExchangeQueue2(Queue queuetopic2, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic2).to(topicExchange).with("log.*");
}
// 绑定队列到交换器,并设置路由键(log.*.error)
@Bean
Binding bindingtopicExchangeQueue3(Queue queuetopic3, TopicExchange topicExchange) {
return BindingBuilder.bind(queuetopic3).to(topicExchange).with("log.*.error");
}
}
复制代码
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void topicSender(String message) {
String routingKey = "log.all.error";
System.out.println(routingKey + " 发送消息:" + message);
this.rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE_NAME, routingKey, message);
}
}
复制代码
@Component
@RabbitListener(queues = "log")
public class TopicReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("log.# 消费消息:" + msg);
}
}
复制代码
@Component
@RabbitListener(queues = "log.all")
public class TopicReceiver2 {
@RabbitHandler
public void process(String msg) {
System.out.println("log.* 消费消息:" + msg);
}
}
复制代码
@Component
@RabbitListener(queues = "log.all.error")
public class TopicReceiver3 {
@RabbitHandler
public void process(String msg) {
System.out.println("log.*.error 消费消息:" + msg);
}
}
复制代码
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqApplication.class)
public class FanoutTest {
@Autowired
private FanoutSender fanoutSender;
@Test
public void Test() {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
fanoutSender.send("Time1 => " + sf.format(new Date()));
fanoutSender.send2("Date2 => " + sf.format(new Date()));
}
}
复制代码
输出结果:
log.all.error 发送消息:time => 2018-09-11
log.# 消费消息:time => 2018-09-11
log.*.error 消费消息:time => 2018-09-11
复制代码
总结: 在Topic Exchange中“#”能够匹配全部内容,而“*”则是匹配一个字符段的内容。
以上示例代码Github地址:github.com/vipstone/sp…
参考文档
阿里 RocketMQ 优点对比:juejin.im/entry/5a0ab…