前面咱们把每一个Message都是deliver到某个单一的Consumer。今天咱们将了解如何把同一个Message deliver到多个Consumer中。这个模式也被称做 "publish / subscribe"。
首先咱们将建立一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer)。 咱们将构建两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。java
关于exchange的概念在在这里作一下简单的介绍。web
RabbitMQ 的Messaging Model就是Producer并不会直接发送Message到queue。实际上,Producer并不知道它发送的Message是否已经到达queue。less
RabbitMQ消息模型的核心理念是生产者永远不会直接发送给任何的消息队列,通常状况下Producer是不知道消息应该发送到那个队列的。Producer发送的Message其实是发到了Exchange中。它的功能也很简单:从Producer接收Message,而后投递到queue中。Exchange须要知道如何处理Message,是把它放到某个queue中,仍是放到多个queue中?这个rule是经过Exchange 的类型定义的。
spa
咱们知道有三种类型的Exchange:direct, topic ,Headers和fanout。fanout就是广播模式,会将全部的Message都放到它所知道的queue中。建立一个名字为logs,类型为fanout的Exchange:日志
channel.exchange_declare(exchange='logs',type='fanout');orm
fanout类型转发器特别简单,吧全部他接受到的消息,广播多有的他知道的队列。rabbitmq
前面说到的生产者只能发送详细给转发器(Exchange),可是咱们以前的例子中并无使用到转发器啊,咱们仍然能够发送和接收消息,这是为何呢?是匿名转发器(nameless exchange)搞的鬼。由于咱们使用了一个默认转发器。他的标识为" ".队列
channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());get
第一个参数为转发器名,第二个为消息队列名,若是不为空由其决定发送到那个队列中。消息队列
如今咱们能够指定消息发送到转发器中。
channel.basicPublish( "logs","", null, message.getBytes());
Listing exchanges
经过rabbitmqctl能够列出当前全部的Exchange:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs fanout
amq.direct direct
amq.topic topic
amq.fanout fanout
amq.headers headers
...done.
注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认建立的。
如今咱们能够经过exchange,而不是routing_key来publish Message了:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
截至如今,咱们用的queue都是有名字的,可以为队列命名对咱们来讲很关键。使用有名字的queue,使得在Producer和Consumer以前共享queue成为可能。
可是对于咱们将要构建的日志系统,并不须要有名字的queue。咱们但愿获得全部的log,而不是它们中间的一部分。并且咱们只对当前的log感兴趣。为了实现这个目标,咱们须要两件事情:
1) 每当Consumer链接时,咱们须要一个新的,空的queue。由于咱们不对老的log感兴趣。幸运的是,若是在声明queue时不指定名字,那么RabbitMQ会随机为咱们选择这个名字。
2)当Consumer关闭链接时,这个queue要被deleted。
String queueName = channel.queueDeclare().getQueue();
经过result.method.queue 能够取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj1wLg。
如今咱们已经建立了fanout类型的exchange和没有名字的queue(其实是RabbitMQ帮咱们取了名字)。那exchange怎么样知道它的Message发送到哪一个queue呢?答案就是经过bindings:绑定。
channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称
如今logs的exchange就将它的Message附加到咱们建立的queue了。Listing bindings
使用命令rabbitmqctl list_bindings。
咱们最终实现的数据流图以下:
package com.zhy.rabbit._03_bindings_exchanges;
import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog
{
private final static String EXCHANGE_NAME = "ex_log";
public static void main(String[] args) throws IOException
{
// 建立链接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明转发器和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
String message = new Date().toLocaleString()+" : log something";
// 往转发器上发送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
接收端:
package com.zhy.rabbit._03_bindings_exchanges;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogsToSave
{
private final static String EXCHANGE_NAME = "ex_log";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException
{
// 建立链接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 建立一个非持久的、惟一的且自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// 为转发器指定队列,设置binding
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
print2File(message);
}
}
private static void print2File(String msg)
{
try
{
String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
String logFileName = new SimpleDateFormat("yyyy-MM-dd")
.format(new Date());
File file = new File(dir, logFileName+".txt");
FileOutputStream fos = new FileOutputStream(file, true);
fos.write((msg + "\r\n").getBytes());
fos.flush();
fos.close();
} catch (FileNotFoundException e)
{
e.printStackTrace();
} catch (IOException e)
{
e.printStackTrace();
}
}
}
接收端:
package com.zhy.rabbit._03_bindings_exchanges;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogsToConsole
{
private final static String EXCHANGE_NAME = "ex_log";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException
{
// 建立链接和频道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 建立一个非持久的、惟一的且自动删除的队列
String queueName = channel.queueDeclare().getQueue();
// 为转发器指定队列,设置binding
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定接收者,第二个参数为自动应答,无需手动应答
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}