RabbitMQ
是一个开源的AMQP
实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP
等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。java
AMQP
,即Advanced message Queuing
Protocol
,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。git
AMQP
的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。github
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP
等,支持AJAX
。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。spring
一般咱们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ
在这个基本概念之上, 多作了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange
). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。segmentfault
任选其一安全
CentOs7.3 搭建 RabbitMQ 3.6 单机服务与使用bash
CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服务与使用服务器
代码我已放到 Github ,导入spring-boot-rabbitmq
项目app
github github.com/souyunku/sp…分布式
在项目中添加 spring-boot-starter-amqp
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
spring.application.name=ymq-rabbitmq-spring-boot
spring.rabbitmq.host=10.4.98.15
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
复制代码
1.Direct Exchange 根据route key 直接找到队列
2.Topic Exchange 根据route key 匹配队列
3.Topic Exchange 不处理route key 全网发送,全部绑定的队列都发送
Direct Exchange
是RabbitMQ
默认的交换机模式,也是最简单的模式,根据key
全文匹配去寻找队列。
任何发送到Direct Exchange
的消息都会被转发到RouteKey
中指定的Queue
。
1.通常状况能够使用rabbitMQ
自带的Exchange:""
(该Exchange
的名字为空字符串,下文称其为default Exchange
)。
2.这种模式下不须要将Exchange
进行任何绑定(binding
)操做
3.消息传递时须要一个RouteKey
,能够简单的理解为要发送到的队列名字。
4.若是vhost
中不存在RouteKey
中指定的队列名,则该消息会被抛弃。
@Configuration
public class RabbitDirectConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
@Bean
public Queue directQueue() {
return new Queue("direct");
}
//-------------------配置默认的交换机模式,能够不须要配置如下-----------------------------------
@Bean
DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
//绑定一个key "direct",当消息匹配到就会放到这个队列中
@Bean
Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with("direct");
}
// 推荐使用 helloQueue() 方法写法,这种方式在 Direct Exchange 模式 画蛇添足,不必这样写
//---------------------------------------------------------------------------------------------
}
复制代码
@Component
@RabbitListener(queues = "hello")
public class helloReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 helloReceiver," + message);
}
}
复制代码
@Component
@RabbitListener(queues = "direct")
public class DirectReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 DirectReceiver," + message);
}
}
复制代码
package io.ymq.rabbitmq.test;
import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/** * 描述: 默认的交换机模式 * * @author: yanpenglei * @create: 2017/10/25 1:03 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitDirectTest {
@Autowired
private AmqpTemplate rabbitTemplate;
@Test
public void sendHelloTest() {
String context = "此消息在,默认的交换机模式队列下,有 helloReceiver 能够收到";
String routeKey = "hello";
context = "routeKey:" + routeKey + ",context:" + context;
System.out.println("sendHelloTest : " + context);
this.rabbitTemplate.convertAndSend(routeKey, context);
}
@Test
public void sendDirectTest() {
String context = "此消息在,默认的交换机模式队列下,有 DirectReceiver 能够收到";
String routeKey = "direct";
String exchange = "directExchange";
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
System.out.println("sendDirectTest : " + context);
// 推荐使用 sendHello() 方法写法,这种方式在 Direct Exchange 画蛇添足,不必这样写
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
}
复制代码
按顺序执行:响应
接收者 helloReceiver,routeKey:hello,context:此消息在,默认的交换机模式队列下,有 helloReceiver 能够收到
接收者 DirectReceiver,context:directExchange,routeKey:direct,context:此消息在,默认的交换机模式队列下,有 DirectReceiver 能够收到
复制代码
任何发送到Fanout Exchange
的消息都会被转发到与该Exchange
绑定(Binding)
的全部Queue上
。
1.能够理解为路由表的模式
2.这种模式不须要 RouteKey
3.这种模式须要提早将Exchange
与Queue
进行绑定,一个Exchange
能够绑定多个Queue
,一个Queue
能够同多个Exchange
进行绑定。
4.若是接受到消息的Exchange
没有与任何Queue
绑定,则消息会被抛弃。
@Configuration
public class RabbitFanoutConfig {
final static String PENGLEI = "fanout.penglei.net";
final static String SOUYUNKU = "fanout.souyunku.com";
@Bean
public Queue queuePenglei() {
return new Queue(RabbitFanoutConfig.PENGLEI);
}
@Bean
public Queue queueSouyunku() {
return new Queue(RabbitFanoutConfig.SOUYUNKU);
}
/** * 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的全部队列上。 */
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeQueuePenglei(Queue queuePenglei, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queuePenglei).to(fanoutExchange);
}
@Bean
Binding bindingExchangeQueueSouyunku(Queue queueSouyunku, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueSouyunku).to(fanoutExchange);
}
}
复制代码
@Component
@RabbitListener(queues = "fanout.penglei.net")
public class FanoutReceiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 FanoutReceiver1," + message);
}
}
复制代码
@Component
@RabbitListener(queues = "fanout.souyunku.com")
public class FanoutReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 FanoutReceiver2," + message);
}
}
复制代码
package io.ymq.rabbitmq.test;
import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/** * 描述: 广播模式或者订阅模式队列 * * @author: yanpenglei * @create: 2017/10/25 1:08 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitFanoutTest {
@Autowired
private AmqpTemplate rabbitTemplate;
@Test
public void sendPengleiTest() {
String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 能够收到";
String routeKey = "topic.penglei.net";
String exchange = "fanoutExchange";
System.out.println("sendPengleiTest : " + context);
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
@Test
public void sendSouyunkuTest() {
String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 能够收到";
String routeKey = "topic.souyunku.com";
String exchange = "fanoutExchange";
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
System.out.println("sendSouyunkuTest : " + context);
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
}
复制代码
按顺序执行:响应
接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 能够收到
接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 能够收到
接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 能够收到
接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 能够收到
复制代码
任何发送到Topic Exchange
的消息都会被转发到全部关心RouteKey
中指定话题的Queue
上
1.这种模式较为复杂,简单来讲,就是每一个队列都有其关心的主题,全部的消息都带有一个标题``(RouteKey)
,Exchange
会将消息转发到全部关注主题能与RouteKey
模糊匹配的队列。
2.这种模式须要RouteKey
,也许要提早绑定Exchange
与Queue
。
3.在进行绑定时,要提供一个该队列关心的主题,如#.log.#
表示该队列关心全部涉及log的消息(一个RouteKey为MQ.log.error
的消息会被转发到该队列)。
4.#
表示0个或若干个关键字,*
表示一个关键字。如topic.*
能与topic.warn
匹配,没法与topic.warn.timeout
匹配;可是topic.#
能与上述二者匹配。
5.一样,若是Exchange
没有发现可以与RouteKey
匹配的Queue
,则会抛弃此消息。
@Configuration
public class RabbitTopicConfig {
final static String MESSAGE = "topic.message";
final static String MESSAGES = "topic.message.s";
final static String YMQ = "topic.ymq";
@Bean
public Queue queueMessage() {
return new Queue(RabbitTopicConfig.MESSAGE);
}
@Bean
public Queue queueMessages() {
return new Queue(RabbitTopicConfig.MESSAGES);
}
@Bean
public Queue queueYmq() {
return new Queue(RabbitTopicConfig.YMQ);
}
/** * 交换机(Exchange) 描述:接收消息而且转发到绑定的队列,交换机不存储消息 */
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
//綁定队列 queueMessages() 到 topicExchange 交换机,路由键只接受彻底匹配 topic.message 的队列接受者能够收到消息
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) {
return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message");
}
//綁定队列 queueMessages() 到 topicExchange 交换机,路由键只要是以 topic.message 开头的队列接受者能够收到消息
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) {
return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.message.#");
}
//綁定队列 queueYmq() 到 topicExchange 交换机,路由键只要是以 topic 开头的队列接受者能够收到消息
@Bean
Binding bindingExchangeYmq(Queue queueYmq, TopicExchange topicExchange) {
return BindingBuilder.bind(queueYmq).to(topicExchange).with("topic.#");
}
}
复制代码
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 TopicReceiver1," + message);
}
}
复制代码
@Component
@RabbitListener(queues = "topic.message.s")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 TopicReceiver2," + message);
}
}
复制代码
@Component
@RabbitListener(queues = "topic.ymq")
public class TopicReceiver3 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 TopicReceiver3," + message);
}
}
复制代码
package io.ymq.rabbitmq.test;
import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/** * 描述: 配置转发消息模式队列 * * @author: yanpenglei * @create: 2017/10/25 1:20 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitTopicTest {
@Autowired
private AmqpTemplate rabbitTemplate;
@Test
public void sendMessageTest() {
String context = "此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 能够收到";
String routeKey = "topic.message";
String exchange = "topicExchange";
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
System.out.println("sendMessageTest : " + context);
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
@Test
public void sendMessagesTest() {
String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver2 TopicReceiver3 能够收到";
String routeKey = "topic.message.s";
String exchange = "topicExchange";
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
System.out.println("sendMessagesTest : " + context);
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
@Test
public void sendYmqTest() {
String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver3 能够收到";
String routeKey = "topic.ymq";
String exchange = "topicExchange";
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
System.out.println("sendYmqTest : " + context);
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
}
复制代码
按顺序执行:响应
接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 能够收到
接收者 TopicReceiver1,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 能够收到
接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 能够收到
接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有 TopicReceiver2 TopicReceiver3 能够收到
接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有 TopicReceiver2 TopicReceiver3 能够收到
接收者 TopicReceiver3,context:topicExchange,routeKey:topic.ymq,context:此消息在,配置转发消息模式队列下,有 TopicReceiver3 能够收到
复制代码
代码我已放到 Github ,导入spring-boot-rabbitmq
项目
github github.com/souyunku/sp…