原文地址:https://www.rabbitmq.com/tutorials/tutorial-two-java.htmlhtml
在第一篇教程中咱们编写了发送消息到队列并从中接收消息的程序。在本篇教程中咱们将会建立一个能够在多个消费者中分发耗时任务的工做队列(Work Queue)。java
工做队列(也叫任务队列)的主要做用是你不须要当即运行一个耗时的任务并等待它完成。相反,咱们能够将这个任务延后运行。咱们将一个任务(task)封装成消息并发送到一个队列。一个后台运行的工做进程(worker process)将会取出这个任务并运行它。当你运行了多个工做线程时,任务会在它们之中进行分配。git
若是你须要在一个较短的HTTP请求窗口中处理一个复杂任务,这个概念会显得尤其重要。github
在上一篇教程中咱们发送了一条包含" Hello World! "的消息。此次咱们将发送表明着复杂任务的字符串。因为咱们并无诸如调整图片大小或者渲染PDF文件这些真正耗时的任务,因此咱们使用==Thread.sleep()==来模拟。咱们将字符串中"."的数量做为它的复杂度,每个点表示一秒钟的“工做”。好比,一个用==Hello...==描述的任务将会消耗三秒钟。shell
为了经过命令行发送任意的消息,咱们将对上一篇教程中的Send.java的代码进行一些修改。这段程序将会把咱们的任务放入工做队列,因此咱们叫它==NewTask.java==。api
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
咱们以前的Recv.java代码也须要进行一些修改:它须要把消息体中的每一个“.”当作一秒钟的任务耗时。它将会接收消息并运行任务,因此咱们把它叫作==Worker.java==。缓存
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
咱们模拟任务耗时的代码以下所示:安全
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
使用任务队列的优点之一就是就能够将任务并行化。若是咱们的任务没法及时处理致使积压,能够很轻松的经过增长更多工做线程来解决。bash
首先,让咱们同时开启两个工做线程。它们将同时从队列中获取消息,而后咱们看下具体状况如何。并发
你须要打开三个命令行窗口(译者注:你能够IDE完成下列操做,不须要命令行)。其中两个用来跑工做线程。这些控制台称做咱们的消费者——C1和C2.
# shell 1 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C
在第三个窗口中咱们发布新的任务。一旦你启动好了消费者,你就能够开始发布一些消息。
# shell 3 java -cp $CP NewTask First message. # => [x] Sent 'First message.' java -cp $CP NewTask Second message.. # => [x] Sent 'Second message..' java -cp $CP NewTask Third message... # => [x] Sent 'Third message...' java -cp $CP NewTask Fourth message.... # => [x] Sent 'Fourth message....' java -cp $CP NewTask Fifth message..... # => [x] Sent 'Fifth message.....'
让咱们看下咱们的消费者收到了什么东西:
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
默认状况下,RabbitMQ会依次将消息发给每一个消费者。通常状况下每一个消费者收到的消息数量是相同的。这种分发消息的方式咱们叫作轮询(round-robin)。你能够试下开启三个或者更多工做线程会有什么结果。
任务的运行可能须要花费一些时间。你可能想要知道当某个消费者运行了一个耗时很长的任务可是中途失败的时候会发生什么。在咱们当前的代码中,一旦RabbitMQ将某个消息发送给了消费者,它会当即将该消息标记为删除。在这种状况下,若是你杀掉了那个进程咱们就会丢失掉它正在处理的消息。一样的,咱们会丢失全部发送到当前进程但还未处理的消息。
可是咱们并不想损失任何消息。当一个工做进程挂掉以后,咱们但愿其余的工做进程能够接受这个任务。
为了保证咱们的每一条消息都不会丢失,RabbitMQ支持消息确认机制。确认消息是消费者用来告诉RabbitMQ某个消息已经被正确收到并处理,能够随时被删除。
若是某个消费者由于挂掉(channel关闭,connection关闭或者TCP链接断开)而没有发送确认消息,RabbitMQ会认为这条信息未处理完成并将它从新入队。若是此时有其余消费者存活,它会将消息转发给对应消费者。经过这种方式你能够保证消息永远不会丢失,即便消费者会随机的挂掉。
当前的消息没有任何超时设置:RabbitMQ在消费者挂掉的时候始终会从新转发消息。 即便处理一条消息花费很是很是长的时间也不要紧。
手动消息确认默认是开启状态。在前一章的例子中咱们经过设置==autoAck=true==手动关闭了它。如今是时候把它设置为false,并在任务完成后发送合适的确认消息了。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
经过这段代码咱们能够保证即便在任务处理消息的时候使用CTRL+C关闭了进程,也不会有任何消息丢失。工做线程挂掉后不久,全部未确认的消息会被从新发送。
确认消息的发送必须与消息的接收位于同一个channel中。尝试使用一个不一样的channel发送确认消息将会收到一个channel级别的协议异常。详情能够查看该文档。
忘记确认
忘记调用basicAck进行确认是一种很常见的错误。这个错误很简单,可是后果很严重。消息会在你的客户端退出的时候从新发送(看起来像是随机发送),可是RabbitMQ会由于一直持有这些未回复的消息而占用大量的内存。
你可使用==rabbitmqctl==打印==messages_unacknowledged==参数来调试这个错误
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged在windwos上,使用下列命令
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱们已经学习了当消费者挂掉时如何保证消息不丢失。可是当RabbitMQ服务端挂掉时,消息依然会丢失。
当RabbitMQ退出或崩溃时,默认会丢掉全部的队列和消息。若是想要保证消息不丢失咱们须要作两件事:将队列和消息都设置为持久化。
首先,咱们须要保证RabbitMQ永远不会丢掉咱们的队列。为了达到这个目的,咱们须要把队列声明为持久化的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
虽然上边的代码自己是正确的,可是却没法在咱们当前的设置下使用。由于咱们已经定义了名叫hello的非持久化队列。RabbitMQ不容许使用不一样的参数从新声明已经存在的队列,而且会返回一个错误。最简单的解决方法就是,从新声明一个名字不一样的队列,好比==task_queue==:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
生产者和消费者都须要修改队列声明的代码。
如今咱们能够保证即便RabbitMQ重启,==task_queue==队列也不会丢失。而后咱们须要将咱们的消息也设置为持久化的——经过设置MessageProperties的PERSISTENT_TEXT_PLAIN参数:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
消息持久化的注意事项
将消息设置为持久化的并不意味着它必定不会丢失。虽然它告诉RabbitMQ将这条消息保存到磁盘上,可是从接收消息到存到磁盘之间仍有一个短暂的时间窗口。一样的,RabbitMQ也不会每一个消息都调用==fsync(2)==——此时消息只会被保存在系统缓存而不是真正的写入磁盘。因此消息的持久化并不是万无一失,可是对于咱们简单的任务队列来讲是足够了。若是你想要一个更加安全的持久化你能够参考生产者确认
你可能已经注意到消息分发并不像咱们想象中的那样工做。考虑这样一个场景,有两个消费者,RabbitMQ发出的单数消息都很复杂,而双数的消息都很简单,这就会致使一个消费者始终处于忙碌状态,而另外一个消费者几乎无事可作。然而,RabbitMQ并不知道这种状况,他依然会均匀的分发消息。
发生这种状况是由于RabbitMQ在消息进入队列时才调度消息。 它并不会关注消费者回复确认消息的数量。 它只是盲目地将每第n条消息发送给第n个消费者。
为了杜绝这种状况发生,咱们可使用==basicQos==方法设置prefetchCount=1。这会告诉RabbitMQ一次最多只能给消费者一条消息。或者换句话说,在消费者处理完消息并回复确认以前,否则发送新的消息。这样的话,RabbitMQ就会将消息分发到另外一个空闲的消费者。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意队列的容量
若是全部的消费者都处于忙碌状态,你的队列可能被填满。你须要注意这一点,或者增长更多消费者,或者使用其余策略。
==NewTask.java==
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
==Worker.java==
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
经过消息确认机制和 prefetchCount参数你能够建立一个工做队列。而持久化选项使得即便 RabbitMQ重启,消息依然会存在。
若是你想要知道关于 Channel类和 MessageProperties的更多信息,能够查阅JavaDocs。