转载RabbitMQ入门(2)--工做队列

工做队列

(使用Java客户端)

java-two 在这第一指南部分,咱们写了经过同一命名的队列发送和接受消息。在这一部分,咱们将会建立一个工做队列,在多个工做者之间使用分布式时间任务。 工做队列(亦称:任务队列)背后主要的思想是避免当即处理一个资源密集型任务而且不得不一直等待完成。相反咱们能够计划着让任务后续执行。咱们将任务封装 成消息,发送到队列中。一个工做者进程在后台运行,获取任务并最终执行任务。当你运行多个工做者,全部的任务将会被他们所共享。java

在web应用程序中,这个理念是特别有用的,你没法在一个短暂的http请求中处理一个复杂的任务。python

准备

在先前的指南中,咱们发送了一个包含"Hello World!"消息。如今咱们将要发送一些字符串,用来表明复杂的任务。咱们没有一个真实的任务,好比图片的调整大小或者pdf文件渲染,因此咱们经过Thread.sleep()函数,假装一个咱们是很忙景象。咱们将会把字符串中点的数量来表明它的复杂度;每个点将要花费一秒的工做。例如,一个使用Hello...描述的假任务会发送三秒。git

咱们将会轻量的修改咱们之前例子中Send.java代码,使其容许任意的消息能够经过命令行发出。这个程序将要计划安排任务到咱们的工做队列中,因此咱们把它命名为NewTask.java:github

String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); 

一些帮助从命令行中获取消息参数:web

private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } 

咱们老的Recv.java程序也要求作些改变:它须要将消息体中每一个点假装成一秒。从队列中获取消息,运行任务,因此咱们将它称之为Worker.java:shell

while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); } 

咱们假装的任务中冒充执行时间:缓存

private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } 

在第一部分指南中那样编译它们(jar 文件须要再工做路径上):服务器

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java 

循环分派

使用任务队列的优点之一是咱们是容易并行处理。若是咱们正在处理一些堆积的文件的话,咱们仅仅须要增长更多的工做者,经过这种方式咱们是容易扩展的。 首先,让咱们试着在同一时间运行两个工做者实例。他们都会从队列中获取消息,可是具体怎样作呢?让咱们一块儿来看一看。 你须要三个打开的控制平台,其中两个用来运行工做者程序。他们将会是咱们的两个消费者-C1和C2。app

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C 
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C 

在这第三个控制平台咱们用来发布新的任务。一旦你启动消费者,你就能够发布消息了:分布式

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask First message. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Second message.. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Third message... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fourth message.... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fifth message..... 

让咱们看看什么被投递到咱们工做者那里:

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....' 
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....' 

默认状况想,RabbitMQ将会把每个消息发送给下一个消费者。平均下来每一个消费者获取的消息数量是相同的。这种分布式消息方式被称为轮询。试试三个或更多的工做者。

消息确认

处理一个任务可能花费数秒时间,你可能会好奇若是一个消费者开始一个长任务,而且在处理完成部分的状况下就死掉了会发生什么状况。就咱们当前的代码 来讲,一旦RabbitMQ将消息传递给消费者,它就会当即将消息从内存中删除。在这种状况下,若是你杀掉一个正在处理的工做者你会丢失它正在处理的消 息。咱们也同时失去了已经分配给这个工做者而且没有开始处理的消息。 可是咱们不想丢失任何任务,若是一个工做者死掉,咱们指望将任务传递给另外一个工做者。 为了保证每个消息不会丢失,RabbitMQ支持消息确认机制。一个消息确认是由消费者发出,告诉RabbitMQ这个消息已经被接受,处理完 成,RabbitMQ 能够删除它了。 若是一个消费者没有发送确认信号,RabbitMQ将会认定这个消息没有彻底处理成功,将会把它传递给另外一个消费者。经过这种方式,即便工做者有时会死 掉,你依旧能够保证没有消息会被丢失。 这里不存在消息超时;RabbitMQ只会在工做者链接死掉才从新传递这个消息。即便一个消息要被处理很长很长时间,也不是问题。 消息确认机制默认状况下是开着的。在先前的例子中咱们是明确的将这个功能关闭no_ack=True。是时候移除这个标识了,一旦咱们完成一个任务,工做者须要发送一个确认信号。

QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //... channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } 

使用这段代码,咱们能够保证即便你将一个正在处理消息的工做者经过CTRL+C来终止它运行,依旧没有消息会丢失。稍后,工做者死亡后没有发送确认的消息会被从新传递。

忘掉确认

这是一个广泛的错误,就是忘记确认。这是一个很简单的错误,可是这后果是严重的。当你的客户端退出,消息会从新传递(看上去是随机传递的),RabbitMQ会愈来愈占用内存,由于它不会释放哪些没有发送确认的消息。

为了调试这种类型的错误,你可使用rabbitmqctl打印出messages_unacknowledged属性:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ... hello 0 0 ...done. 

消息持久化

咱们已经学习了如何在肯定消费者是否已经死掉,而且保证任务不被丢失。可是若是RabbitMQ服务器中止,咱们的任务依旧会丢失。

当RabbitMQ退出或者崩溃,它将会忘记这队列和消息,除非你告诉它不要这样作。两个事情须要作来保证消息不会丢失:咱们标记队列和消息持久化。

首先,咱们须要确保RabbitMQ不会丢失咱们的队列,为了这样作,咱们须要将它声明为持久化:

boolean durable = true; channel.queueDeclare("hello", durable, false, false, null); 

虽然这命令是正确的,但它不会当即在咱们的程序里运行。那是由于咱们已经定义了一个不持久化的hello队列。RabbitMQ不容许你使用不一样的参数从新定义一个存在的队列,若是你试着那样作它会返回一个错误。有个快速的变通方案-让咱们声明一个不一样名字的队列,好比task_queue:

boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null); 

这个queuqDeclare的改变须要应用在生产者和消费者的代码中。 在这点上,咱们能够保证即便RabbitMQ重启,task_queue队列也不会丢失。如今咱们须要标记消息持久化 - 经过设置MessageProperties(实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); 

注意消息持久化 标记消息持久化不能彻底保证消息不会被丢失,虽然这样会告诉RabbitMQ保存消息到硬盘上。可是对于RabbitMQ依旧有个短暂的时间窗口对于接收 一个消息而且尚未完成保存。一样,RabbitMQ不能让每一个消息同步--它可能仅仅保存在缓存中,尚未真正的写入到硬盘中。这持久化的保证不是健壮 的,可是对咱们的简单的任务队列来讲是足够了。若是你须要更健壮的持久化保证,你可使用出版者确认。

公平分发

你可能注意到了,分发过程并无如咱们想的那样运做。例如,在某一种状况下有两个工做者,当全部奇数消息是不少的而且全部偶数的是少许的,一个工做者会一直忙碌下去,而另外一个则会几乎不作什么事情。好吧,RabbitMQ不会在乎那个事情,它会一直均匀的分发消息。 这种状况发生由于RabbitMQ仅仅分发消息到队列中。它不关心有多少消息没有由发送者发送确认信号。它仅仅盲目的将N个消息发送到N个消费者。 prefetch-count.png

为了解决这个问题,咱们可使用basicQos方法,设置prefetchCount=1。这样将会告知RabbitMQ不要同时给一个工做者超过一个任务,或者换句话说在一个工做者处理完成,发送确认以前不要给它分发一个新的消息。代替,把消息分发到下一个不繁忙的工做者。

int prefetchCount = 1; channel.basicQos(prefetchCount); 

注意队列大小

若是你的全部工做者是在忙碌,你的队列就会被填满。你将会想关注这件事,可能要添加更多的工做者,或者有些其余策略。

把它们放在一块儿

咱们的NewTask.java最终代码:

import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... } 

(NewTask.java source) 咱们的Worker.java代码:

import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done" ); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } //... } 

(Worker.java source) 使用消息确认和预读数量你能够创建一个工做队列。持久化选项使得RabbitMQ重启以后任务依旧存在。

想要了解更多关于通道方法和消息属性,你能够浏览javadocs online

如今咱们能够移到指南3了,学习怎么样将相同的消息传递给多个消费者

相关文章
相关标签/搜索