在上篇揭开RabbitMQ的神秘面纱一文中,咱们编写了程序来发送和接收来自命名队列的消息。html
本篇咱们将建立一个工做队列,工做队列背后的假设是每一个任务都交付给一个工做者java
本篇是译文,英文原文请移步:http://www.rabbitmq.com/tutorials/tutorial-two-java.html算法
前提:本教程假定RabbitMQ 已在标准端口(15672)上的localhost上安装并运行。若是您使用不一样的主机,端口或凭据,则须要调整链接设置。数组
工做队列(又称:任务队列)背后的主要思想是避免当即执行资源密集型任务,而且必须等待它完成。缓存
相反,咱们安排任务稍后完成。咱们将任务封装 为消息并将其发送到队列。在后台运行的工做进程将弹出任务并最终执行做业。当您运行许多工做程序时,它们之间将共享任务。bash
这个概念在Web应用程序中特别有用,由于在短的HTTP请求窗口中没法处理复杂的任务。服务器
如何理解上面这段话呢?app
咱们能够举个例子,假设用户有多个文件上传请求,然而Web应用对文件上传进行处理每每是一件比较耗时的操做,是没法马上立刻响应返回给客户端结果,这时候咱们就须要一个工做队列来处理。ide
再好比生活中的买票,检票,你们都知道,当咱们买票检票,大多须要排队一个一个处理,二者相似。函数
在上篇中咱们发送了一个Hello World 信息,如今咱们将发送复杂任务的字符串。
可是咱们没有实际的应用场景,因此咱们这里暂时使用 Thread.sleep() 函数来模拟PDF 文件上传实现延迟效果。
咱们建立一个生产者(产生消息,发送消息的一方)发布新的任务,文件名称叫作NewTask.java:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /*** * 生产者 * ********/ public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { //建立和消息队列的链接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //第二个参数为true 确保关闭RabbitMQ服务器时执行持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); //从命令行发送任意消息 String message = getMessage(argv); //将消息标记为持久性 - 经过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。 channel.basicPublish("", TASK_QUEUE_NAME,//指定消息队列的名称 MessageProperties.PERSISTENT_TEXT_PLAIN,//指定消息持久化 message.getBytes("UTF-8"));//指定消息的字符编码 //打印生产者发送成功的消息 System.out.println(" [x] Sent '" + message + "'"); //关闭资源 channel.close(); connection.close(); } /*** * 一些帮助从命令行参数获取消息 * @param strings 从命令行发送任意消息字符串 * */ private static String getMessage(String[] strings) { if (strings.length < 1) return "Hello World!"; return joinStrings(strings," "); } /** * 字符串数组 * @param delimiter 分隔符 * */ private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
接下来咱们建立咱们的消费者(工做者),它须要为消息体中的每一个点伪造一秒钟的工做。它将处理传递的消息并执行任务。
Worker.java
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { //和消息队列建立链接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //指定消息队列的名称 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //在处理并确认前一个消息以前,不要向工做人员发送新消息。相反,它会将它发送给下一个仍然不忙的工人 int prefetchCount = 1 ; channel.basicQos(prefetchCount); //channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; //boolean autoAck = false; //channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); } /***** * 咱们的假任务是模拟执行时间 * */ private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
使用任务队列的一个优势是可以轻松地并行工做。若是咱们正在积压工做积压,咱们能够添加更多工做者,这样就能够轻松扩展。
首先,让咱们尝试同时让两个工做者工做。他们都会从队列中获取消息,但究竟如何呢?让咱们来看看。
咱们选中Worker.java 右键,Run as -----> Java Application ,执行三次,启动三个实例。
这样一来就至关于有了三个工做者(消费者),
而后咱们同理开始尝试屡次运行生产者,NewTask.java,这样将产生多个任务
而后咱们能够清楚在控制台看到这样的状况,
第一个Work.java
第二个Work.java
第三个work.java
Tips: 上面是worker.java 运行了三次,newTask 运行了四次。
也就是说任务有四个,按照循环调度算法,第一个循环到第二个循环,因此有了两个消息,而其余消息有了一个消息。
第一个work.java 的控制台收到了两条消息后继续等待收消息
第二个work.java 的控制台收到了一条消息后也继续等待接受消息
第三个work.java 的控制台也收到了 一条消息后继续等待接受消息
默认状况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。
平均而言,每一个消费者将得到相同数量的消息。这种分发消息的方式称为循环法。
Tips: 一个圆圈待表收到的一个消息,一个矩形表明一个Worker 实例
虽然执行任务可能须要几秒钟,可是可能咱们会好奇想知道若是其中一个消费者开始执行长任务而且仅在部分完成时死亡会发生什么。
使用咱们当前的代码,一旦RabbitMQ向客户发送消息,它当即将其标记为删除。
在这种状况下,若是你直接关闭一个worker 实例,咱们将丢失它刚刚处理的消息。咱们还将丢失分发给这个特定工做者但还没有处理的全部消息。
但咱们不想失去任何任务。若是一个worker 工做者实例死亡,咱们但愿将任务交付给另外一名工做者Worker。
为了确保消息永不丢失,RabbitMQ支持 message acknowledgments. (消息确认)。
消费者(工做者)发回一个 ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ能够自由删除它。
若是工做者死亡(其通道关闭,链接关闭或TCP链接丢失)而不发送确认,RabbitMQ将理解消息未彻底处理并将从新排队。
若是其余消费者同时在线,则会迅速将其从新发送给其余消费者。这样你就能够确保没有消息丢失,即便Worker偶尔会死亡。
没有任何消息超时; 当消费者死亡时,RabbitMQ将从新发送消息。即便处理消息须要很是长的时间,也不要紧。
默认状况下, Manual message acknowledgments 手动消息已打开。
在前面的示例中,第二个参数咱们经过autoAck = true 标志明确地将它们关闭。
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
Tips: true 即autoAck 的值
一旦咱们完成任务,第二个参数就应该将此标志设置为false并从工做人员发送适当的确认。
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
使用此代码,咱们能够肯定即便您在处理消息时使用CTRL + C杀死一名Worker 实例,也不会丢失任何内容。
由于Worker死后不久,全部未经确认的消息将被从新传递。
确认必须在收到的交付的同一信道上发送。尝试使用不一样的通道进行确认将致使通道级协议异常. 详情看 doc guide on confirmations 了解更多。
错过basicAck是一个常见的错误。这是一个简单的错误,但后果是严重的。(也就是这句话若是忘了写,后果很严重,该消息将一直发送不出去)
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
当您的客户端退出时,消息将被从新传递(这可能看起来像随机从新传递),但RabbitMQ将会占用愈来愈多的内存,由于它没法释听任何未经处理的消息。
为了调试这种错误,您可使用rabbitmqctl 来打印messages_unacknowledged字段
Linux 下:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Windows 下:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
咱们已经学会了如何确保即便消费者死亡,任务也不会丢失。可是若是RabbitMQ服务器中止,咱们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非你告诉它不要。确保消息不会丢失须要作两件事:咱们须要将队列和消息都标记为持久。
首先,咱们须要确保RabbitMQ永远不会丢失咱们的队列。为此,咱们须要声明它是持久的:
boolean durable = true ; channel.queueDeclare(“hello”,durable,false,false,null);
虽然此命令自己是正确的,但它在咱们当前的设置中不起做用。
那是由于咱们已经定义了一个名为hello的队列 ,这个队列不耐用。
RabbitMQ不容许您使用不一样的参数从新定义现有队列,并将向尝试执行此操做的任何程序返回错误。
可是有一个快速的解决方法 - 让咱们声明一个具备不一样名称的队列,例如task_queue:
boolean durable = true ; channel.queueDeclare(“task_queue”,durable,false,false,null);
此queueDeclare更改须要应用于生产者和消费者代码。
此时咱们确信即便RabbitMQ从新启动,task_queue队列也不会丢失。
如今咱们须要将消息标记为持久性 - 经过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。
Note on message persistence
将消息标记为持久性并不能彻底保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,可是当RabbitMQ接受消息而且还没有保存消息时,仍然有一个短期窗口。此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。持久性保证不强,但对于咱们简单的任务队列来讲已经足够了。若是您须要更强的保证,那么您可使用 发布者确认。
您可能已经注意到调度仍然没法彻底按照咱们的意愿运行。
例如,在有两个工人的状况下,当全部奇怪的消息都很重,甚至消息很轻时,一个工人将常常忙碌而另外一个工做人员几乎不会作任何工做。
好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。
发生这种状况是由于RabbitMQ只是在消息进入队列时调度消息。
它不会查看消费者未确认消息的数量。它只是盲目地向第n个消费者发送每一个第n个消息。
为了战胜咱们可使用basicQos方法和 prefetchCount = 1设置。这告诉RabbitMQ不要一次向一个worker发送一条消息。或者,换句话说,在处理并确认前一个消息以前,不要向工做人员发送新消息。相反,它会将它发送给下一个仍然不忙的工人。
int prefetchCount = 1 ; channel.basicQos(prefetchCount);
关于队列大小的说明
若是全部工做人员都很忙,您的队列就会填满。您将须要密切关注这一点,并可能添加更多工做人员,或者采起其余策略。
使用消息确认和prefetchCount,您能够设置工做队列。即便RabbitMQ从新启动,持久性选项也可使任务生效。
本篇完~