原文地址html
在第一章中,咱们写了经过一个queue来发送和接收message的简单程序。在这一章中,咱们会建立一个workqueue,来将执行时间敏感的任务分发到多个worker中。java
work模式主要的意图是要避免等待完成一个耗时的任务。取而代之地,咱们延迟任务的执行,将任务封装成消息,将之发送到queue。一个运行着的worker进程会弹出这个任务并执行它。当运行多个worker进程时,任务会在它们之间分派。linux
这种模式在web应用中特别有用,由于在一个较短的HTTP请求窗口中不会去执行一个复杂的任务。web
在上一章中,咱们发送了一个”Hello World!"的message。如今咱们将发送一个表明了复杂任务的字符串。这不是一个实际的任务,好比像调整图片大小或是从新渲染pdf文档,咱们通Thead.sleep() 来模拟一个耗时的任务。message中的小圆点表示其复杂度,圆点越多则任务的执行越耗时。好比“Hello..."的message将耗时3秒。windows
咱们简单的修改上一章的Send.java代码,容许在命令行发送任意message。新的类叫作NewTask.javabash
String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
一样的,咱们修改上一章中的Recv.java,让它在处理message的时候根据小圆点进行睡眠。新的类叫Worker.java服务器
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
像在第一章同样编译这两个类并发
javac -cp $CP NewTask.java Worker.java
使用Task模式的一个明显的优点是让并行执行任务变得简单。咱们只须要启动更多的worker就能够消减堆积的message,系统水平扩展简单。学习
首先,咱们在同一时间启动两个worker。他们都会从queue得到message,来看一下具体细节。fetch
打开了三个终端,两个是跑worker的。
java -cp $CP Worker
java -cp $CP Worker
第三个终端里来发布新的任务message。
java -cp $CP NewTask First message. java -cp $CP NewTask Second message.. java -cp $CP NewTask Third message... java -cp $CP NewTask Fourth message.... java -cp $CP NewTask Fifth message.....
让咱们看看worker的处理message的状况.第一个worker收到了第1,3,5message,第二个worker收到了第2,4个message。
默认状况下,RabbitMQ会顺序的将message发给下一个消费者。每一个消费者会获得平均数量的message。这种方式称之为round-robin(轮询).
执行任务须要必定的时间。你可能会好奇若是一个worker开始执行任务,可是中途异常退出,会是什么结果。在咱们如今的代码中,一旦RabbitMQ将消息发送出去了,它会当即将该message删除。这样的话,就可能丢失message。
在实际场景中,咱们不想丢失任何一个task。若是一个worker异常中断了,咱们但愿这个task能分派给另外一个worker。
为了确保不会丢失message,RabbitMQ采用message确认机制。RabbitMQ只有收到该message的Ack以后,才会删除该消息。
若是worker中断退出了( channel关闭了,connection关闭了,或是TCP链接丢失了)而没有发送Ack,RabbitMQ会认为该消息没有完整的执行,会将该消息从新入队。该消息会被发送给其余的worker。这样就不用message丢失,即便是在worker常常异常中断退出的场景下。
不会有任何message会timeout。当消费者中断退出,RabbitMQ会从新分派message。即便消息的执行会花费很长的时间。
默认状况下,message是须要人工确认的。在上面的例子中,咱们经过autoAck=true来关闭了人工确认。像下面这样,咱们将该标志设置为false,worker就须要在完成了任务以后,发送确认。
channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
上面的代码保证即便当worker还在处理一条消息,而强制它退出,也不会丢失message。而后不久,全部未被确认的消息都会被从新分派。
发送确认必须和接收相同的channel。使用不一样的channel进行确认会致使channel-level protocol 异常。
忘记确认消息是一个比较常见的错误,可是其后果是很严重的。当client退出时,message会被从新分派,可是RabbitMQ会占用愈来愈多的内存,因它没法释放那些未被确认的message。
能够经过rabbitmqctl来打印messages_unacknowledged:
##linux sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged ##windows rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱们学习了在消费者出现问题的时候不丢失message。可是若是RabbitMQ服务器宕机了,咱们仍是会丢失message。
当RabbitMQ宕机时,默认状况下,它会”忘记“全部的queue和message。为了确保message不丢失,咱们须要确认两件事情:咱们要使得queue和message都是持久的。
首先,咱们要确保RabbitMQ不会丢失咱们设置好的queue。因此,咱们要把它声明成持久的:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
虽然代码没有任何问题,可是光这样是无效的。由于咱们以前已经定义过名字为hello的queue。RabbitMQ不容许你使用不一样的参数去从新定义一个已经存在的queue,并且这还不会反悔任何错误信息。可是咱们仍是有别的方法,让咱们使用一个别的名字,好比task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
声明queue的改变要在生产者和消费者的代码里都进行修改。
接着咱们要设置message的持久性,咱们经过设置MessageProperties为PERSISTENT_TEXT_PLAIN:
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
将message标记成持久的不能100%保证message不会丢失,虽然这告诉RabbitMQ将message保存到磁盘,然而在RabbitMQ从接到message到保存之间,仍然有一小段时间。同时RabbitMQ不会给每一条message执行fsync(2) -- 可能只是保存到了cache而没有写到磁盘上去。因此持久的保证也不是很是强,而后对咱们简单的task queue来讲则足够了。若是须要一个很是强的保证,则可使用 发布确认的方式。
你可能已经注意到分派的工做没有如咱们所指望的来执行。好比在有2个worker的状况系,全部偶数的message耗时很长,而全部奇数的message则耗时很短,这样其中一个worker则一直被分派到偶数的message,而另外一个则一直是奇数的message。RabbitMQ对此并不知晓,进而继续这样分派着message。
这样的缘由是RabbitMQ是在message入queue的时候肯定分派的。它不关心消费者ack的状况。
咱们能够经过basicQos方法和prefetchCount(1)来解决这个问题。这个设置是让RabbitMQ给worker一次一个message。或者这么说,直到worker处理完以前的message并发送ack,才给worker下一个message。不然,Rabbitmq会将message发送给其它不忙的worker。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意queue的大小。若是全部的worker都处于忙碌状态,queue可能会被装满。必须监控queue深度,可能要开启更多的worker,或者采起其余的措施。
NewTask.java的最终版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
Worker.java的最终版本
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用message ack和prefetchCount,来设定work queue。持久化选项则在RabbitMQ重启后能让任务得以恢复。