RabbitMQ - 发布订阅模式

Publish/Subscribe 模式

以前讲的 工做队列模式,一个工做队列中的任务只能分发给一个 消费者。而咱们今天要聊的这个 发布/订阅模式 有着更复杂的工做模式, 他能够将一个消息发给多个消费者。以下图所示:.net


Exchanges

从上面的图中,咱们能够看到明显比以前的工做队列模式,多了一个组成部分(即图中的 x ),他就是 exchanges,那么这个东西究竟是什么呢? 这里引用官网的一句话 Exchange is like JFK airport,这是个形象的比喻。下面会具体说明。3d

full messaging model

核心思想就是生产者再也不发消息给 queue, 而是发给 exchanges,而且在这个过程当中,生产者并不知道把消息发到了哪一个 queue。取而代之的是,生产者将消息发给了 exchanges(exchanges 负责接收生产者发来的消息,并将消息传送到队列当中),而exchanges 要根据某种规则来判断怎么处理接收过来的消息(是把消息发给一个队列,仍是发给多个队列, 或者其余)。而这些规则被声明成了 exchange type;code

exchanges type

  1. direct
  2. topic
  3. headers
  4. fanout

示例程序

下面代码 使用了 fanout(负责把消息以相似广播的形式,发送到多个队列) 类型的 exchangs,代码较以前的代码作了微小改动。blog

/**
 * 生产者
 * @author Administrator
 *
 */
   public class Producer {

	private static final String EXCHANGE_NAME = "logs";
	
	public static void main(String[] args) {
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    try (Connection connection = factory.newConnection();
	        Channel channel = connection.createChannel()) {
	        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

	        String message = "info: Hello World!";
	        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
	        System.out.println(" [x] Sent '" + message + "'");
	    }catch(Exception e){
	    	e.printStackTrace();
	    }
	}
	
}
**
 * 消费者
 * @author Administrator
 *
 */
public class Consumer {

	private static final String EXCHANGE_NAME = "logs";

	  public static void main(String[] argv) throws Exception {
	    ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("localhost");
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();

	    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
	    String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");

	    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

	    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
	        String message = new String(delivery.getBody(), "UTF-8");
	        System.out.println(" [x] Received '" + message + "'");
	    };
	    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
	  }
	
}
相关文章
相关标签/搜索