RabbitMQ消息队列:任务分发机制

        咱们解决了从发送端(Producer)向接收端(Consumer)发送“Hello World”的问题。在实际的应用场景中,这是远远不够的。从本篇文章开始,咱们将结合更加实际的应用场景来说解更多的高级用法。java

        有时Consumer须要大量的运算时,RabbitMQ Server须要必定的分发机制来balance每一个Consumer的load。试想一下,对于web application来讲,在一个不少的HTTP request里是没有时间来处理复杂的运算的,只能经过后台的多个工做线程来完成,队列中的任务将会被工做线程共享执行,这样的概念在web应用这很是有用。接下来咱们分布讲解。 
web

   应用场景就是RabbitMQ Server会将queue的Message分发给不一样的Consumer以处理计算密集型的任务:
安全

 

1. 准备

       实际应用Consumer可能作的是计算密集型的工做,那就不能简单的字符串了。在现实应用中,Consumer有可能作的是一个图片的resize,或者是pdf文件的渲染或者内容提取。可是做为Demo,仍是用字符串模拟吧:经过字符串中的.的数量来决定计算的复杂度,每一个.都会消耗1s,即sleep(1)。并发

发送端:app

package com.zhy.rabbitMq._02_workqueue;学习

import java.io.IOException;测试

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;fetch

public class NewTask{
     //队列名称
     private final static String QUEUE_NAME = "queue2";spa

     public static void main(String[] args) throws IOException{
          //建立链接和频道
          ConnectionFactory factory = new ConnectionFactory();
          factory.setHost("localhost");
          Connection connection = factory.newConnection();
          Channel channel = connection.createChannel();
          //声明队列
          channel.queueDeclare(QUEUE_NAME, false, false, false, null);
          //发送10条消息,依次在消息后面附加1-10个点
          for (int i = 0; i < 10; i++){
               String dots = "";
               for (int j = 0; j <= i; j++){
                    dots += ".";
           }
           String message = "helloworld" + dots+dots.length();
           channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
           System.out.println(" [x] Sent '" + message + "'");
      }
      //关闭频道和资源
      channel.close();
      connection.close();.net

     }
}

接收端:

package com.zhy.rabbitMq._02_workqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work{
     //队列名称
     private final static String QUEUE_NAME = "workqueue";

     public static void main(String[] argv) throws java.io.IOException,
       java.lang.InterruptedException{
      //区分不一样工做进程的输出
      int hashCode = Work.class.hashCode();
      //建立链接和频道
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      //声明队列
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      System.out.println(hashCode
        + " [*] Waiting for messages. To exit press CTRL+C");
 
      QueueingConsumer consumer = new QueueingConsumer(channel);
      // 指定消费队列
      channel.basicConsume(QUEUE_NAME, true, consumer);
      while (true){
       QueueingConsumer.Delivery delivery = consumer.nextDelivery();
       String message = new String(delivery.getBody());

       System.out.println(hashCode + " [x] Received '" + message + "'");
       doWork(message);
       System.out.println(hashCode + " [x] Done");

      }

 }

 /**
  * 每一个点耗时1s
  * @param task
  * @throws InterruptedException
  */
 private static void doWork(String task) throws InterruptedException{
      for (char ch : task.toCharArray()){
           if (ch == '.')
            Thread.sleep(1000);
      }
     }
}

2. Round-robin dispatching 循环分发

        RabbitMQ的分发机制很是适合扩展,并且它是专门为并发程序设计的。若是如今load加剧,那么只须要建立更多的Consumer来进行任务处理便可。固然了,对于负载还要加大怎么办?我没有遇到过这种状况,那就能够建立多个virtual Host,细化不一样的通讯类别了。

     首先开启个Consumer,即运行两个工做者。

[x] Sent 'helloworld.1'
[x] Sent 'helloworld..2'
[x] Sent 'helloworld...3'
[x] Sent 'helloworld....4'

工做者1:
605645 [*] Waiting for messages. To exit press CTRL+C
605645 [x] Received 'helloworld.1'
605645 [x] Done
605645 [x] Received 'helloworld....3'
605645 [x] Done

工做者2:
18019860 [*] Waiting for messages. To exit press CTRL+C
18019860 [x] Received 'helloworld..2'
18019860 [x] Done
18019860 [x] Received 'helloworld.....4'
18019860 [x] Done

能够看到,默认的,RabbitMQ会一个一个的发送信息给下一个消费者(consumer),而无论每一个任务的时长等等,且是一次性分配,并不是一个一个分配。平均的每一个消费者将会得到相等数量的消息。这样分发消息的方式叫作round-robin。中种分发还有问题,接着了解吧!

 

3. Message acknowledgment 消息确认

每一个Consumer可能须要一段时间才能处理完收到的数据。你可能担忧一个工做者(Consumer)在这个过程当中出错了,异常退出了,而数据尚未处理完成,那么很是不幸这段数据就丢失了。由于咱们采用no-ack的方式进行确认,一旦RabbitMQ交付了一个消息给消费者,会立刻从内存中移除这条信息。也就是说,每次Consumer接到数据后,而不论是否处理完成,RabbitMQ Server会当即把这个Message标记为完成,而后从queue中删除了。

     上述问题是很是严重的,可是若是一个Consumer异常退出了,它处理的数据可以被另外的Consumer处理,这样数据在这种状况下就不会丢失了(注意是这种状况下)。

      为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不只仅是被Consumer收到,那么咱们不能采用no-ack。而应该是在处理完数据后发送ack。

    在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ能够去安全的删除它了。

    若是Consumer退出了可是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的状况下数据也不会丢失。

    这里并无用到超时机制。RabbitMQ仅仅经过Consumer的链接中断来确认该Message并无被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来作数据处理。

    默认状况下,消息确认是打开的(enabled)。上面代码中咱们经过autoAsk= True 关闭了ack。从新修改一下callback,以在消息处理完成后发送ack:

boolean ack = false ; //打开应答机制  

channel.basicConsume(QUEUE_NAME, ack, consumer);  

//另外须要在每次处理完成一个消息后,手动发送一次应答。  

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

这样即便你经过Ctr-C中断了Consumer,那么Message也不会丢失了,它会被分发到下一个Consumer。

      若是忘记了ack,那么后果很严重。当Consumer退出时,Message会从新分发。而后RabbitMQ会占用愈来愈多的内存,因为RabbitMQ会长时间运行,所以这个“内存泄漏”是致命的。去调试这种错误,能够经过一下命令打印

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

4. Message durability消息持久化

        咱们学习了消费者被杀死,Message也不会丢失。可是若是RabbitMQ Server退出呢?软件都有bug,即便RabbitMQ Server是完美毫无bug的,它仍是有可能退出的:被其它软件影响,或者系统重启了,系统panic了。。。

    为了保证在RabbitMQ退出或者crash了数据仍没有丢失,须要将queue和Message都要持久化。

queue的持久化须要在声明时指定durable=True:

第一, 咱们须要确认RabbitMQ永远不会丢失咱们的队列。为了这样,咱们须要声明它为持久化的。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
注:RabbitMQ不容许使用不一样的参数从新定义一个队列,因此已经存在的队列,咱们没法修改其属性。
第二, 咱们须要标识咱们的信息为持久化的。经过设置MessageProperties(implements BasicProperties)值为PERSISTENT_TEXT_PLAIN。
channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
如今你能够执行一个发送消息的程序,而后关闭服务,再从新启动服务,运行消费者程序作下实验。

5. Fair dispatch 公平分发

    你可能也注意到了,分发机制不是那么优雅。默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。固然n是取余后的。它无论Consumer是否还有unacked Message,只是按照这个默认机制进行分发。

   那么若是有个Consumer工做比较重,那么就会致使有的Consumer基本没事可作,有的Consumer倒是毫无休息的机会。那么,RabbitMQ是如何处理这种问题呢?

  经过 basic.qos 方法设置prefetch_count=1 。这样RabbitMQ就会使得每一个Consumer在同一个时间点最多处理一个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。 设置方法以下:

int prefetchCount = 1;  

channel.basicQos(prefetchCount);  

测试:改变发送消息的代码,将消息末尾点数改成3-2个,而后首先开启两个工做者,接着发送消息:
[x] Sent 'helloworld...3'
[x] Sent 'helloworld..2'

工做者1:
18019860 [*] Waiting for messages. To exit press CTRL+C
18019860 [x] Received 'helloworld...3'
18019860 [x] Done

工做者2:
31054905 [*] Waiting for messages. To exit press CTRL+C
31054905 [x] Received 'helloworld..2'
31054905 [x] Done

能够看出此时并无按照以前的Round-robin机制进行转发消息,而是当消费者不忙时进行转发。且这种模式下支持动态增长消费者,由于消息并无发送出去,动态增长了消费者立刻投入工做。而默认的转发机制会形成,即便动态增长了消费者,此时的消息已经分配完毕,没法当即加入工做,即便有不少未完成的任务。

6. 最终版本

发送端:

package com.zhy.rabbitMq._02_workqueue;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask
{
 // 队列名称
 private final static String QUEUE_NAME = "workqueue_persistence";

 public static void main(String[] args) throws IOException
 {
  // 建立链接和频道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  // 声明队列
  boolean durable = true;// 一、设置队列持久化
  channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
  // 发送10条消息,依次在消息后面附加1-10个点
  for (int i = 5; i > 0; i--)
  {
   String dots = "";
   for (int j = 0; j <= i; j++)
   {
    dots += ".";
   }
   String message = "helloworld" + dots + dots.length();
   // MessageProperties 二、设置消息持久化
   channel.basicPublish("", QUEUE_NAME,
     MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
   System.out.println(" [x] Sent '" + message + "'");
  }
  // 关闭频道和资源
  channel.close();
  connection.close();

 }

}

接收端:

package com.zhy.rabbitMq._02_workqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Work
{
 // 队列名称
 private final static String QUEUE_NAME = "workqueue_persistence";

 public static void main(String[] argv) throws java.io.IOException,
   java.lang.InterruptedException
 {
  // 区分不一样工做进程的输出
  int hashCode = Work.class.hashCode();
  // 建立链接和频道
  ConnectionFactory factory = new ConnectionFactory();
  factory.setHost("localhost");
  Connection connection = factory.newConnection();
  Channel channel = connection.createChannel();
  // 声明队列
  boolean durable = true;
  channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
  System.out.println(hashCode
    + " [*] Waiting for messages. To exit press CTRL+C");
  //设置最大服务转发消息数量
  int prefetchCount = 1;
  channel.basicQos(prefetchCount);
  QueueingConsumer consumer = new QueueingConsumer(channel);
  // 指定消费队列
  boolean ack = false; // 打开应答机制
  channel.basicConsume(QUEUE_NAME, ack, consumer);
  while (true)
  {
   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
   String message = new String(delivery.getBody());

   System.out.println(hashCode + " [x] Received '" + message + "'");
   doWork(message);
   System.out.println(hashCode + " [x] Done");
   //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

  }

 }

 /**
  * 每一个点耗时1s
  *
  * @param task
  * @throws InterruptedException  */ private static void doWork(String task) throws InterruptedException {  for (char ch : task.toCharArray())  {   if (ch == '.')    Thread.sleep(1000);  } }}

相关文章
相关标签/搜索