4-RabbitMQ交换机-direct

RabbitMQ交换机-direct

direct交换机

RabbitMQ交换机的概念上次已经介绍过了,今天说说direct的交换机模式。上一次我们说到了交换机的绑定

//将队列绑定到交换机上

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

实质上是队列对这个交换机上过来的消息感兴趣。但有的时候我们希望特定的消息流向某个指定的队列,而不是由交换机随机分配。这个时候就应该采用direct的交换机,并且绑定一个标识码,我们称其绑定键

// 声明fanout交换机、设置持久化

boolean durable2 =true;

channel.exchangeDeclare(EXCHANGE_NAME,"direct",durable2);

//将队列绑定到交换机上

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");

channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME,"error");


直接交换

       在这个设置中,我们可以看到直接交换X与两个绑定的队列。第一个队列与绑定键橙色绑定,第二个队列有两个绑定,一个绑定键为黑色,另一个绑定为绿色

在这样的设置中,发布到具有路由**橙色的交换机的消息 将被路由到队列Q1。具有黑色 绿色路由**的消息将转到Q2。所有其他消息将被丢弃。

 

多重绑定

使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以在XQ1Q2之间添加黑色绑定键。在这种情况下,direct交换将表现得像fanout交换了,并将消息广播到所有匹配的队列。具有路由**黑色的消息将传送到 Q1Q2

 

我们现在将用此模型进行日志记录系统,将infowarningerror日志绑定到一个队列中,error绑定到另外一个队列中,具体代码如下。

生产者

package MQ.Exchange.Direct;

 

import java.io.IOException;

import java.util.concurrent.TimeoutException;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

 

/**

 * @Title: MQ.WorkQueues.NewTask.java

 * @Package MQ.WorkQueues

 * @Description:TODO(MQ消息发送到direct的交换机上)

 * @Copyright: Copyright (c) 2017 YUANH All Rights Reserved

 * @authoryuanh

 * @date 2017-5-10下午3:50:35

 */

public classNewTask {

   private final static String EXCHANGE_NAME="direct_logs";

   private final static String QUEUE_NAME ="hello_direct";

   private final static String QUEUE_NAME2 ="hello_direct1";

   private final static String[] service =new String[] { "error","info",

         "warning"};

   private final static String[] service2 =new String[] { "error"};

 

   public static void main(String[] args)throws IOException,TimeoutException {

      // 创建连接连接到MabbitMQ

      ConnectionFactoryfactory = newConnectionFactory();

      // 设置MabbitMQ所在主机ip或者主机名

      factory.setHost("127.0.0.1");

      factory.setUsername("yuanh");

      factory.setPassword("yuanh");

      factory.setPort(5672);

      factory.setVirtualHost("y_yuanh");

      Connectionconnection = factory.newConnection();

      // 创建一个频道

      Channelchannel = connection.createChannel();

      // 声明队列、设置队列持久化

      boolean durable =true;

      channel.queueDeclare(QUEUE_NAME, durable,false, false,null);

      channel.queueDeclare(QUEUE_NAME2, durable,false, false,null);

      // 声明fanout交换机、设置持久化

      boolean durable2 =true;

      channel.exchangeDeclare(EXCHANGE_NAME,"direct",durable2);

      // 将队列绑定到交换机上

      for (String severity :service) {

         channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,severity);

      }

      for (String severity :service2) {

         channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,severity);

      }

      Stringmessage = "info";

      Stringmessage2 = "error";

      Stringmessage3 = "warning";

      // 将消息放到队列里面

      // channel.basicPublish("", QUEUE_NAME, null,message.getBytes());

      // 将消息放到交换机上

      channel.basicPublish(EXCHANGE_NAME,message,null,message.getBytes());

      channel.basicPublish(EXCHANGE_NAME,message2,null,message2.getBytes());

      channel.basicPublish(EXCHANGE_NAME,message3,null, message3.getBytes());

      System.out.println("发送 '" + message +"'");

      // 关闭通道和连接

      channel.close();

      connection.close();

   }

 

 

}

 

执行后队列中会出现



消费者

消费者和之前的一样,只不过是将对列名称改成你需要取的对列名称

 

package MQ.Exchange.Direct;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

 

@SuppressWarnings("deprecation")

/**

 * @Title: MQ.WorkQueues.Worker.java

 * @Package MQ.WorkQueues

 * @Description:TODO(MQ消息发送到direct的交换机上)

 * @Copyright: Copyright (c) 2017 YUANH All Rights Reserved

 * @authoryuanh

 * @date 2017-5-10下午3:48:59

 */

public classWorkerResponse {

   // private final static String EXCHANGE_NAME ="direct_logs";

   private final static String QUEUE_NAME ="hello_direct";

 

   // private final static String QUEUE_NAME2 ="hello_direct1";

 

   public static void main(String[] argv)throws Exception {

 

      // 创建连接连接到MabbitMQ

      ConnectionFactoryfactory = newConnectionFactory();

      // 设置MabbitMQ所在主机ip或者主机名

      factory.setHost("127.0.0.1");

      factory.setUsername("yuanh");

      factory.setPassword("yuanh");

      factory.setPort(5672);

      factory.setVirtualHost("y_yuanh");

      Connectionconnection = factory.newConnection();

      Channelchannel = connection.createChannel();

      // 1声明队列、设置队列持久化

      boolean durable =true;

      channel.queueDeclare(QUEUE_NAME, durable,false, false,null);

      QueueingConsumerconsumer = newQueueingConsumer(channel);

      // 2消费者指定消费队列,打开应答机制,注意false才是打开手动应对,true为自动应答

      boolean ack =false;

      channel.basicConsume(QUEUE_NAME, ack, consumer);

      // 3消费者设置最大服务转发消息数量,公平转发

      int prefetchCount = 1;

      channel.basicQos(prefetchCount);

      try {

         while (true) {

            QueueingConsumer.Deliverydelivery = consumer.nextDelivery();

            Stringmessage = newString(delivery.getBody());

            System.out.println("接收 '" + message +"'");

            try {

                doWork(message);

            }finally{

                System.out.println("结束");

                // 另外需要在每次处理完成一个消息后,手动发送一次应答(ack=false)

                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),

                      false);

            }

         }

      }catch(Exception e) {

         channel.close();

         connection.close();

      }

   }

 

   private static void doWork(String task)throws InterruptedException {

      for (char ch :task.toCharArray()) {

         if (ch =='.') {

            Thread.sleep(1000);

         }else{

            Thread.sleep(1000);

         }

 

      }

   }

}