在第一个教程里面,咱们写了一个程序从一个有名字的队列中发送和接收消息,在这里咱们将要建立一个分发耗时任务给多个worker的任务队列。
html
任务队列核心思想就是避免执行一个资源密集型的任务,而程序要等待其执行完毕才能进行下一步的任务。相反地咱们让任务延迟执行,咱们封装一个task做为消息,并把它发送至队列,在后台运行的工做进程将弹出的任务,并最终执行做业。当运行多个worker的时候,task将在他们之间共享。java
在前一节中咱们发送一个包含“HelloWorld!”的消息,如今咱们发送字符串表明一个复杂的任务,咱们没有一个真实的任务,好比格式化图片大小等等,因此咱们使用Thread.sleep()表明一个执行时间较长的任务,这里咱们使用几个点来表明任务的复杂度,每个点表明任务执行一秒的时间,好比hello...就表明执行了3秒。
咱们稍微改变一个上一节中的Send.java,容许从命令行发送任意的消息,程序将从咱们的工做队列中执行任务,因此命名为NewTask.java:git
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");
如下是帮助从命令行参数获取消息体的代码:github
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
上一节中的Rece.java也须要少量改变:须要伪造一个根据点来执行多少秒的任务。它将处理传送过来的消息,而且执行任务,命名为Worker.java:shell
final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } } }; channel.basicConsume(TASK_QUEUE_NAME, true, consumer);
模拟执行时间的任务:缓存
private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
编译:服务器
$ javac -cp rabbitmq-client.jar NewTask.java Worker.java
使用任务队列的优势之一就是很容并行化一个work,若是咱们产生了工做积压,咱们能够很简单的增长worker的数量,来解决问题。
首先,让咱们尝试在同一时间运行两个工人实例。他们都将在队列中获得消息,但究竟如何?让咱们来看看。
您须要三个控制台打开。两个将运行辅助程序。这些控制台将是咱们的两名消费者 - C1和C2。app
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C
在第三个,咱们将发布新的任务。一旦你开始运行消费者就能够发布几条消息:ide
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask First message. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Second message.. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Third message... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fourth message.... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fifth message.....
咱们来看看它是怎样将任务非配给咱们的worker的:fetch
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默认状况下,RabbitMQ将发送每个在序列中的消息到下一个消费者,平均而言,每个消费者将得到相同数量的消息,发布这种消息的方式叫循环调度。
作一个任务须要几秒钟,那么当一个消费者执行任务到一半的时候挂了怎么办?在咱们的当前代码里面,一旦消息传送给咱们的消费者,消息就从存储中删除了。在这种状况下,若是Kill了一个worker,咱们不只仅失去了它正在执行的消息任务,并且咱们将失去全部分配给它,可是还没执行的消任务。
可是咱们不想丢失任何消息,若是一个worker挂掉,咱们将分配这些任务给其余的消费者。
为了确保消息不会丢失,RabbitMQ支持消息确认。一个ACK(nowledgement)从消费者发送给RabbitMQ一个消息确认当前消息已被接收和处理,RabbitMQ可自由将其删除。
若是消费者死亡(其信道被关闭,关闭链接,或TCP链接丢失),而不发送ACK,RabbitMQ知道消息并无被接收和执行彻底,将从新将它放入队列。若是同一时间存在其余在线的消费者,它将迅速从新传递消息给另外一个消费者。这样,你能够确定没有消息丢失,即便偶尔的消费者死亡。
目前没有任何消息超时,当消费者挂掉的时候,RabbitMQ将从新传递消息,即便处理一个消息须要很长很长的时间也不要紧。
消息确认默认状况下开启。在前面的例子中,咱们明确地经过AUTOACK = true标志将它们关闭。如今是时候删除此标志,一旦咱们与任务完成,将从worker发送适当的确认。
channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } };
使用此代码,咱们能够确定,即便你使用CTRL + C,杀死一个worker,什么都不会丢失。worker死亡后不久,全部未确认的消息会被从新传递。
被遗忘的确认
忘记baseACK是一个常见的错误,这是个简单的错误,可是后果是很严重的。当你的客户端退出的时候(可能看起来就像是随机交还)消息将被从新传递,但RabbitMQ会消耗的愈来愈多的内存,它将没法释听任何unacked的消息。
为了调试这种错误,你可使用rabbitmqctl打印messages_unacknowledged字段。
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
咱们已经学会了如何确保即便消费者死亡,任务也不会丢失。可是若是RabbitMQ的服务器中止,咱们的任务仍然会丢失。
当RabbitMQ的退出或崩溃时,除非你告诉它不要忘记的队列和消息。两件事情都须要确保,消息才不会丢失:咱们须要将队列和消息持久化。
首先,咱们须要确保的RabbitMQ永远不会失去咱们的队列。为了作到这一点,咱们须要把它声明为持久:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
虽然以上的代码自己是对的,可是它不会对咱们的目前设置起做用。这是由于咱们已经定义了一个名为hello的不持久的队列,RabbitMQ不容许你使用不一样的参数从新定义现有队列,并会返回一个错误。可是有一个快速的解决
办法 - 让咱们与声明不一样名称的队列,例如task_queue:
boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);
queueDeclare变化须要被施加到生产者和消费者代码二者。
在这一点上咱们确保即便RabbitMQ的重启task_queue队列也不会被丢失。如今,咱们须要咱们的消息标记为持久性 - 经过设置MessageProperties(实现BasicProperties)的值PERSISTENT_TEXT_PLAIN。
import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
注意消息持久性
将消息标记为持久性并不能彻底保证信息不会丢失。虽然RabbitMQ消息将保存到磁盘,可是有很短的时间内RabbitMQ已经接受了消息,可是还没来得及保存它。此外,RabbitMQ没有为每条消息作FSYNC(2) - 它可能只是保存到缓存,并无真正写入磁盘。持久性的保证不强,可是对于咱们简单的任务队列仍是绰绰有余的。若是你须要一个更强有力的保证,那么你可使用publisher confirms。
你可能已经注意到,调度仍然没有彻底按照咱们真正想要的工做。举个例子,好比有两个消费者的状况,当奇数的消息很是重,可是偶数的消息很是轻的时候,一个消费者将被累死,而另外一个却闲着。RabbitMQ殊不知道,仍然在均匀的给每一个消费者发送消息。
这种状况发生是由于RabbitMQ只负责分发进入到队列的消息,它不看为消费者未确认的消息的数量。它只是盲目分派每第n个消息给第n消费者。
为了杜绝那种状况,咱们可使用basicQos方法与prefetchCount = 1设置。它告诉RabbitMQ不要把多个消息在同一时间给一个消费者。或者,换句话说,只有消费者处理而且确认前一个消息以后才会给它分配下一个消息,相反,消息将被非配给下一个不处于忙碌的消费者。
int prefetchCount = 1; channel.basicQos(prefetchCount);
注意队列大小
若是全部的worker都在忙,你的队列也填满了。您将要留意的是,也许添加更多的worker,或者有一些其余的策略。
NewTask.java
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
Worker.java
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(TASK_QUEUE_NAME, false, consumer); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
原文地址:RabbitMQ之Work Queues
代码地址:https://github.com/aheizi/hi-mq
相关:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任务队列
3.RabbitMQ之发布订阅
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主题(Topic)
6.RabbitMQ之远程过程调用(RPC)