RabbitMQ指南之二:工做队列(Work Queues)

在上一章的指南中,咱们写了一个命名队列:生产者往该命名队列发送消息、消费从从该命名队列中消费消息。在本章中,咱们将建立一个工做队列,用于在多个工做者之间分配耗时的任务。工做队列(即任务队列)的主要思想是避免当即执行那些须要等他们执行完成的资源密集型任务。相反,咱们将任务安排在稍后完成。咱们将任务封装为消息并将其发送到队列,后台运行的工做进程将取出任务并执行完成。若是你启动了多个工做者,这些任务将在多个工做者之间分享。java

这个概念也即咱们说的异步,在项目中,有时候一个简单的Web请求,后台要作一系统的操做,这时候,若是后台执行完成以后再给前台返回消息将会致使浏览器页面等待从而出现假死状态。所以,一般的作法是,在这个Http请求到后台,后台获取到正确的参数等信息后当即给前台返回一个成功标志,而后后台异步地进行后续的操做。程序员

一、准备

  本章中,咱们将发送字符串消息来模拟复杂的任务。这里由于没有一个真实的复杂任务,所以用Thread.sleep()方法来模拟复杂耗时的任务。咱们用字符串中的含点(“.")的数量来表示任务的复杂程度,一个点表示一秒钟的耗时,例如:一个发送”Hello ...“字符串的任务将会耗时3秒钟。浏览器

  咱们能够直接将上一章中的Send.java代码拿过来修改,容许从命令行发送消息。本程序将会把任务调试到工做队列,所以,咱们将类名改成NewTask.java:缓存

String message = String.join(" ", argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

   此时完整的NewTask.java代码为:服务器

1 public class NewTask {
 2 
 3     private final static String QUEUE_NAME = "hello";
 4 
 5     public static void main(String[] argv) throws IOException, TimeoutException {
 6 
 7         ConnectionFactory connectionFactory = new ConnectionFactory();
 8         connectionFactory.setHost("HOST");
 9 
10         try(Connection connection = connectionFactory.newConnection();
11             Channel channel = connection.createChannel()) {
12 
13             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
14 
15             String message = String.join(" ", argv);
16             
17             channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
18             System.out.println(" [x] Sent '" + message + "'");
19         }
20     }
21 }

  以前的Recv.java也要作一些修改:模拟字符串消息中的每一个点耗时1秒钟,它将处理传送过来的消息并执行任务,所以,咱们修改成Work.java:app

1 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 2   String message = new String(delivery.getBody(), "UTF-8");
 3 
 4   System.out.println(" [x] Received '" + message + "'");
 5   try {
 6     doWork(message);
 7   } finally {
 8     System.out.println(" [x] Done");
 9   }
10 };
11 boolean autoAck = true; // acknowledgment is covered below
12 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

  咱们模拟执行过程当中耗时的伪任务:异步

1 private static void doWork(String task) throws InterruptedException {
2     for (char ch: task.toCharArray()) {
3         if (ch == '.') Thread.sleep(1000);
4     }
5 }

  此时完整的Work.java为:性能

1 public class Worker {
 2     private final static String TASK_QUEUE_NAME = "hello";
 3 
 4     public static void main(String[] args) throws Exception {
 5 
 6         ConnectionFactory connectionFactory = new ConnectionFactory();
 7         connectionFactory.setHost("HOST");
 8 
 9         Connection connection = connectionFactory.newConnection();
10         Channel channel = connection.createChannel();
11         channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
12 
13         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
14             String message = new String(delivery.getBody(), "UTF-8");
15 
16             System.out.println(" [x] Received '" + message + "'");
17             try {
18                 doWork(message);
19             } catch (InterruptedException e) {
20                 e.printStackTrace();
21             } finally {
22                 System.out.println(" [x] Done");
23             }
24         };
25 
26         boolean autoAck = true; // acknowledgment is covered below
27         channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
28     }
29 
30     private static void doWork(String task) throws InterruptedException {
31         for (char ch: task.toCharArray()) {
32             if (ch == '.') Thread.sleep(1000);
33         }
34     }
35 }

二、循环调度

  使用工做队列的优势之一是可以轻松地进行并行化操做。假设咱们在作一个后台日志收集系统,咱们能够很容易地增长更多的Worker从而提升系统性能。测试

  首先,咱们同时启动两个Worker,一样地,我这里也放到IDEA中启动:fetch

  

  接下来,咱们前后启动5个Task,并分别经过main()参数传入五个字符串消息:

1 First message.
2 Second message..
3 Third message...
4 Fourth message....
5 Fifth message.....

  

   执行五个发送任务以后,来看一下两个Worker都接收到了什么样的消息:

  

        

  默认状况下,RabbitMQ将按顺序将每一个消息发送给下一个使用者。平均每一个消费者将获得相同数量的消息。这种消息的调度方式称之为循环调度,你能够开启更多的Worker来进行测试。

三、消息回执

  由于消费者执行一个任务会有时间耗时,假设一个消费者在执行一个任务执行一半的时候挂掉了将会怎样?消息会不会所以丢失?在咱们目前的代码里,一旦RabbitMq将一条消息转发给了一个消费者后,将会当即将消息删除(注意Worker.java里的autoAck),所以,在咱们上面例子里,如kill掉一个正在处理数据的Worker,那么该数据将会丢失。不只如此,全部那些指派给该Worker的还未处理的消息也会丢失。

  但在实际工做的,咱们并不但愿一个Worker挂掉以后就会丢失数据,咱们但愿的是:若是该Worker挂掉了,全部转发给该Worker的消息将会从新转发给其余Worker进行处理(包括处理了一半的消息)。为了确保一条消息永不丢失,RabbitMq支持消息回执。消费者在接收到一条消息,而且成功处理完成以后会给RabbitMq回发一条确认ack确认消息,RabbitMq此时才会删除该条消息。

  若是一个Worker正在处理一条消息时挂掉了(信道关闭、链接关闭、TCP链接丢失),它将没有机会发送ack回执,RabbitMq就认为该消息没有消费成功,因而便会将该消息从新放到队列中,若是此时有其余消费者仍是在线状态,RabbitMq会当即将该条消息再转发给其余在线的消费者。这种机制能够保证任何消息都不会丢失。

  默认状况下,须要手动进行消息确认,在前面的例子里,咱们经过autoAck=true显示地关闭了手动消息确认,所以,RabbitMq将采用自动消息确认的机制。如今,咱们修改咱们的程序,采用手动发送回执的方式,当咱们完成对消息的处理后,再手动发送回执确认:

1 channel.basicQos(1); // accept only one unack-ed message at a time (see below)
 2 
 3 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
 4   String message = new String(delivery.getBody(), "UTF-8");
 5 
 6   System.out.println(" [x] Received '" + message + "'");
 7   try {
 8     doWork(message);
 9   } finally {
10     System.out.println(" [x] Done");
11     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
12   }
13 };
14 boolean autoAck = false;
15 channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

  ack发送信道必须和接收消息的信道(channel)是同一个,若是尝试经过一个不一样的信道发送ack回执,将会抛出channel等级协议异常(官网说会抛出异常,可是我在实际测试中并无抛异常,只是该条消息得不到回执,从而也没法删除)。

  一个常见的错误是忘了手动回执,虽然只是一个简单的错误,可是带来的后果倒是严重的,它将致使已经消费掉的消费不会被删除,而且当消费该消息的消费者在退出以后,RabbitMq会将该条消息从新进行转发,内存将被慢慢耗尽。咱们能够经过正面的命令来检查这种错误:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

  该命令有三列内容,第一列是在监听的队列名称,第二列是Ready状态的消息数量,第三列是Unacked的消息数量。

四、消息的持久化

  在3中咱们讲解了如何保证当消费者挂掉以后消息不被丢失,可是,若是RabbitMq服务或者部署RabbitMq的服务器挂掉了以后,消息仍然会丢失。当RabbitMq崩溃以后,它将会忘记全部的队列和消息,除非,有什么机制让RabbitMq将队列信息和消息保存下来。

  要确保消息和队列不会丢失,咱们必需要确保两件事情。

  首先,咱们要确保RabbitMq永远不丢失队列,要作到这点,咱们在定义的时候就须要告诉RabbitMq它是须要持久化的,经过指定durable参数实现:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

  虽然这个命令自己是正确的,可是在咱们目前它不能工做。由于咱们前面已经定义了一个非持久化的hello队列,RabbitMq不容许从新定义一个已经存在的队列(用不一样的参数),不然会抛出异常:

Exception in thread "main" java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:962)
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333)
    at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:23)
    Suppressed: com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, 
reply-text=PRECONDITION_FAILED - parameters for queue 'hello' in vhost '/' not equivalent, class-id=50, method-id=10)
        at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:396)
        at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:292)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:607)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:541)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:534)
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
        at myblog.myblog.java8.methodreference.rabbitmq.workqueue.NewTask.main(NewTask.java:29)

  要么重启RabbitMq让该临时队列消失,要么在控制台将该队列删除,或者从新建立一个新的队列:

1 boolean durable = true;
2 channel.queueDeclare("task_queue", durable, false, false, null);

  生产者和消费者要作同步修改。

  上面这一步,咱们保证了队列(task_quee)的持久化,此时,即使RabbitMq崩溃了也不会丢失该队列,当RabbitMq重启后将自动从新加载该队列。

  其次,咱们须要确保咱们的消息也被持久化,要作到这一点,在生产者发布消息的时候须要指定消息的属性为:PERSISTENT_TEXT_PLAIN。

1 import com.rabbitmq.client.MessageProperties;
2 
3 channel.basicPublish("", "task_queue",
4             MessageProperties.PERSISTENT_TEXT_PLAIN,
5             message.getBytes());

  注意,即使设置了消息的持久化属性也不能保证消息会被100%地写入到磁盘中,由于RabbitMq在接收到消息和写入到磁盘不是同步的,有可能消息只是被写入到缓存中而还没来和及写入磁盘的时候,RabbitMq崩溃了,此时也会丢失消息。但不管如何,比前面简单的消息队列已经强大了不少。

五、公平调度

  您可能已经注意到,任务调度仍然不能彻底按照咱们但愿的方式工做。举个例子,在只有两个Worker的环境中,奇数的消息比较重,偶数的消息比较轻时,一个Worker将会一直处于忙碌状态,而另外一个Worker将会一直处于空闲状态,但RabbitMq并不知道这种状况,它会依然均衡地向两个Worker传递消息。

  发生这种状况是由于,当一个消息进入队列以后,RabbitMq只是盲目地将该第n个消息转发给第n个消费者,它并不关注每一个消费者发了多少个回执。

  为了解决这个问题,咱们能够经过调用basicQos方法,给它传入1。这将告诉RabbitMq不要同时给一个队列转发多于1条的消息,换句话说,在一个消费者没有完成并回执前一条消息时,不要再给它转发其余消息。

1 int prefetchCount = 1;
2 channel.basicQos(prefetchCount);

六、完整的代码

  1、NewTask.java

1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 import com.rabbitmq.client.MessageProperties;
 5 
 6 public class NewTask {
 7 
 8   private static final String TASK_QUEUE_NAME = "task_queue";
 9 
10   public static void main(String[] argv) throws Exception {
11     ConnectionFactory factory = new ConnectionFactory();
12     factory.setHost("localhost");
13     try (Connection connection = factory.newConnection();
14          Channel channel = connection.createChannel()) {
15         channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
16 
17         String message = String.join(" ", argv);
18 
19         channel.basicPublish("", TASK_QUEUE_NAME,
20                 MessageProperties.PERSISTENT_TEXT_PLAIN,
21                 message.getBytes("UTF-8"));
22         System.out.println(" [x] Sent '" + message + "'");
23     }
24   }
25 
26 }

2、Worker.java

1 import com.rabbitmq.client.Channel;
 2 import com.rabbitmq.client.Connection;
 3 import com.rabbitmq.client.ConnectionFactory;
 4 import com.rabbitmq.client.DeliverCallback;
 5 
 6 public class Worker {
 7 
 8   private static final String TASK_QUEUE_NAME = "task_queue";
 9 
10   public static void main(String[] argv) throws Exception {
11     ConnectionFactory factory = new ConnectionFactory();
12     factory.setHost("localhost");
13     final Connection connection = factory.newConnection();
14     final Channel channel = connection.createChannel();
15 
16     channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
17     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
18 
19     channel.basicQos(1);
20 
21     DeliverCallback deliverCallback = (consumerTag, delivery) -> {
22         String message = new String(delivery.getBody(), "UTF-8");
23 
24         System.out.println(" [x] Received '" + message + "'");
25         try {
26             doWork(message);
27         } finally {
28             System.out.println(" [x] Done");
29             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
30         }
31     };
32     channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
33   }
34 
35   private static void doWork(String task) {
36     for (char ch : task.toCharArray()) {
37         if (ch == '.') {
38             try {
39                 Thread.sleep(1000);
40             } catch (InterruptedException _ignored) {
41                 Thread.currentThread().interrupt();
42             }
43         }
44     }
45   }
46 }

点关注,不迷路,这是一个程序员都想要关注的二维码

相关文章
相关标签/搜索