RabbitMQ入门(3)--发布和订阅

摘要 一次将消息发送到多个消费者java

RabbitMQ RabbitMQ入门python


目录[-]git

发布和订阅

(使用java 客户端)

在先前的指南中,咱们建立了一个工做队列。这工做队列后面的假想是每个任务都被准确的传递给工做者。在这部分咱们将会作一些彻底不一样的事情–咱们将一个消息传递给多个消费者。这部分被认知为“发布和订阅”。.net

为了说明这个部分,咱们会创建一个简单德日志系统,它是由两个程序组成–第一个发出日志消息,第二个接收和打印它们。

在咱们的日志系统中,每个运行的接收者拷贝程序将会得到信息。经过这个方式咱们能够运行一个接收者,直接的把日志记录到硬盘中;在同一时间咱们能够运行另外一个接收者,在屏幕上看这些日志。 
本质上,发布日志消息等同于广播到全部接收者。

交换

在先前指南部分,咱们将消息发送到队列里,并从队列中接收消息。如今是时候介绍RabbitMQ中全消息模型。 
让咱们快速温习下在先前指南中咱们掌握的:

一个发送消息的生产者是一个用户程序。 
一个存储消息的队列是一个缓冲。 
一个接收消息的消费者是一个用户程序。 
在RabbitMQ消息模型中核心的思想是生产者从不直接将消息发送给队列。实际上,生产者经常甚至不知道是否一个消息会被传递到队列中。

相反,生产者仅能将消息发送到一个交换机。一个交换机是一个很是简单的事物。在它的一遍,它从生产者那里接收消息,另外一边将消息推送到队列中。这个交换所必须清楚的知道它所接收到的消息要如何处理。是否将它附加到一个特别的队列中?是否将它附加到多个队列中?或者是否它应该被丢弃。规则的定义是由交换类型决定的。 
exchanges.png 
有几个交换类型:directtopicdeadersfanout。咱们来关注最后一个–fanout。让咱们建立一个这种类型的交换机而且称呼它为logs:

channel.exchangeDeclare("logs", "fanout");

fanout交换机是很是简单的。经过这个名字你可能已经猜出它的用处了,它会将接收的全部消息都广播到全部它所知道的全部队列。这个真正是咱们的记录器所须要的。

交换机列表 
为了列出服务器中全部交换机,你能够运行着有用的rabbitmqctl

$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
        direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

在这个列表里有一些以amq.打头的交换机和默认(未命名)的交换机。这些是默认建立的,可是不太可能你会在某个时刻使用它们。 
匿名交换机 
在先前的指南中咱们对交换机毫无了解,可是咱们依旧能将消息发送到队列中。那是可能实现的,由于咱们使用的是默认交换机,经过咱们使用空字符串(““)标识它。 
回想一下咱们之前是如何发送消息的:

channel.basicPublish("", "hello", null, message.getBytes());

这第一个参数是交换机的名字。空字符串说明它是默认的或者匿名的交换机:路由关键字存在的话,消息经过路由关键字的名字路由到特定的队列上。

如今,咱们能够发布咱们本身命名的交换机:

channel.basicPublish( "logs", "", null, message.getBytes());

临时队列

你可能会想起先前咱们使用的队列是有特定的名字的(是否记得hellotask_queue)。命名一个队列对咱们来讲是相当重要的–咱们须要指定工做者到这相同的队列上。当你想把队列分享给生产者和消费者,给队列名是重要的。 
可是那不是咱们记录器的实例。咱们想监听全部日志消息,不单单是它们中的子集。咱们一样是对当前的消息流感兴趣,而不是旧的。为了解决这个咱们须要两件事。 
首先,不管咱们何时链接RabbitMQ,咱们须要一个新的,空的队列。为了作到这些,咱们能够建立一个随机名字的队列或者更胜一筹-让服务器为咱们选择一个随机的名字。 
第二部,一旦咱们将消费者的链接断开,队列应该自动删除。 
在Java客户端里,当咱们使用无参数调用queueDeclare()方法,咱们建立一个自动产生的名字,不持久化,独占的,自动删除的队列。

String queueName = channel.queueDeclare().getQueue();

在这点,队列名中包含一个随机队列名。例如名字像amq.gen-JzTY20BRgKO-HjmUJj0wLg

绑定

bindings.png

咱们已经建立了一个fanout交换机和队列。如今咱们须要告诉交换机发送消息给咱们的队列上。这交换机和队列之间的关系称之为一个绑定。

channel.queueBind(queueName, "logs", "");

从如今开始,日志交换所将要附加消息到咱们的队列中。

绑定列表 
你能够列出存在的绑定使用,使用rabbitmqctl list_bindings

把全部放在一块儿

python-three-overall.png 
这发送日志消息的生产者程序,跟之前指南中的程序没有多少不一样。这最重要的改变是咱们将匿名的交换机替换为咱们想要消息发布到的日志交换机。当发送是咱们须要申请一个路由关键字,可是在广播消息是它的值会被忽略。这是EmitLog.java程序的代码:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}

(EmitLog.java source
如你所知,创建链接后咱们声明一个交换机。这个步骤是必须的,由于发布到一个不存在的交换机是禁止的。

若是队列尚未绑定到交换机上,消息将会丢失,可是这个对咱们来讲是ok的;若是没有消费者正在监听,咱们能够安全的丢弃消息。 
ReceiveLogs.java代码:

                  java.lang.InterruptedException {
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException,
                  java.lang.InterruptedException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());

            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

(ReceiveLogs.java source
如之前那样编译,咱们已经作了。

$ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java

若是你想把日志保存到文件中,仅仅打开一个控制平台,键入:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log

若是你想在你的屏幕上看这些日志, 新建一个终端而且运行:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs

固然,为了发出日志键入:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog

使用rabbitmactl list_bindings你能够验证这代码确实建立绑定和咱们想要的队列。随着两个ReceiveLogs.java程序的运行你能够看到一些如:

 $ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

这结果的解释是直白简单的:来自交换机的日志流向服务器安排的两个队列中。而且那确实咱们所指望的。 为了弄明白如何监听一个消息的子集,让咱们移到指南的第四部分。

相关文章
相关标签/搜索