Rabbit官方文档翻译之Publish Subscribe(三)

Publish/Subscribe 订阅模式 (using the Java Client)

In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".html

在上一个教程中,咱们建立了一个工做队列。 工做队列的意图是每一个任务都交付给一个工做人员。 在这部分中,咱们会作一些彻底不一样的事情 - 咱们将交付同一个消息给全部的Consumer。 这种模式被称为“发布/订阅”。java

To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.git

咱们要创建一个简单的日志记录系统来讲明这个模式。 日志记录系统它分为两个模块,第一个将发出日志消息,第二个将接收并打印它们github

In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.bash

在咱们的记录系统中,每一个receiver都会收到同一个消息。 这样咱们就能够运行一个receiver用于将receive的log message写入磁盘; 同时咱们能够运行另外一个receiver将log message输出到屏幕上服务器

Essentially, published log messages are going to be broadcast to all the receivers.app

本质上说,订阅模式会将已发布的日志消息将被广播到全部接收者。less

Exchanges

In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in Rabbit.dom

在以前的教程中,咱们是以队列做为中介来实现消息的传递。 如今,咱们将在Rabbit完整的消息传递模式。ide

Let's quickly Go over what we covered in the previous tutorials:

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

让咱们快速回顾一下咱们在之前的教程中介绍的内容:

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

RabbitMQ中的消息传递模型的核心思想是,生产者不会把消息直接发送到队列,而是发送到交换机上,消息传送到队列的过程有交换机完成,这部分生产者是不知道的。 实际上,生产者一般甚至不知道是否将消息传递到任何队列。

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

相反,producer只能将message发送到exchange。exchange的功能也很简单。 一方面,它收到来自生产者的消息,另外一方将它们推送到队列。 exchange必然知道接收到的消息如何处理。 应该把它追加到特定队列上? 仍是追加到多个队列上? 或者丢弃它。 这个rule是经过exchange的类型定义。

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of this type, and call it logs:

exchange的类型为:direct,topic,headers和fanout。 

这里咱们将重点关注最后一个 - fanout。 以下咱们建立一个fanout类型的exchange,并将其命名为logs:

channel.exchangeDeclare("logs", "fanout");//create a fanout exchange,and named logs;

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.

fanout exchange的工做模式很是简单。 它只是将全部收到的消息广播给它知道的全部的队列。 这正是咱们须要的logger

Listing exchangesTo list the exchanges on the server you can run the ever useful rabbitmqctl:sudo rabbitmqctl list_exchanges In this list there will be some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.Nameless exchangeIn previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").Recall how we published a message before:channel.basicPublish("", "hello", null, message.getBytes()); The first parameter is the the name of the exchange. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified by routingKey, if it exists.

能够在rabbitmqctl中使用sudo rabbitmqctl list_exchanges命令,它会列举出全部的exchanges。

在这个列表中有许多名为amq.*的exchange和还有一个默认的exchange (AMQP default) 。 这些是默认建立的

Listing exchanges ...
amq.headers     headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.match       headers
amq.fanout      fanout
amq.direct      direct
amq.topic       topic            
(default exchange) direct 


可是如今不太可能须要使用它们。咱们在以前的教程里发送消息没有使用过这些default (unnamed) exchange,但他们仍在咱们将消息发送到队列过程当中起到了做用。以前咱们经过空字符串("")来标识exchange,就表明使用default (unnamed)  exchange。

//在以前的教程中,咱们是这样发布消息的
channel.basicPublish(“”,“hello”,null,message.getBytes());
//第一个参数是exchange的名称。 空字符串表示default (unnamed) exchange//第二个参数是routing key。消息将经过指定的exchange的指定routing key传递到绑定的队列(若是routing key存在)。由于咱们使用的是默认的exchange,因此routing key就等于队列名字

默认路由,官方的说明

The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to

, or unbind from the default exchange. It also cannot be deleted.

defalut exchange会隐式的绑定到全部队列上每一个队列上,routing key等于队列名,任何队列都不可以明确的指明绑定到default exchange,也不能从default exchange上解除绑定。default也不可以被删除

Now, we can publish to our named exchange instead:

如今,咱们经过命名的exchange来发布消息了

channel.basicPublish( "logs", "", null, message.getBytes());

Temporary queues 临时队列

As you may remember previously we were using queues which had a specified name (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.

截至如今,咱们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer以前共享queue成为可能。可见对队列命名是十分重要的

But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in currently flowing messages not in the old ones. To solve that we need two things.

  可是对于咱们将要构建的日志系统,并不须要有名字的queue。咱们但愿每一个consumer都能receive到全部的日志message,而不只仅是它们中间的一部分。 咱们也只对当前流行的消息不感兴趣。 要解决咱们须要两件事情。

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.

首先,当consumer connect to Rabbit,须要一个新的队列。 为此,咱们能够建立一个具备随机名称的队列,或者甚至是更好的 - 这里让服务器为咱们选择一个随机队列名称。

Secondly, once we disconnect the consumer the queue should be automatically deleted.

其次,一旦consumer断开链接,队列应该被自动删除。

In the Java client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name:

在Java客户端中,当咱们没有为queueDeclare()提供参数时,意味着咱们建立了一个具备随机名称的非持久,排他,自动删除的队列:

String queueName = channel.queueDeclare().getQueue();

At that point queueName contains a random queue name. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.

此时,queueName包含一个随机队列名称。 例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

Bindings 绑定

We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.

咱们已经建立了一个fanout exchange和queue。 如今咱们须要告诉exchange发送消息给队列。 exchange和队列之间的关系称为绑定。

channel.queueBind(queueName, "logs", "");

From now on the logs exchange will append messages to our queue.

从如今开始,log exchange将追加消息到绑定的队列中。

Listing bindingsYou can list existing bindings using, you guessed it,rabbitmqctl list_bindings

列出绑定您可使用,您猜想它,rabbitmqctl列表绑定列出现有绑定

Putting it all together 把它们放在一块儿

The producer program, which emits log messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our logsexchange instead of the nameless one. We need to supply a routingKey when sending, but its value is ignored for fanout exchanges. Here goes the code for EmitLog.Java program:

发送log的producer与以前的教程说起的并无太大的区别。 最重要的改变是咱们如今发布消息的是到logs exchange而不是default exchange。 发送时须要提供一个routingKey,可是对于faount类型的exchange来讲,routing key的值是被忽略的,由于fanout是要广播全部从producer接受到的消息给全部绑定的队列。 如下是EmitLog.Java程序的代码:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

(EmitLog.java source)

As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.

如你所见,创建链接后,咱们申明了一个fanout类型的exchange。 这个步骤是必须的,由于publish 消息到不存在的exchange是禁止的

The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.

若是没有任何队列绑定到交换机,消息将丢失。

The code for ReceiveLogs.java:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}

(ReceiveLogs.java source)

Compile as before and we're done.

javac -cp $CP EmitLog.java ReceiveLogs.java

If you want to save logs to a file, just open a console and type:

java -cp $CP ReceiveLogs > logs_from_rabbit.log

If you wish to see the logs on your screen, spawn a new terminal and run:

java -cp $CP ReceiveLogs

And of course, to emit logs type:

java -cp $CP EmitLog

Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two ReceiveLogs.java programs running you should see something like:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that's exactly what we intended.

To find out how to listen for a subset of messages, let's move on to tutorial 4

相关文章
相关标签/搜索