【译】RabbitMQ系列(三) - 发布/订阅模式

发布订阅模式

在以前的文章里,建立了work queue。work queue中,每个task都会派发给一个worker。在本章中,咱们会完成彻底不同的事情 - 咱们会派发一条message给多个消费者。咱们称之为发布订阅模式。html

为了更好来讲明,咱们将要构建一个简单的日志系统。会由两部分代码构成,第一部分来发送日志message,第二部分会接受并打印日志。java

在咱们的日志系统中,每个接收程序都会收到日志message。这种方式下,咱们能够运行一个接收程序将日志保存到磁盘,同时使用另一个接收程序将日志打印到屏幕。bash

本质上来讲,发布的日志message会广播到全部运行的接收者。服务器

Exchanges

在以前的章节咱们经过queue收发message。如今开始介绍Rabbit中的full messaging model。学习

首先让咱们快速的回忆一下以前的章节spa

  • producer是一个发送message的用户程序。
  • queue是保存message的缓冲区
  • consumer是接收message的用户程序

RabbitMQ的messaging model的核心思想是producer不会直接向queue发送message。实际上,不少时候producer也不知道message会发送到哪些queue。日志

这里,producer将message发送到exchange。exchange是一个很是简单的东西。一方面它从producer侧接收message,另外一方面它把message推送到queue去。 exchange必须知道对接收到的message接着要去作什么。是转发到特定的queue?仍是转发到多个queue?仍是干脆丢弃掉。这个规则取决于定义时exchange的类型。code

图片描述

exchange有四种可选的类型:direct, topic, headers和fanout. 今天咱们聚焦于最后一种-fanout。让咱们建立一个fanout类型的exchange,命名为logshtm

channel.exchangeDeclare("logs","fanout");

fanout类型的exchange是很是简单的。能够从名字上大概猜出其用途,它广播全部的message到它所知道的queue去。这也正是日志应用所指望的。rabbitmq

列出全部的exhange,可使用rabbitmqctl命令 sudo rabbitmqctl list_exchanges,在列表总会出现一些amq.* 的exchange,和默认的exchange。这些是默认自动建立的,咱们不会使用到它们。

没有名字的exchange。在以前的章节里咱们没有提到过exchanges,咱们直接将message发送到queue。其实咱们是用到了默认的exchange,用空字符串”“来标识。回想一下,咱们像下面这样发布message:
channel.basicPublish("","hello",null,message.getBytes()); 第一个参数就是exchange的名字。空字符串表明了没有名字的exchange:message被路由到了由routingKey指定名字的queue。

如今,咱们能够向有名字的exchange发布message。

channel.basicPublish("logs","",null,message.getBytes());

Temporary Queue

以前咱们使用queue时都会指定名字,如hello和task_queue。给一个queue命名是很重要的,由于咱们要给worker指出相同的queue。当须要在生产者和消费者间共享一个queue时,就必须给queue取好名字。

可是在咱们日志应用中,状况却有所不一样。咱们须要接收到全部的log message。咱们也关注当前流动的message。咱们须要搞定2个事情。

首先,当链接到Rabbit时,咱们须要一个全新的,空的queue。所以咱们能够本身建立一个随意名字的queue,或是由服务器选择随意的queue名字,这固然是更好的选择。

其次,当咱们断开接收者时,该queue能够被自动删除。

在java客户端中,当咱们使用无参的queueDeclare()时,咱们建立的是使用自动生成名字的一个不持久的,自动删除queue:

String queueName = channel.queueDeclare().getQueue();

能够经过这里来学习到exclusive标志和其余queue的相关属性。

这时queue就具备一个随机的名字,好比像amq.gen-JzTY20BRgKO-HjmUJj0wLg.

Bindings

图片描述
咱们已经建立了一个fanout exchange和queue.如今咱们要设置exchange,让它把message发送到咱们的queue。exchange和queue这种关系的创建咱们称之为binding.

channel.queueBind(queueName,"logs","");

从如今开始logs这个exchange就会将message推向咱们的队列了。

可使用命令rabbitmqctl list_bindings 来列出当前全部的binding。

开始执行

图片描述
生产者程序,和以前章节的代码变化不大,主要的变化是咱们将message发送到exchange而不是一个queue。你发现咱们在发送的时候会填上一个routingKey,这个值在fanout类型的exchange中是被忽略的。下面是生产者EmitLog.java的代码

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = argv.length < 1 ? "info: Hello World!" :
                            String.join(" ", argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

如你所见,在创建connection以后咱们声明了exchange.这一步是必要的,发布Message到一个不存在的exchange是不容许的。

若是没有queue绑定到exchange的时候,发布的message是会丢失的,但在如今这个场景是OK的。下面是ReceiveLogs.java的代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

首先进行编译

javac -cp $CP EmitLog.java ReceiveLog.java

若是要把日志保存到文件,则

java -cp $CP ReceiveLogs > logs_from_rabbit.log

若是要在控制台看日志,在另外一个终端

java -cp $CP ReceiveLogs

最后来发送日志

java -cp $CP EmitLog

使用rabbitmqctl list_bindings,来确认程序建立了咱们在代码中指定的binding和queue. 运行两个ReceiveLogs程序,你会看到像下面的输出

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.
相关文章
相关标签/搜索