RabbitMQ 是一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的做用。本文介绍RabbitMQ 安装和使用。php
RabbitMQ 是一个开源的AMQP
实现,服务器端用Erlang
语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。html
能够把消息队列想象成邮局,你的笔友把信件投递到邮局,邮递员源源不断地进出邮局,把笔友的信送到你的手里。此时的笔友就是一个生产者(Product),邮递员一次送信就是(Queue),而你收信就像是消费者(Consumer)。java
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的原始用途只是为金融界提供一个能够彼此协做的消息协议,而如今的目标则是为通用消息队列架构提供通用构建工具。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。c++
RabbitMQ 则是一个开源的 AMQP 实现。git
一般咱们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多作了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange)。这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。github
经过 RabbitMQ 官网 的示例中看到 RabbitMQ 有六种模式。spring
官网中有多种语言的实现,本文用 Java 来实现。采用 Springboot 集成 RabbitMQ。apache
yum updatecentos
yum install epel-release安全
yum install gcc gcc-c++ glibc-devel make ncurses-devel openssl-devel autoconf java-1.8.0-openjdk-devel git wget wxBase.x86_64
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
yum update
yum install erlang
验证是否安装成功,输入命令:erl
由于 EPEL 中的 Elixir 版本太老,因此下面是经过源码编译安装的过程:
git clone https://github.com/elixir-lang/elixir.git
cd elixir/
make clean test
export PATH=”$PATH:/usr/local/elixir/bin”
验证是否安装成功,输入命令:iex
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-3.6.1-1.noarch.rpm
rpm –import https://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum install rabbitmq-server-3.6.1-1.noarch.rpm
至此已经安装完成,下面介绍启动和自动开机启动命令和配置
启动:
systemctl start rabbitmq-server
开机自动启动:
systemctl enable rabbitmq-server
查看 rabbitmq-server 状态:
rabbitmqctl status
关闭:
systemctl enable rabbitmq-server
能够直接经过配置文件的访问进行管理,也能够经过Web的访问进行管理。
经过Web进行管理,开启 Web 管理:
rabbitmq-plugins enable rabbitmq_management
chown -R rabbitmq:rabbitmq /var/lib/rabbitmq/
注:先启动 RabbitMQ
访问:http://192.168.2.223:15672/
,默认用户 guest ,密码 guest。
发现登陆失败,因为帐号guest具备全部的操做权限,而且又是默认帐号,出于安全因素的考虑,guest用户只能经过localhost登录使用。
咱们新增一个用户:
rabbitmqctl add_user admin 123456
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin “.“ “.“ “.*”
假设如今已经按照前面的步骤完成了 RabbitMQ 的安装,如今开始使用 Springboot 集成 RabbitMQ。
IDEA 先新建一个 maven 项目,在 pom 文件中添加相关依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.shuiyujie</groupId> <artifactId>pom</artifactId> <version>1.0-SNAPSHOT</version> <name>pom</name> <!-- Spring Boot 启动父依赖 --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.2.RELEASE</version> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> </properties> <dependencies> <!-- Spring Boot Test 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- rabbitmq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> </project> |
1 2 3 4 5 6 |
# rabbitmq 配置文件 spring.rabbitmq.host=192.168.0.223 # 默认端口 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456 |
如今咱们的目标很简单就是建立一个生产者 P,和一个消费者 C,同时将 P 产生的消息放到队列中供 C 使用。
Queue
1 2 3 4 5 6 7 8 9 10 11 |
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Bean public Queue helloQueue() { return new Queue("hello"); } } |
HelloSender
1 2 3 4 5 6 7 8 9 10 11 12 |
@Controller public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hello " + new Date(); System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("hello", context); } } |
HelloReceiver
1 2 3 4 5 6 7 8 |
@Component public class HelloReceiver { @RabbitHandler @RabbitListener(queues = "hello") public void process(String hello) { System.out.println("Receiver : " + hello); } } |
运行
1 2 3 4 5 6 7 8 9 10 11 12 |
@RunWith(SpringRunner.class) @SpringBootTest(classes = HelloApplication.class) public class RabbitmqApplicationTests { @Autowired private HelloSender helloSender; @Test public void hello() throws Exception { helloSender.send(); } } |
成功接收到消息
1 |
Receiver : hello Thu Feb 01 22:21:39 CST 2018 |
注意:HelloReceiver
的@RabbitListener(queues = "hello")
注解是方法级的,参照别的文章都是类级别的注解致使一直没法正常链接。
Work Queues
模式在原来的基础上多增长了一个消费者。同理咱们能够扩展三个、四个甚至更多的consumer
。这样作的好处在于,当咱们使用一个consumer
的时候,当它收到一条消息进行处理的时候会发生阻塞。有多个consumer
时,消息就能够分发给空闲的consumer
进行处理。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
/** * Work 模式下的生产者 * * @author shui * @create 2018-02-04 **/ @Controller public class WorkSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(int i) { String context = "work "; System.out.println("Sender : " + context + "*****" + i); this.rabbitTemplate.convertAndSend("work", context); } } |
Queue
1 2 3 4 5 6 7 |
@Configuration public class WorkConfig { @Bean public Queue workQueue() { return new Queue("work"); } } |
两个消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
@Component public class WorkReceicer1 { @RabbitHandler @RabbitListener(queues = "work") public void process(String message) { System.out.println("Work Receiver1 : " + message); } } @Component public class WorkReceicer2 { @RabbitHandler @RabbitListener(queues = "work") public void process(String message) { System.out.println("Work Receiver2 : " + message); } } |
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@RunWith(SpringRunner.class) @SpringBootTest(classes = Startup.class) public class RabbitMQDirectTest { @Autowired private WorkSender workSender; @Test public void sendWorkTest() { for (int i = 0; i < 20; i++) { workSender.send(i); } } } |
结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
Work Receiver1 : work Work Receiver2 : work Work Receiver2 : work Work Receiver1 : work Work Receiver2 : work Work Receiver1 : work Work Receiver2 : work Work Receiver1 : work Work Receiver1 : work Work Receiver2 : work Work Receiver2 : work Work Receiver1 : work Work Receiver2 : work Work Receiver1 : work Work Receiver1 : work Work Receiver2 : work Work Receiver1 : work Work Receiver2 : work Work Receiver2 : work Work Receiver1 : work |
发现消费得很平均,每一个consumer
处理一半的消息。
从上面的两个例子咱们看到producer
产生的消息直接发送给queue
,而后queue
又直接将消息传给consumer
。RabbitMQ 的亮点就在于改变了上面这种消息传递的方式,producer
不会将消息直接传给queue
而是传给exchanges
再由exchangers
传给queue
。然而咱们在前面的两个例子中并无使用exchanges
,那是由于 RabbitMQ 有默认的exchanges
,只要咱们传的参数是""
。在默认模式下,不须要将exchanges
作任何绑定。除此以外exchanges
有如下几种类型:
- Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的 routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
- Topic:按规则转发消息(最灵活)
- Headers:设置header attribute参数类型的交换机
- Fanout:转发消息到全部绑定队列
Queue
如下使用的是Fanout Exchange
转发消息到全部绑定队列。这里要配置两个queue
,而且配置exchanges
,并把queue
和exchanges
绑定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
/** * * public/subscribe 模式 * * @author shui * @create 2018-02-04 **/ @Configuration public class FanoutConfig { /************************************************************************ * 新建队列 fanout.A 、fanout.B ************************************************************************/ @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } /** * 创建一个交换机 * * @return */ @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } /************************************************************************ * 将 fanout.A 、 fanout.B 绑定到交换机 fanoutExchange 上 ************************************************************************/ @Bean Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } } |
生产者
在建立producter
的时候,要将他和exchanges
绑定。
1 2 3 4 5 6 7 8 9 10 11 |
@Controller public class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("fanoutExchange","", context); } } |
两个消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
@Component public class FanoutReceiveA { @RabbitHandler @RabbitListener(queues = "fanout.A") public void process(String message) { System.out.println("fanout Receiver A : " + message); } } @Component public class FanoutReceiveB { @RabbitHandler @RabbitListener(queues = "fanout.B") public void process(String message) { System.out.println("fanout Receiver B : " + message); } } |
测试
1 2 3 4 5 6 7 8 9 10 11 |
@RunWith(SpringRunner.class) @SpringBootTest(classes = Startup.class) public class FanoutTest { @Autowired private FanoutSender fanoutSender; @Test public void setFanoutSender() { fanoutSender.send(); } } |
结果
1 2 |
fanout Receiver B : hi, fanout msg fanout Receiver A : hi, fanout msg |
在前面的Fanout
模式下,消息会直接广播给queue
。若是咱们想让consumer
处理某些特定的消息,就要让他接收消息的队列中没有其余类型的消息,因此能不能让queue
只接收某些消息,而不接收另外一些消息呢?
RabbitMQ 中有一个 Routingkey 的概念。在队列与交换机的绑定过程当中添加Routingkey
表示queue
接收的消息须要带有Routingkey
。
Topic
模式和Direct
模式相似,Direct
模式须要Routingkey
彻底匹配而Topic
模式更加灵活,能够经过通配符进行配置。
- 在这种交换机模式下:路由键必须是一串字符,用句号(.) 隔开,例如:topic.A
- 路由模式必须包含一个星号
*
,主要用于匹配路由键指定位置的一个单词,好比说,一个路由模式是这样子:agreements..b.*,那么就只能匹配路由键是这样子的:第一个单词是 agreements,第四个单词是 b。 井号(#)就表示至关于一个或者多个单词;例如一个匹配模式是agreements.eu.berlin.#,那么,以agreements.eu.berlin开头的路由键都是能够的。
Queue and exchange
另个队列分别为 topic.A,topic.B,将他们绑定到 topicExchange 上。而且设置了规则,topic.A 必须是彻底匹配的也就是Direct
模式,topic.B 使用Topic
模式,只要是Rouctingkey
为 topic 开头的均可以接收。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
@Configuration public class TopicConfig { final static String message = "topic.A"; final static String messages = "topic.B"; @Bean public Queue queueMessage() { return new Queue(TopicConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } } |
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
@Controller public class TopicSend { @Autowired private AmqpTemplate rabbitTemplate; public void send() { String context = "hi, i am message 0"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.1", context); } public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context); } public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context); } } |
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
@Component @RabbitListener(queues = "topic.A") public class TopicReceiver { @RabbitHandler public void process(String message) { System.out.println("Topic Receiver1 : " + message); } } @Component @RabbitListener(queues = "topic.B") public class TopicReceiver2 { @RabbitHandler public void process(String message) { System.out.println("Topic Receiver2 : " + message); } } |
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@RunWith(SpringRunner.class) @SpringBootTest(classes = Startup.class) public class TopicTest { @Autowired private TopicSend sender; @Test public void topic() throws Exception { sender.send(); } @Test public void topic1() throws Exception { sender.send1(); } @Test public void topic2() throws Exception { sender.send2(); } } |
结果
1 2 3 4 5 6 7 |
Sender : hi, i am message 1 Sender : hi, i am messages 2 Sender : hi, i am message 0 Topic Receiver1 : hi, i am message 1 Topic Receiver2 : hi, i am message 1 Topic Receiver2 : hi, i am messages 2 Topic Receiver2 : hi, i am message 0 |
掌握 RabbitMQ 的核心在于如何使用好exchanges
,它有默认模式""
, direct
, topic
, headers
和 fanout
这几种模式。
经过 RabbitMQ 的 routingkey
能够过滤交换机传递给队列的消息。fanout
模式下,须要队列和交换机的routingkey
彻底匹配,而在topic
模式下,能够经过通配符进行配置,变得更加灵活。
Install RabbitMQ server in CentOS 7
Install Erlang and Elixir in CentOS 7