工做队列:把每一个任务只发送给一个工做者。 java
上一篇咱们是从一个指定的队列发送接收消息,在本文中,咱们将建立一个工做队列,用于在多个工做者之间分配耗时的任务。服务器
工做队列(即任务队列)背后的主要思想是避免当即执行资源密集型的任务,而且必须等待任务完成。相反,咱们把任务安排在之后作。咱们将任务封装为消息并将其发送到队列。在后台运行的worker进程将弹出任务并最终执行任务。当您运行多个worker时,这些任务将在它们之间共享。app
由于工做队列,是有多个工人从队列里面取得任务,咱们就须要考虑较多的问题,好比说,消息怎么分发,消息没有传递到位如何,消息传递过程当中,链接断开如何,消费会不会重复发给两个工人处理等等。下面咱们先学习一些跟工做队列相关的概念:ide
循环调度函数
使用任务队列的优势之一是可以轻松地并行工做。若是咱们正在创建一个积压的工做,咱们能够增长更多的工人,这样,规模就很容易。学习
RabbitMQ将按顺序将每一个消息发送给下一个使用者。平均来讲,每一个消费者都会收到相同数量的信息。这种分发消息的方式称为循环。fetch
消息确认ui
完成一项任务可能须要几秒钟。你可能会想,若是其中一个消费者开始了一项很长的任务,而且只完成了部分任务,会发生什么。使用咱们当前的代码,一旦RabbitMQ向客户传递消息,它当即标记为删除。在这种状况下,若是你杀了一个工人,咱们将失去它正在处理的信息。咱们还将丢失已经发送给此特定工做人员但还没有处理的全部消息。code
但咱们不想失去任何任务。若是一个工做者死了,咱们但愿把任务交给另外一个工做者。blog
为了确保消息不会丢失,RabbitMQ支持消息确认。使用者将一个ack(nowledgement)发回给RabbitMQ,告诉它已经接收、处理了一个特定的消息,而且RabbitMQ能够自由地删除它。
若是使用者在没有发送ack的状况下死亡(通道关闭、链接关闭或TCP链接丢失),RabbitMQ将理解消息没有被完整地处理,并将它从新排队。若是同时有其余消费者在线,它将很快从新交付给另外一个消费者。这样你就能够确保没有信息丢失,即便工人偶尔也会死亡。
消息的持久性
咱们已经学会了如何确保即便消费者死亡,任务也不会丢失。可是若是RabbitMQ服务器中止,咱们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样作。确保消息不丢失须要两件事情:咱们须要将队列和消息标记为持久的。
首先,咱们须要确保RabbitMQ永远不会丢失队列。为了作到这一点,咱们须要声明它是持久的.
消息的公平分发机制
为了不当消息分发后,有的工人很是忙,而有的很闲的问题,咱们可使用basicQos方法,并将prefetchCount = 1设置为。这告诉RabbitMQ不要一次向工做人员发送多个消息。或者,换句话说,在处理并确认以前的消息以前,不要向工做人员发送新的消息。相反,它将把它分派给下一个不太忙的员工。
上代码:
生产者:
package com.rabbitmq.HelloWorld; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class WorkQueueProduct { private final static String TASK_QUEUE_NAME = "task"; public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setUsername("admin"); factory.setPassword("123456"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 设置队列为可持久性的(生产者和消费者都须要设置)注意:RabbitMQ不容许您从新定义具备不一样参数的现有队列 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String[] str = {"ans1","b","c","d","e","f","g","h","1","2","3","4","5"}; String message = getMessage(str); // 将消息设置为持久性,设置消息的其余属性为MessageProperties.PERSISTENT_TEXT_PLAIN,便可 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8")); System.out.println("[x]send:'"+message+"'"); channel.close(); connection.close(); } private static String getMessage(String[] strings) { if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
消费者:两个消费者代码相同
package com.rabbitmq.HelloWorld; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class WorkQueueConsumer1 { private static final String TASK_QUEUE_NAME = "task"; public static void main(String[] args) throws IOException, TimeoutException { // TODO Auto-generated method stub ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.185"); factory.setUsername("admin"); factory.setPassword("123456"); factory.setPort(5672); Connection connction = factory.newConnection(); Channel channel = connction.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 设置公平分发机制参数,设置为1后每次只发送一个消息,而且在没有发送确认消息以前不会再次发送消息 channel.basicQos(1); // 内置回调函数,处理消息 final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // TODO Auto-generated method stub String message = new String(body, "utf-8"); System.out.println(" [x] Received '" + message + "'"); try { dowork(message); } catch (Exception e) { // TODO: handle exception }finally{ System.out.println(" [x] Done"); // 任务处理完以后的消息确认 channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(TASK_QUEUE_NAME, false,consumer); } private static void dowork(String task) { // TODO Auto-generated method stub for(char c :task.toCharArray()){ if(c == '.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block Thread.currentThread().interrupt(); } } } } }