这篇博客介绍订阅、路由和通配符模式,之因此放在一块儿介绍,是由于这三种模式都是用了Exchange交换机,消息没有直接发送到队列,而是发送到了交换机,通过队列绑定交换机到达队列。java
性能排序:fanout > direct >> topic。比例大约为11:10:6
服务器
一个生产者,多个消费者,每个消费者都有本身的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每一个队列绑定交换机,生产者发送的消息通过交换机,到达队列,实现一个消息被多个消费者获取的目的。须要注意的是,若是将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是由于在rabbitMQ中exchange不具有存储消息的能力,只有队列具有存储消息的能力。ide
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的全部Queue上。性能
1.能够理解为路由表的模式url
2.这种模式不须要RouteKeyspa
3.这种模式须要提早将Exchange与Queue进行绑定,一个Exchange能够绑定多个Queue,一个Queue能够同多个Exchange进行绑定。.net
4.若是接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。3d
示例代码:
orm
生产者:blog
[java] view plain copy
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到链接以及mq通道
Connection connection = ConnectionUtil.getConnection();
//从链接中建立通道
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息内容
String message = "商品已经新增,id = 1000";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1:
[java] view plain copy
public class Recv {
private final static String QUEUE_NAME = "test_queue_fanout_1";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到链接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 前台系统: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者2的代码和消费者1的代码大体相同,只是队列的名称不同,这样两个消费者有本身的队列,均可以接收到生产者发送的消息
可是若是生产者有新增商品,修改商品,删除商品的消息,消费者包快前台系统和搜索系统,要求前台系统接收修改和删除商品的消息,搜索系统接收新增商品、修改商品和删除商品的消息。因此使用这种订阅模式实现商品数据的同步并不合理。所以咱们介绍下一种模式:路由模式。
这种模式添加了一个路由键,生产者发布消息的时候添加路由键,消费者绑定队列到交换机时添加键值,这样就能够接收到须要接收的消息。
任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
1.通常状况可使用rabbitMQ自带的Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。
2.这种模式下不须要将Exchange进行任何绑定(binding)操做
3.消息传递时须要一个“RouteKey”,能够简单的理解为要发送到的队列名字。
4.若是vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
示例代码:
生产者:
[java] view plain copy
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到链接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息内容
String message = "删除商品, id = 1001";
channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1:接收更新和删除消息
[java] view plain copy
public class Recv {
private final static String QUEUE_NAME = "test_queue_direct_1";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到链接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 前台系统: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者2:接收insert,update,delete的消息
[java] view plain copy
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_direct_2";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到链接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 搜索系统: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
若是生产者发布了insert消息,那么消费者2能够收到,消费者 1收不到,若是发布了update或者delete消息,两个消费者均可以收到。若是发布ABC消息两个消费者都收不到,由于没有绑定这个键值。这种模式基本知足了咱们的需求,可是还不够灵活,下面介绍另一个模式。
基本思想和路由模式是同样的,只不过路由键支持模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配很少很多一个词
任何发送到Topic Exchange的消息都会被转发到全部关心RouteKey中指定话题的Queue上
1.这种模式较为复杂,简单来讲,就是每一个队列都有其关心的主题,全部的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到全部关注主题能与RouteKey模糊匹配的队列。
2.这种模式须要RouteKey,也许要提早绑定Exchange与Queue。
3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心全部涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,没法与“log.warn.timeout”匹配;可是“log.#”能与上述二者匹配。
5.一样,若是Exchange没有发现可以与RouteKey匹配的Queue,则会抛弃此消息。
生产者:
[java] view plain copy
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到链接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息内容
String message = "删除商品,id = 1001";
channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1:
[java] view plain copy
public class Recv {
private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到链接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 前台系统: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者2:
[java] view plain copy
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_topic_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到链接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" 搜索系统: '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者1是按需索取,并无使用通配符模式,而是用的彻底匹配,消费者2使用通配符模式,这样以item.开头的消息都会所有接收。
1.与简单模式和work模式对比,前面两种同一个消息只能被一个消费者获取,而今天的这三种模式,能够实现一个消息被多个消费者 获取。
2.fanout这种模式没有加入路由器,队列与exchange绑定后,就会接收到全部的消息,其他两种增长了路由键,而且第三种增长通配符,更加便利。
本文出自https://blog.csdn.net/ww130929/article/details/72842234