前两篇博客介绍了两种队列模式,这篇博客介绍订阅、路由和通配符模式,之因此放在一块儿介绍,是由于这三种模式都是用了Exchange交换机,消息没有直接发送到队列,而是发送到了交换机,通过队列绑定交换机到达队列。java
一个生产者,多个消费者,每个消费者都有本身的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每一个队列绑定交换机,生产者发送的消息通过交换机,到达队列,实现一个消息被多个消费者获取的目的。须要注意的是,若是将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是由于在rabbitMQ中exchange不具有存储消息的能力,只有队列具有存储消息的能力。服务器
生产者:spa
public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到链接以及mq通道 Connection connection = ConnectionUtil.getConnection(); //从链接中建立通道 Channel channel = connection.createChannel(); // 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息内容 String message = "商品已经新增,id = 1000"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "test_queue_fanout_1"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到链接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, true, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 前台系统: '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
可是若是生产者有新增商品,修改商品,删除商品的消息,消费者包快前台系统和搜索系统,要求前台系统接收修改和删除商品的消息,搜索系统接收新增商品、修改商品和删除商品的消息。因此使用这种订阅模式实现商品数据的同步并不合理。所以咱们介绍下一种模式:路由模式。
3d
这种模式添加了一个路由键,生产者发布消息的时候添加路由键,消费者绑定队列到交换机时添加键值,这样就能够接收到须要接收的消息。code
public class Send { private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 获取到链接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息内容 String message = "删除商品, id = 1001"; channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "test_queue_direct_1"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 获取到链接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 前台系统: '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
public class Recv2 { private final static String QUEUE_NAME = "test_queue_direct_2"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 获取到链接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 搜索系统: '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }若是生产者发布了insert消息,那么消费者2能够收到,消费者 1收不到,若是发布了update或者delete消息,两个消费者均可以收到。若是发布ABC消息两个消费者都收不到,由于没有绑定这个键值。这种模式基本知足了咱们的需求,可是还不够灵活,下面介绍另一个模式。
基本思想和路由模式是同样的,只不过路由键支持模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配很少很多一个词blog
生产者:队列
public class Send { private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 获取到链接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息内容 String message = "删除商品,id = 1001"; channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "test_queue_topic_1"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 获取到链接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 前台系统: '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消费者2:路由
public class Recv2 { private final static String QUEUE_NAME = "test_queue_topic_2"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 获取到链接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#"); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 搜索系统: '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
1.与简单模式和work模式对比,前面两种同一个消息只能被一个消费者获取,而今天的这三种模式,能够实现一个消息被多个消费者 获取。get
2.fanout这种模式没有加入路由器,队列与exchange绑定后,就会接收到全部的消息,其他两种增长了路由键,而且第三种增长通配符,更加便利。同步