在上一篇博客《RabbitMQ入门:发布/订阅(Publish/Subscribe)》中,咱们认识了fanout类型的exchange,它是一种经过广播方式发送消息的路由器,全部和exchange创建的绑定关系的队列都会接收到消息。可是有一些场景只须要订阅到一部分消息,这个时候就不能使用fanout 类型的exchange了,这个就引出来今天的“猪脚”--Direct Exchange,经过Routing Key来决定须要将消息发送到哪一个或者哪些队列中。html
接下来请收看详细内容:react
1、Direct Exchange(直接路由器)ide
在上文中介绍exchange的时候,对direct exchange进行了简单介绍,它是一种彻底按照routing key(路由关键字)进行投递的:当消息中的routing key和队列中的binding key彻底匹配时,才进行会将消息投递到该队列中。这里提到了一个routing key和binding key(绑定关键字),是什么东东?ui
在发送消息的时候,basicPublish的第二个参数就是routing key,因为上次是fanout 类型的exchange 进行广播方式投递,这个字段不会影响投递结果,所以咱们这里就传入了“”,可是在direct 类型的exchange中咱们就不能传入""了,须要指定具体的关键字。spa
咱们在前文中创建绑定关系的时候,queueBind的第三个参数就是绑定关键字debug
咱们声明direact exchange的时候使用:code
2、多重绑定htm
多个队列以相同的绑定键绑定到同一个路由器的状况,咱们称之为多重绑定。blog
工做模型为(P表明生产者,X表明路由器,红色的Q表明队列,C表明消费者):rabbitmq
3、代码实例
预备知识了解完了,如今来写个程序感觉下。
public class LogDirectSender { // exchange名字 public static String EXCHANGE_NAME = "directExchange"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立链接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.为通道声明direct类型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 3.发送消息到指定的exchange,队列指定为空,由exchange根据状况判断须要发送到哪些队列 String routingKey = "debug"; String msg = " hello rabbitmq, I am " + routingKey; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 4.关闭链接 if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
和上次博客中生产者的区别就是黑字粗体部分:1.路由器类型改成direct 2.消息发布的时候指定了routing key
public class LogDirectReciver { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立链接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.为通道声明direct类型的exchange channel.exchangeDeclare(LogDirectSender.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 3.建立随机名字的队列 String queueName = channel.queueDeclare().getQueue(); // 4.创建exchange和队列的绑定关系 String[] bindingKeys = { "error", "info", "debug" }; // String[] bindingKeys = { "error" }; for (int i = 0; i < bindingKeys.length; i++) { channel.queueBind(queueName, LogDirectSender.EXCHANGE_NAME, bindingKeys[i]); System.out.println(" **** LogDirectReciver keep alive ,waiting for " + bindingKeys[i]); } // 5.经过回调生成消费者并进行监听 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { // 获取消息内容而后处理 String msg = new String(body, "UTF-8"); System.out.println("*********** LogDirectReciver" + " get message :[" + msg + "]"); } }; // 6.消费消息 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
和上次博客中消费者的区别就是黑字粗体部分:1.路由器类型改成direct 2.创建绑定关系的时候指定了binding key
**** LogDirectReciver keep alive ,waiting for error **** LogDirectReciver keep alive ,waiting for info **** LogDirectReciver keep alive ,waiting for debug
这个消费者咱们视为消费者1,它会接收error,info,debug三个关键字的消息。
**** LogDirectReciver keep alive ,waiting for error
这个消费者咱们视为消费者2,它只会接收error 关键字的消息。
第一次执行:
product send a msg: hello rabbitmq, I am debug
第二次执行:
product send a msg: hello rabbitmq, I am info
第三次执行:
product send a msg: hello rabbitmq, I am error
消费者1: **** LogDirectReciver keep alive ,waiting for error **** LogDirectReciver keep alive ,waiting for info **** LogDirectReciver keep alive ,waiting for debug *********** LogDirectReciver get message :[ hello rabbitmq, I am debug] *********** LogDirectReciver get message :[ hello rabbitmq, I am info] *********** LogDirectReciver get message :[ hello rabbitmq, I am error] 消费者2: **** LogDirectReciver keep alive ,waiting for error *********** LogDirectReciver get message :[ hello rabbitmq, I am error]
exchanges标签页里面多了个direct类型的路由器。进入详细页面:
有4个绑定关系,其中三个的队列是同一个。切换到Queues标签页:
有两个临时队列。
若是关掉消费者1和消费者2,会发现队列自动删除了,绑定关系也不存在了。