昨天简单学习了一个消息队列项目——RabbitMQ,今天趁热打铁,将学到的东西记录下来。html
学习的资料主要是官网给出的6个基本的消息发送/接收模型,或者称为6种不一样的使用场景,本文即是对这6种模型加以叙述。java
在学习6种模型以前,咱们首先须要安装RabbitMQ。RabbitMQ支持多种系统平台,各平台的安装方法能够点此查看。安装好以后,咱们使用以下命令启用Web端的管理插件:rabbitmq-plugins enable rabbitmq_management
,而后启动RabbitMQ。接着用浏览器访问http://localhost:15672/
,若能看到RabbitMQ相关Web页面,说明启动成功。web
正所谓万事开头难,咱们先从最简单的Hello World开始。首先固然是新建一个项目,导入RabiitMQ相关jar。我采用Maven来构建项目,所以只须要在pom文件中添加以下依赖:浏览器
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
接下来学习最简单的消息队列模型,以下图:服务器
在图中,P
表明producer
,它是消息的生产者;C
表明consumer
,它是消息的消费者;而红色的矩形正是咱们所谓的消息队列,它位于RabbitMQ
中(RabbitMQ
中能够有不少这样的队列,而且每一个队列都有一个惟一的名字)。生产者(们)能够将消息发送到消息队列中,消费者(们)能够从消息队列中取出消息。dom
这种模型是否是很简单呢?下面咱们使用Java,借助于RabbitMQ来实现这种模型的消息通讯。异步
首先咱们介绍如何send
消息到消息队列。send
以前,固然是和RabbitMQ服务器创建链接:分布式
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection();
接下来咱们建立一个channel
,大多数API都是经过这个对象来调用的:ide
Channel channel = connection.createChannel();
以后,咱们即可以调用channel
的以下方法去声明一个队列:单元测试
channel.queueDeclare("hello", false, false, false, null);
该方法的第一个参数是队列的名称,其他的参数先无论,以后会介绍。咱们能够尝试着去执行以上的5行代码,而后打开Web端,能够看到新建了一个叫做hello
的队列:
有了队列,咱们即可以向其中发送消息了,一样仍是调用channel
对象的API:
channel.basicPublish("", "hello", null, "Hello World".getBytes());
以上代码所作的事情就是发送了一条字符串消息“Hello World”(第4个参数)到消息队列。你可能注意到咱们调用了String对象的getBytes
方法,没错,咱们发送的实际上二进制数据。所以,理论上你可以发送任何数据到消息队列中,而不只仅是文本信息。
第2个参数叫作路由键(routingKey),在该模型下必须与队列名相同,至于为何,和其余参数同样,以后会了解到。
咱们能够修改发送的文本,再次执行上述代码,而后打开Web端查看,即可以查看到咱们发送的消息:
点击上图的name字段下的hello,能够查看hello
队列中的具体信息:
接下来,咱们去尝试着去获取生产者发送的消息,和send
方法同样,咱们一样须要链接服务器,建立channel
,声明队列:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null);
以后咱们能够调用channel
的相关方法去监听队列,接收消息:
channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } });
以上basicConsume
方法中,第一个参数是队列的名字;第二个参数表示是否自动确认消息的接收状况,咱们使用true,自动确认;第三个参数须要传入一个实现了Consumer
接口的对象,咱们简单的new
一个默认的Consumer
的实现类DefaultConsumer
,而后在handleDelivery
方法中去处理接收到的消息(handleDelivery
方法会在接收到消息时被回调)。
运行以上代码,咱们能够打印出以前向队列中send
的数据:
Hello World Hello World2
下面是Hello World的完整代码:
public class App { @Test public void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); channel.basicPublish("", "hello", null, "Hello World2".getBytes()); channel.close(); connection.close(); } @Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } }); synchronized (this){ // 由于以上接收消息的方法是异步的(非阻塞),当采用单元测试方式执行该方法时,程序会在打印消息前结束,所以使用wait来防止程序提早终止。若使用main方法执行,则不须要担忧该问题。 wait(); } } }
接下来咱们学习第二种模型——Work Queues。顾名思义,这种模型描述的是一个生产者(Boss)向队列发消息(任务),多个消费者(worker)从队列接受消息(任务),以下图所示:
下面咱们用代码去实现。先是生产者send
消息到队列,此次咱们多发送些数据:
@Test public void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); for (int i = 0; i < 9; i++) { channel.basicPublish("", "hello", null, String.valueOf(i).getBytes()); } channel.close(); connection.close(); }
而后是消费者接收数据:
@Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); channel.basicConsume("hello", true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); try { // Thread.sleep(1000); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }); synchronized (this) { wait(); } }
代码基本上和Hello World
的代码同样,只是加上句sleep
来模拟消费者(worker)处理消息所花的时间。
咱们能够先执行三次receive
方法(修改sleep
的时间,其中消费者1 sleep 10s,消费者2,3 sleep 1s),让三个消费者(worker)一块儿等待消息的到来,而后执行send
方法发送9条消息,观察三个消费者收到的消息状况。
若不出意外,你会看到以下的打印结果:
// --------消费者1-------- 0 // 10s 后 3 // 10s 后 6 // --------消费者2-------- 1 // 1s 后 4 // 1s 后 7 // --------消费者3-------- 2 // 1s 后 5 // 1s 后 8
经过打印结果,咱们能够总结出Work queues的几个特色:
事实上,RabbitMQ会循环地(一个接一个地)发送消息给消费者,这种分配消息的方式被称为round-robin(轮询)。
看到这里,不知你是否会担忧:因为worker(消费者)执行任务须要必定的时间(以上用sleep模拟),要是某个worker在运行过程当中挂掉,那分配给它的任务岂不是丢失了(永远不可能被执行了)。为解决这个问题,RabbitMQ提供了消息确认机制,即worker须要主动的去确认消息已经接收了,RabbitMQ才认为消息被“真正地接收了”,实现代码以下:
// send的代码不用变,只需改变basicConsume的第二个参数为false,表示不要自动确认 channel.basicConsume("hello", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); try { // 这里把时间加长了一点便于测试 Thread.sleep(8000); } catch (InterruptedException e) { e.printStackTrace(); }finally { // 这里手动地肯定 channel.basicAck(envelope.getDeliveryTag(), false); } } });
下面作测试。首先执行send
方法,发送9条消息到队列,查看web端状况以下:
此时队列中有9条未被分发的消息。接着运行改变后的receive
方法,而后快速地去Web端查看队列中的消息状况(记得刷新):
发现队列中没有待分发(Ready字段)的消息了,而有9条未被确认(Unacked字段)的消息。但控制台打印出数字6
时,关闭程序,再次去web端查看:
此时队列中又有3条待分发的消息了。缘由正是因为咱们提早终止了receive
方法的执行,致使最后3条消息没有被确认而被从新归还到Ready中。
若是你不是一次性跟着本文运行代码到这里,而是次日接着昨天的工做继续进行,你可能会发现昨天你建立的队列和添加到队列里的消息没有了。极可能的缘由就是消息没有持久化,即按照以上代码运行生成的队列和添加到队列中的消息都是储存在内存中的,RabbitMQ一旦关闭它们就没有了。若是你想将下次启动时还能看到关闭前的消息,你应该将其持久化:
// 将第二个参数设为true,表示声明一个须要持久化的队列。 // 须要注意的是,若你已经定义了一个非持久的,同名字的队列,要么将其先删除(否则会报错),要么换一个名字。 channel.queueDeclare("hello", true, false, false, null);
// 修改了第三个参数,这是代表消息须要持久化 channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
总的来讲,Work queues(Task Queuess)的概念在一些Web场景的应用中是颇有用的,好比咱们可以用它来构建一个master-slave结构的分布式爬虫系统:系统中有一个master节点和多个slave节点,master节点负责向各个slave节点分配爬取任务。
但有些时候,咱们可能但愿一条消息可以被多个消费者接受到,好比一些公告信息等,这时候用Work Queue模型显然不合适,而Publish/Subscribe模型正是对应这种使用场景的。
在介绍Publish/Subscribe以前,咱们快速回顾以前的两个模型,它们好像都是生产者将消息直接发送到消息队列,但其实不是这样的,甚至有可能生产者根本就不知道消息发送到了哪个消息队列。
先别着急,下面咱们完整地介绍RabbitMQ消息发送/接受的方式。
事实上,生产者是把消息发送到了交换机(exchange)中,而后交换机负责(决定)将消息发送到(哪个)消息队列中。其模型以下图:
这时候你可能会疑惑:既然消息是被发送到了交换机中,那咱们以前发送的消息是被发送到了哪个交换机中了?它有没有机制可以让特定的消息发送到指定的队列?
先回答第一个问题。还记得咱们在Hello World中写的发送消息的代码吗?
channel.basicPublish("", "hello", null, message.getBytes());
事实上第一个参数即是指定交换机的名字,即指定消息被发送到哪个交换机。空字符串表示默认交换机(Default Exchange),即咱们以前发送的消息都是先发送到默认交换机,而后它再路由到相应的队列中。其实咱们能够经过Web页面去查看全部存在的交换机:
接着回答第二个问题。路由的依据即是经过第二个参数——路由键(routing key)指定的,以前已经提到过。在以前代码中,咱们指定第二个参数为"hello",即是指定消息应该被交换机路由到路由键为hello的队列中。而默认交换机(Default Exchange)有一个很是有用的性质:
每个被建立的队列都会被自动的绑定到默认交换机上,而且路由键就是队列的名字。
交换机还有4种不一样的类型,分别是direct
,fanout
,topic
,headers
,每种类型路由的策略不一样。
direct
类型的交换机要求和它绑定的队列带有一个路由键K,如有一个带有路由键R的消息到达了交换机,交换机会将此消息路由到路由键K = R的队列。默认交换机即是该类型。所以,在下图中,消息会沿着绿色箭头路由:
fanout
类型的交换机会路由每一条消息到全部和它绑定的队列,忽略路由键。
剩下的两种类型以后再作介绍。
在以上概念基础上,咱们来看第3种消息模型:Publish/Subscribe。以下图:
该模型是要让全部的消费者都可以接收到每一条消息。显然,fanout
类型的交换机更符合咱们当前的需求。为此,先建立一个fanout
类型的交换机。
channel.exchangeDeclare("notice", "fanout");
其中,第一个参数是交换机的名称;第二个参数是交换机的类型。
而后咱们能够send
消息了:
channel.basicPublish( "notice", "", null, message.getBytes());
对于消费者,咱们须要为每个消费者建立一个独立的队列,而后将队列绑定到刚才指定的交换机上便可:
// 该方法会建立一个名称随机的临时队列 String queueName = channel.queueDeclare().getQueue(); // 将队列绑定到指定的交换机("notice")上 channel.queueBind(queueName, "notice", "");
如下完整的代码:
@Test public void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("notice", "fanout"); channel.basicPublish( "notice", "", null, "Hello China".getBytes()); channel.close(); connection.close(); } @Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("notice", "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "notice", ""); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } }); synchronized (this) { wait(); } }
首先运行两次receive
方法,让两个消费者等待接收消息,而后能够在Web端查看此时的队列状况,以下图所示:
能够看到图中有两个名称随机的队列。接着运行send
方法发送一条消息,最终咱们会看到两个消费者都打印出了Hello China
。而后中止虚拟机让消费者断开链接,再次在Web端查看队列状况,会发现刚才的两个队列被自动删除了。
以上是Publish/Subscribe模式,它已经能让咱们的通知(notice)系统正常运转了。如今再考虑这样一个新需求:对于一些机密通知,咱们只想让部分人看到。这就要求交换机对绑定在其上的队列进行筛选,因而引出了又一个新的模型:Routing。
以前咱们说过,对于direct
类型的交换机,它会根据routing key进行路由,所以咱们能够借助它来实现咱们的需求,模型结构以下图:
下面用代码来实现。先看生产者。
首先要声明一个direct
类型的交换机:
// 这里名称改成notice2 channel.exchangeDeclare("notice2", "direct");
须要注意的是,由于咱们以前声明了一个fanout
类型的名叫notice
的交换机,所以不能再声明一个同名的类型却不同的交换机。
而后能够发送消息了,咱们发送10条消息,其中偶数条消息是秘密消息,只能被routing key 为s的队列接受,其他的消息全部的队列都能接受。
for (int i = 0; i < 10; i++) { String routingKey = "n"; // normal if (i % 2 == 0) { routingKey = "s"; // secret } channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes()); }
接下来是消费者:
// 声明一个名称随机的临时的队列 String queueName = channel.queueDeclare().getQueue(); // 绑定交换机,同时带上routing key channel.queueBind(queueName, "notice2", "n"); // 消费者2号运行时,打开如下注释 // channel.queueBind(queueName, "notice2", "s");
注意,咱们能够屡次调用队列绑定方法,调用时,队列名和交换机名都相同,而routing key不一样,这样可使一个队列带有多个routing key。
如下是完整代码:
@Test public void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("notice2", "direct"); for (int i = 0; i < 10; i++) { String routingKey = "n"; // normal if (i % 2 == 0) { routingKey = "s"; // secret } channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes()); } channel.close(); connection.close(); } @Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("notice2", "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "notice2", "n"); // channel.queueBind(queueName, "notice2", "s"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } }); synchronized (this) { wait(); } }
测试时,咱们能够先运行一个receive
,而后打开channel.queueBind(queueName, "notice2", "s")
注释,再运行一次receive
,这样就有两个消费者绑定到notice2交换机上,其中消费者1只能收到normal类型的消息,而消费者2既能收到normal类型的消息,又能收到secret类型的消息。接着能够运行send方法。如不出意外,能够看到以下打印结果:
// 消费者1 1 3 5 7 9 // 消费者2 0 1 2 3 4 5 6 7 8 9
有了以上的改进,咱们的notice
系统基本ok了。但有些时候,咱们还须要更加灵活的消息刷选方式。好比咱们对于电影信息,咱们可能须要对它的地区,类型,限制级进行筛选。这时候就要借助Topics模型了。
在Topics模型中,咱们“升级”了routing key,它能够由多个关键词组成,词与词之间由点号(.
)隔开。特别地,规定*
表示任意的一个词;#
号表示任意的0个或多个词。
假设咱们如今须要接收电影信息,每条电影消息附带的routingKey有地区、类型、限制级3个关键字,即:district.type.age
。如今想实现的功能以下图:
如上图所示,队列Q1只关心美国适合13岁以上的电影信息,队列Q2对动做片感兴趣,而队列Q3喜欢中国电影。
下面用Java代码去实现上述功能,相较于以前基本上没有什么改变,下面直接给出代码:
@Test public void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("movie", "topic"); channel.basicPublish("movie", "American.action.13", null, "The Bourne Ultimatum".getBytes()); channel.basicPublish("movie", "American.comedy.R", null, "The Big Lebowski".getBytes()); channel.basicPublish("movie", "American.romantic.13", null, "Titanic".getBytes()); channel.basicPublish("movie", "Chinese.action.13", null, "卧虎藏龙".getBytes()); channel.basicPublish("movie", "Chinese.comedy.13", null, "大话西游".getBytes()); channel.basicPublish("movie", "Chinese.romantic.13", null, "梁山伯与祝英台".getBytes()); channel.close(); connection.close(); } @Test public void receive() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("movie", "topic"); // 队列1 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "movie", "American.*.13"); // 队列2 // String queueName = channel.queueDeclare().getQueue(); // channel.queueBind(queueName, "movie", "*.action.*"); // 队列3 // String queueName = channel.queueDeclare().getQueue(); // channel.queueBind(queueName, "movie", "Chinese.#"); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body, "UTF-8")); } }); synchronized (this) { wait(); } }
运行3次receive
方法,注意打开或关闭相应的注释;再运行send
方法,能够看到控制台输出以下内容:
// 消费者1 The Bourne Ultimatum Titanic // 消费者2 The Bourne Ultimatum 卧虎藏龙 // 消费者3 卧虎藏龙 大话西游 梁山伯与祝英台
第6种模型是用来作RPC(Remote procedure call, 远程程序调用)的。这里直接贴上代码,就不作解释了,想要了解更多细节,可参考这里。代码演示的是,客户端调用服务端的fib
方法,获得返回结果。
RPCServer.java
import com.rabbitmq.client.*; import com.rabbitmq.client.AMQP.BasicProperties; /** * Description: * * @author derker * @Time 2016-10-26 18:24 */ public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); AMQP.BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response = "" + fib(n); } catch (Exception e) { System.out.println(" [.] " + e.toString()); response = ""; } finally { channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } }
RPCClient.java
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.util.UUID; /** * Description: * * @author derker * @Time 2016-10-26 18:36 */ public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(), "UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(10)"); response = fibonacciRpc.call("10"); System.out.println(" [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (Exception ignore) { } } } } }