翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.htmlhtml
在前面的教程中,咱们建立了一个工做队列,都是假设一个任务只交给一个消费者。此次咱们作一些彻底不一样的事儿——将消息发送给多个消费者。这种模式叫作“发布/订阅”。java
为了说明这个模式,咱们将构建一个简单日志系统。它包含2段程序:第一个将发出日志消息,第二个接受并打印消息。算法
若是在日志系统中每个接受者(订阅者)都会的获得消息的拷贝。那样的话,咱们能够运行一个接受者(订阅者)程序,直接把日志记录到硬盘。同时运行另外一个接受者(订阅者)程序,打印日志到屏幕上。服务器
说白了,发表日志消息将被广播给全部的接收者。less
Exchanges(转发器)前面的博文汇总,咱们都是基于一个队列发送和接受消息。如今介绍一下完整的消息传递模式。ide
RabbitMQ消息模式的核心理念是:生产者没有直接发送任何消费到队列。实际上,生产者都不知道这个消费是发送给哪一个队列的。翻译
相反,生产者只能发送消息给转发器,转发器是很是简单的。一方面它接受生产者的消息,另外一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则经过转发器的类型进行定义。3d
类型有:Direct、Topic、Headers和Fanout。咱们关注最后一个。如今让咱们建立一个该类型的转发器,定义以下:日志
channel.exchangeDeclare("logs", "fanout");
fanout转发器很是简单,从名字就能够看出,它是广播接受到的消息给全部的队列。而这正好符合日志系统的需求。code
Nameless exchange(匿名转发)以前咱们对转换器一无所知,却能够将消息发送到队列,那是多是咱们用了默认的转发器,转发器名为空字符串""。以前咱们发布消息的代码是:
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是转发器的名字,空字符串表示模式或者匿名的转发器。消息经过队列的routingKey路由到指定的队列中去,若是存在的话。
如今咱们能够指定转发器的名字了:
channel.basicPublish( "logs", "", null, message.getBytes());Temporary queues(临时队列)
你可能还记得以前咱们用队列时,会指定一个名字。队列有名字对咱们来讲是很是重要的——咱们须要为消费者指定同一个队列。
但这并非咱们的日志系统所关心的。咱们要监听全部日志消息,而不只仅是一类日志。咱们只对对当前流动的消息感兴趣。解决这些问题,我盟须要完成两件事。
首先,每当我盟链接到RabbitMQ时,须要一个新的空队列。为此咱们须要建立一个随机名字的空队列,或者更好的,让服务器选好年则一个随机名字的空队列给咱们。
其次,一旦消费者断开链接,队列将自动删除。
咱们提供一个无参的queueDeclare()方法,建立一个非持久化、独立的、自动删除的队列,且名字是随机生成的。
String queueName = channel.queueDeclare().getQueue();
queueName是一个随机队列名。看起来会像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
Bindings(绑定)咱们已经建立了一个广播的转发器和一个随机队列。如今须要告诉转发器转发消息到队列。这个关联转发器和队列的咱们叫它Binding。
channel.queueBind(queueName, "logs", "");
这样,日志转发器将附加到日志队列上去。
完整的例子:发送端代码(生产者)EmitLog.java
public class EmitLog { private final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException { /** * 建立链接链接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 设置MabbitMQ所在主机ip或者主机名 factory.setHost("127.0.0.1"); // 建立一个链接 Connection connection = factory.newConnection(); // 建立一个频道 Channel channel = connection.createChannel(); // 指定转发——广播 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for(int i=0;i<3;i++){ // 发送的消息 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 关闭频道和链接 channel.close(); connection.close(); } }
消费者1 ReceiveLogs2Console.java
public class ReceiveLogs2Console { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打开链接和建立频道,与发送端同样 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
消费者2 ReceiveLogs2File.java
public class ReceiveLogs2File { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打开链接和建立频道,与发送端同样 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); print2File(message); // System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } private static void print2File(String msg) { try { String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, logFileName + ".log"); FileOutputStream fos = new FileOutputStream(file, true); fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
能够看到咱们1个生产者用于发送log消息,2个消费者,一个用于显示,一个用于记录文件。
生产者声明了一个广播模式的转换器,订阅这个转换器的消费者均可以收到每一条消息。能够看到在生产者中,没有声明队列。这也验证了以前说的。生产者其实只关心exchange,至于exchange会把消息转发给哪些队列,并非生产者关心的。
2个消费者,一个打印日志,一个写入文件,除了这2个地方不同,其余地方如出一辙。也是声明一下广播模式的转换器,而队列则是随机生成的,消费者实例启动后,会建立一个随机实例,这个在管理页面能够看到(如图)。而实例关闭后,随机队列也会自动删除。最后将队列与转发器绑定。
注:运行的时候要先运行2个消费者实例,而后在运行生产者实例。不然获取不到实例。
看看最终的结果吧:
翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html
在前面的教程中,咱们建立了一个工做队列,都是假设一个任务只交给一个消费者。此次咱们作一些彻底不一样的事儿——将消息发送给多个消费者。这种模式叫作“发布/订阅”。
为了说明这个模式,咱们将构建一个简单日志系统。它包含2段程序:第一个将发出日志消息,第二个接受并打印消息。
若是在日志系统中每个接受者(订阅者)都会的获得消息的拷贝。那样的话,咱们能够运行一个接受者(订阅者)程序,直接把日志记录到硬盘。同时运行另外一个接受者(订阅者)程序,打印日志到屏幕上。
说白了,发表日志消息将被广播给全部的接收者。
Exchanges(转发器)前面的博文汇总,咱们都是基于一个队列发送和接受消息。如今介绍一下完整的消息传递模式。
RabbitMQ消息模式的核心理念是:生产者没有直接发送任何消费到队列。实际上,生产者都不知道这个消费是发送给哪一个队列的。
相反,生产者只能发送消息给转发器,转发器是很是简单的。一方面它接受生产者的消息,另外一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则经过转发器的类型进行定义。
类型有:Direct、Topic、Headers和Fanout。咱们关注最后一个。如今让咱们建立一个该类型的转发器,定义以下:
channel.exchangeDeclare("logs", "fanout");
fanout转发器很是简单,从名字就能够看出,它是广播接受到的消息给全部的队列。而这正好符合日志系统的需求。
Nameless exchange(匿名转发)以前咱们对转换器一无所知,却能够将消息发送到队列,那是多是咱们用了默认的转发器,转发器名为空字符串""。以前咱们发布消息的代码是:
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是转发器的名字,空字符串表示模式或者匿名的转发器。消息经过队列的routingKey路由到指定的队列中去,若是存在的话。
如今咱们能够指定转发器的名字了:
channel.basicPublish( "logs", "", null, message.getBytes());Temporary queues(临时队列)
你可能还记得以前咱们用队列时,会指定一个名字。队列有名字对咱们来讲是很是重要的——咱们须要为消费者指定同一个队列。
但这并非咱们的日志系统所关心的。咱们要监听全部日志消息,而不只仅是一类日志。咱们只对对当前流动的消息感兴趣。解决这些问题,我盟须要完成两件事。
首先,每当我盟链接到RabbitMQ时,须要一个新的空队列。为此咱们须要建立一个随机名字的空队列,或者更好的,让服务器选好年则一个随机名字的空队列给咱们。
其次,一旦消费者断开链接,队列将自动删除。
咱们提供一个无参的queueDeclare()方法,建立一个非持久化、独立的、自动删除的队列,且名字是随机生成的。
String queueName = channel.queueDeclare().getQueue();
queueName是一个随机队列名。看起来会像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
Bindings(绑定)咱们已经建立了一个广播的转发器和一个随机队列。如今须要告诉转发器转发消息到队列。这个关联转发器和队列的咱们叫它Binding。
channel.queueBind(queueName, "logs", "");
这样,日志转发器将附加到日志队列上去。
完整的例子:发送端代码(生产者)EmitLog.java
public class EmitLog { private final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException { /** * 建立链接链接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 设置MabbitMQ所在主机ip或者主机名 factory.setHost("127.0.0.1"); // 建立一个链接 Connection connection = factory.newConnection(); // 建立一个频道 Channel channel = connection.createChannel(); // 指定转发——广播 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for(int i=0;i<3;i++){ // 发送的消息 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 关闭频道和链接 channel.close(); connection.close(); } }
消费者1 ReceiveLogs2Console.java
public class ReceiveLogs2Console { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打开链接和建立频道,与发送端同样 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
消费者2 ReceiveLogs2File.java
public class ReceiveLogs2File { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打开链接和建立频道,与发送端同样 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); print2File(message); // System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } private static void print2File(String msg) { try { String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, logFileName + ".log"); FileOutputStream fos = new FileOutputStream(file, true); fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
能够看到咱们1个生产者用于发送log消息,2个消费者,一个用于显示,一个用于记录文件。
生产者声明了一个广播模式的转换器,订阅这个转换器的消费者均可以收到每一条消息。能够看到在生产者中,没有声明队列。这也验证了以前说的。生产者其实只关心exchange,至于exchange会把消息转发给哪些队列,并非生产者关心的。
2个消费者,一个打印日志,一个写入文件,除了这2个地方不同,其余地方如出一辙。也是声明一下广播模式的转换器,而队列则是随机生成的,消费者实例启动后,会建立一个随机实例,这个在管理页面能够看到(如图)。而实例关闭后,随机队列也会自动删除。最后将队列与转发器绑定。
注:运行的时候要先运行2个消费者实例,而后在运行生产者实例。不然获取不到实例。
看看最终的结果吧:
翻译地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html
在前篇博文中,咱们创建了一个简单的日志系统。能够广播消息给多个消费者。本篇博文,咱们将添加新的特性——咱们能够只订阅部分消息。好比:咱们能够接收Error级别的消息写入文件。同时仍然能够在控制台打印全部日志。
Bindings(绑定)在上一篇博客中咱们已经使用过绑定。相似下面的代码:
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定表示转换器与队列之间的关系。能够简单的人为:队列对该转发器上的消息感兴趣。
绑定能够设定额外的routingKey参数。为了与避免basicPublish方法(发布消息的方法)的参数混淆,咱们准备把它称做绑定键(binding key)。下面展现如何使用绑定键(binding key)来建立一个绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
绑定键关键取决于转换器的类型。对于fanout类型,忽略此参数。
Direct exchange(直接转发)前面讲到咱们的日志系统广播消息给全部的消费者。咱们想对其扩展,根据消息的严重性来过滤消息。例如:咱们但愿将致命错误的日志消息记录到文件,而不是把磁盘空间浪费在warn和info类型的日志上。咱们使用的fanout转发器,不能给咱们太多的灵活性。它仅仅只是盲目的广播而已。咱们使用direct转发器进行代替,其背后的算法很简单——消息会被推送至绑定键(binding key)和消息发布附带的选择键(routing key)彻底匹配的队列。
在上图中,咱们能够看到direct类型的转发器与2个队列进行了绑定。第一个队列使用的绑定键是orange,第二个队列绑定键为black和green。这样当消息发布到转发器是,附带orange绑定键的消息将被路由到队列Q1中去。附带black和green绑定键的消息被路由到Q2中去。其余消息所有丢弃。
Multiple bindings(多重绑定)使用一个绑定键绑定多个队列是彻底合法的。如上图,绑定键black绑定了2个队列——Q1和Q2。
Emitting logs(发送日志)咱们将这种模式用于日志系统,发送消息给direct类型的转发器。咱们将 提供日志严重性作为绑定键。那样,接收程序能够选择性的接收严重性的消息。首先关注发送日志的代码:
像往常同样首先建立一个转换器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
而后为发送消息作准备:
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
为了简化代码,咱们假定日志的严重性是‘info’,‘warning’,‘error’中之一。
Subscribing(订阅)接收消息跟前面博文中的同样。咱们仅须要修改一个地方:为每个咱们感兴趣的严重性的消息,建立一个新的绑定。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }完整的例子
发送端代码(EmitLogDirect.java)
public class EmitLogDirect { private final static String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException { /** * 建立链接链接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 设置MabbitMQ所在主机ip或者主机名 factory.setHost("127.0.0.1"); // 建立一个链接 Connection connection = factory.newConnection(); // 建立一个频道 Channel channel = connection.createChannel(); // 指定转发——广播 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //全部日志严重性级别 String[] severities={"error","info","warning"}; for(int i=0;i<3;i++){ String severity = severities[i%3];//每一次发送一条不一样严重性的日志 // 发送的消息 String message = "Hello World"+Strings.repeat(".", i+1); //参数1:exchange name //参数2:routing key channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity +"':'"+ message + "'"); } // 关闭频道和链接 channel.close(); connection.close(); } }
消费者1(ReceiveLogs2Console.java)
public class ReceiveLogs2Console { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打开链接和建立频道,与发送端同样 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); //全部日志严重性级别 String[] severities={"error","info","warning"}; for (String severity : severities) { //关注全部级别的日志(多重绑定) channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
消费者2(ReceiveLogs2File.java)
public class ReceiveLogs2File { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打开链接和建立频道,与发送端同样 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); String severity="error";//只关注error级别的日志,而后记录到文件中去。 channel.queueBind(queueName, EXCHANGE_NAME, severity); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); //记录日志到文件: print2File( "["+ envelope.getRoutingKey() + "] "+message); } }; channel.basicConsume(queueName, true, consumer); } private static void print2File(String msg) { try { String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, logFileName + ".log"); FileOutputStream fos = new FileOutputStream(file, true); fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
最终结果:
罗哩罗嗦的说这么多,其实就是说了这么一件事:咱们可使用Direct exchange+routingKey来过滤本身感兴趣的消息。一个队列能够绑定多个routingKey。这就是咱们今天的主题——路由选择。
翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html
在前面的教程中,咱们建立了一个工做队列,都是假设一个任务只交给一个消费者。此次咱们作一些彻底不一样的事儿——将消息发送给多个消费者。这种模式叫作“发布/订阅”。
为了说明这个模式,咱们将构建一个简单日志系统。它包含2段程序:第一个将发出日志消息,第二个接受并打印消息。
若是在日志系统中每个接受者(订阅者)都会的获得消息的拷贝。那样的话,咱们能够运行一个接受者(订阅者)程序,直接把日志记录到硬盘。同时运行另外一个接受者(订阅者)程序,打印日志到屏幕上。
说白了,发表日志消息将被广播给全部的接收者。
Exchanges(转发器)前面的博文汇总,咱们都是基于一个队列发送和接受消息。如今介绍一下完整的消息传递模式。
RabbitMQ消息模式的核心理念是:生产者没有直接发送任何消费到队列。实际上,生产者都不知道这个消费是发送给哪一个队列的。
相反,生产者只能发送消息给转发器,转发器是很是简单的。一方面它接受生产者的消息,另外一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则经过转发器的类型进行定义。
类型有:Direct、Topic、Headers和Fanout。咱们关注最后一个。如今让咱们建立一个该类型的转发器,定义以下:
channel.exchangeDeclare("logs", "fanout");
fanout转发器很是简单,从名字就能够看出,它是广播接受到的消息给全部的队列。而这正好符合日志系统的需求。
Nameless exchange(匿名转发)以前咱们对转换器一无所知,却能够将消息发送到队列,那是多是咱们用了默认的转发器,转发器名为空字符串""。以前咱们发布消息的代码是:
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是转发器的名字,空字符串表示模式或者匿名的转发器。消息经过队列的routingKey路由到指定的队列中去,若是存在的话。
如今咱们能够指定转发器的名字了:
channel.basicPublish( "logs", "", null, message.getBytes());Temporary queues(临时队列)
你可能还记得以前咱们用队列时,会指定一个名字。队列有名字对咱们来讲是很是重要的——咱们须要为消费者指定同一个队列。
但这并非咱们的日志系统所关心的。咱们要监听全部日志消息,而不只仅是一类日志。咱们只对对当前流动的消息感兴趣。解决这些问题,我盟须要完成两件事。
首先,每当我盟链接到RabbitMQ时,须要一个新的空队列。为此咱们须要建立一个随机名字的空队列,或者更好的,让服务器选好年则一个随机名字的空队列给咱们。
其次,一旦消费者断开链接,队列将自动删除。
咱们提供一个无参的queueDeclare()方法,建立一个非持久化、独立的、自动删除的队列,且名字是随机生成的。
String queueName = channel.queueDeclare().getQueue();
queueName是一个随机队列名。看起来会像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
Bindings(绑定)咱们已经建立了一个广播的转发器和一个随机队列。如今须要告诉转发器转发消息到队列。这个关联转发器和队列的咱们叫它Binding。
channel.queueBind(queueName, "logs", "");
这样,日志转发器将附加到日志队列上去。
完整的例子:发送端代码(生产者)EmitLog.java
public class EmitLog { private final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException { /** * 建立链接链接到MabbitMQ */ ConnectionFactory factory = new ConnectionFactory(); // 设置MabbitMQ所在主机ip或者主机名 factory.setHost("127.0.0.1"); // 建立一个链接 Connection connection = factory.newConnection(); // 建立一个频道 Channel channel = connection.createChannel(); // 指定转发——广播 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for(int i=0;i<3;i++){ // 发送的消息 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } // 关闭频道和链接 channel.close(); connection.close(); } }
消费者1 ReceiveLogs2Console.java
public class ReceiveLogs2Console { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打开链接和建立频道,与发送端同样 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
消费者2 ReceiveLogs2File.java
public class ReceiveLogs2File { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws IOException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); // 打开链接和建立频道,与发送端同样 Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 声明一个随机队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 建立队列消费者 final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); print2File(message); // System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } private static void print2File(String msg) { try { String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath(); String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); File file = new File(dir, logFileName + ".log"); FileOutputStream fos = new FileOutputStream(file, true); fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes()); fos.flush(); fos.close(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
能够看到咱们1个生产者用于发送log消息,2个消费者,一个用于显示,一个用于记录文件。
生产者声明了一个广播模式的转换器,订阅这个转换器的消费者均可以收到每一条消息。能够看到在生产者中,没有声明队列。这也验证了以前说的。生产者其实只关心exchange,至于exchange会把消息转发给哪些队列,并非生产者关心的。
2个消费者,一个打印日志,一个写入文件,除了这2个地方不同,其余地方如出一辙。也是声明一下广播模式的转换器,而队列则是随机生成的,消费者实例启动后,会建立一个随机实例,这个在管理页面能够看到(如图)。而实例关闭后,随机队列也会自动删除。最后将队列与转发器绑定。
注:运行的时候要先运行2个消费者实例,而后在运行生产者实例。不然获取不到实例。
看看最终的结果吧: