转发请标明出处:http://blog.csdn.net/lmj623565791/article/details/37657225 java
本系列教程主要来自于官网入门教程的翻译,而后本身进行了部分的修改与实验,内容仅供参考。 服务器
上一篇博客中,咱们实现了工做队列,而且咱们的工做队列中的一个任务只会发给一个工做者,除非某个工做者未完成任务意外被杀死,会转发给另外的工做者,若是你还不了解:RabbitMQ (二)工做队列。这篇博客中,咱们会作一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(相似观察者模式)。 less
为了验证这种模式,咱们准备构建一个简单的日志系统。这个系统包含两类程序,一类程序发动日志,另外一类程序接收和处理日志。 spa
在咱们的日志系统中,每个运行的接收者程序都会收到日志。而后咱们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另外一个接收者把接收到的消息展示在屏幕上。 .net
本质上来讲,就是发布的日志消息会转发给全部的接收者。 翻译
一、转发器(Exchanges)
前面的博客中咱们主要的介绍都是发送者发送消息给队列,接收者从队列接收消息。下面咱们会引入Exchanges,展现RabbitMQ的完整的消息模型。 日志
RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,通常的状况生产者甚至不知道消息应该发送到哪些队列。 code
相反的,生产者只能发送消息给转发器(Exchange)。转发器是很是简单的,一边接收从生产者发来的消息,另外一边把消息推送到队列中。转发器必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则经过转发器的类型进行定义。 orm
下面列出一些可用的转发器类型: blog
Direct
Topic
Headers
Fanout
目前咱们关注最后一个fanout,声明转发器类型的代码:
channel.exchangeDeclare("logs","fanout");
fanout类型转发器特别简单,把全部它介绍到的消息,广播到全部它所知道的队列。不过这正是咱们前述的日志系统所须要的。
二、匿名转发器(nameless exchange)
前面说到生产者只能发送消息给转发器(Exchange),可是咱们前两篇博客中的例子并无使用到转发器,咱们仍然能够发送和接收消息。这是由于咱们使用了一个默认的转发器,它的标识符为””。以前发送消息的代码:
channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
第一个参数为转发器的名称,咱们设置为”” : 若是存在routingKey(第二个参数),消息由routingKey决定发送到哪一个队列。
如今咱们能够指定消息发送到的转发器:
channel.basicPublish( "logs","", null, message.getBytes());
三、临时队列(Temporary queues)
前面的博客中咱们都为队列指定了一个特定的名称。可以为队列命名对咱们来讲是很关键的,咱们须要指定消费者为某个队列。当咱们但愿在生产者和消费者间共享队列时,为队列命名是很重要的。
不过,对于咱们的日志系统咱们并不关心队列的名称。咱们想要接收到全部的消息,并且咱们也只对当前正在传递的数据的感兴趣。为了知足咱们的需求,须要作两件事:
第一, 不管什么时间链接到Rabbit咱们都须要一个新的空的队列。为了实现,咱们可使用随机数建立队列,或者更好的,让服务器给咱们提供一个随机的名称。
第二, 一旦消费者与Rabbit断开,消费者所接收的那个队列应该被自动删除。
Java中咱们可使用queueDeclare()方法,不传递任何参数,来建立一个非持久的、惟一的、自动删除的队列且队列名称由服务器随机产生。
String queueName = channel.queueDeclare().getQueue();
通常状况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 相似。
四、绑定(Bindings)
咱们已经建立了一个fanout转发器和队列,咱们如今须要经过binding告诉转发器把消息发送给咱们的队列。
channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称
五、完整的例子
日志发送端:
- package com.zhy.rabbit._03_bindings_exchanges;
-
- import java.io.IOException;
- import java.util.Date;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class EmitLog
- {
- private final static String EXCHANGE_NAME = "ex_log";
-
- public static void main(String[] args) throws IOException
- {
- // 建立链接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 声明转发器和类型
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
-
- String message = new Date().toLocaleString()+" : log something";
- // 往转发器上发送消息
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
-
- System.out.println(" [x] Sent '" + message + "'");
-
- channel.close();
- connection.close();
-
- }
-
- }
没什么太大的改变,声明队列的代码,改成声明转发器了,一样的消息的传递也交给了转发器。
接收端1 :ReceiveLogsToSave.java:
- package com.zhy.rabbit._03_bindings_exchanges;
-
- import java.io.File;
- import java.io.FileNotFoundException;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.text.SimpleDateFormat;
- import java.util.Date;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
-
- public class ReceiveLogsToSave
- {
- private final static String EXCHANGE_NAME = "ex_log";
-
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- // 建立链接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- // 建立一个非持久的、惟一的且自动删除的队列
- String queueName = channel.queueDeclare().getQueue();
- // 为转发器指定队列,设置binding
- channel.queueBind(queueName, EXCHANGE_NAME, "");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定接收者,第二个参数为自动应答,无需手动应答
- channel.basicConsume(queueName, true, consumer);
-
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
-
- print2File(message);
- }
-
- }
-
- private static void print2File(String msg)
- {
- try
- {
- String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
- String logFileName = new SimpleDateFormat("yyyy-MM-dd")
- .format(new Date());
- File file = new File(dir, logFileName+".txt");
- FileOutputStream fos = new FileOutputStream(file, true);
- fos.write((msg + "\r\n").getBytes());
- fos.flush();
- fos.close();
- } catch (FileNotFoundException e)
- {
- e.printStackTrace();
- } catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
随机建立一个队列,而后将队列与转发器绑定,而后将消费者与该队列绑定,而后写入日志文件。
接收端2:ReceiveLogsToConsole.java
- package com.zhy.rabbit._03_bindings_exchanges;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
-
- public class ReceiveLogsToConsole
- {
- private final static String EXCHANGE_NAME = "ex_log";
-
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- // 建立链接和频道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- // 建立一个非持久的、惟一的且自动删除的队列
- String queueName = channel.queueDeclare().getQueue();
- // 为转发器指定队列,设置binding
- channel.queueBind(queueName, EXCHANGE_NAME, "");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定接收者,第二个参数为自动应答,无需手动应答
- channel.basicConsume(queueName, true, consumer);
-
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" + message + "'");
-
- }
-
- }
-
- }
随机建立一个队列,而后将队列与转发器绑定,而后将消费者与该队列绑定,而后打印到控制台。
如今把两个接收端运行,而后运行3次发送端:
输出结果:
发送端:
[x] Sent '2014-7-10 16:04:54 : log something'
[x] Sent '2014-7-10 16:04:58 : log something'
[x] Sent '2014-7-10 16:05:02 : log something'
接收端1:
接收端2:
[*] Waiting for messages. To exit press CTRL+C
[x] Received '2014-7-10 16:04:54 : log something'
[x] Received '2014-7-10 16:04:58 : log something'
[x] Received '2014-7-10 16:05:02 : log something'
这个例子实现了咱们文章开头所描述的日志系统,利用了转发器的类型:fanout。
本篇说明了,生产者将消息发送至转发器,转发器决定将消息发送至哪些队列,消费者绑定队列获取消息。