以前讲的 工做队列模式,一个工做队列中的任务只能分发给一个 消费者。而咱们今天要聊的这个 发布/订阅模式 有着更复杂的工做模式, 他能够将一个消息发给多个消费者。以下图所示:.net
从上面的图中,咱们能够看到明显比以前的工做队列模式,多了一个组成部分(即图中的 x ),他就是 exchanges,那么这个东西究竟是什么呢? 这里引用官网的一句话 Exchange is like JFK airport,这是个形象的比喻。下面会具体说明。3d
核心思想就是生产者再也不发消息给 queue, 而是发给 exchanges,而且在这个过程当中,生产者并不知道把消息发到了哪一个 queue。取而代之的是,生产者将消息发给了 exchanges(exchanges 负责接收生产者发来的消息,并将消息传送到队列当中),而exchanges 要根据某种规则来判断怎么处理接收过来的消息(是把消息发给一个队列,仍是发给多个队列, 或者其余)。而这些规则被声明成了 exchange type;code
下面代码 使用了 fanout(负责把消息以相似广播的形式,发送到多个队列) 类型的 exchangs,代码较以前的代码作了微小改动。blog
/** * 生产者 * @author Administrator * */ public class Producer { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = "info: Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); }catch(Exception e){ e.printStackTrace(); } } }
** * 消费者 * @author Administrator * */ public class Consumer { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }