假设有这一些比较耗时的任务,按照上一次的那种方式,咱们要一直等前面的耗时任务完成了以后才能接着处理后面耗时的任务,那要等多久才能处理完?别担忧,咱们今天的主角--工做队列就能够解决该问题。咱们将围绕下面这个索引展开:java
废话少说,直接展开。ide
1、什么是工做队列fetch
工做队列--用来将耗时的任务分发给多个消费者(工做者),主要解决这样的问题:处理资源密集型任务,而且还要等他完成。有了工做队列,咱们就能够将具体的工做放到后面去作,将工做封装为一个消息,发送到队列中,一个工做进程就能够取出消息并完成工做。若是启动了多个工做进程,那么工做就能够在多个进程间共享。spa
2、代码准备3d
public class NewTask { //队列名称 public static final String QUEUE_NAME = "TASK_QUEUE"; //队列是否须要持久化 public static final boolean DURABLE = false; //须要发送的消息列表 public static final String[] msgs = {"task 1", "task 2", "task 3", "task 4", "task 5", "task 6"}; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.connection & channel connection = factory.newConnection(); channel = connection.createChannel(); // 2.queue channel.queueDeclare(QUEUE_NAME, DURABLE, false, false, null); // 3.publish msg for (int i = 0; i < msgs.length; i++) { channel.basicPublish("", QUEUE_NAME, null, msgs[i].getBytes()); System.out.println("** new task ****:" + msgs[i]); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
public class Work { public static void main(String[] args) { System.out.println("*** Work ***"); ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { //1.connection & channel final Channel channel = factory.newConnection().createChannel(); //2.queue channel.queueDeclare(NewTask.QUEUE_NAME, NewTask.DURABLE, false, false, null); //3. consumer instance Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); //deal task doWork(msg); } }; //4.do consumer boolean autoAck = true; channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } private static void doWork(String msg) { try { System.out.println("**** deal task begin :" + msg); //伪装task比较耗时,经过sleep()来模拟须要消耗的时间 if ("sleep".equals(msg)) { Thread.sleep(1000 * 60); } else { Thread.sleep(1000); } System.out.println("**** deal task finish :" + msg); } catch (InterruptedException e) { e.printStackTrace(); } } }
3、循环分发code
咱们先启动Work和Work2,而后启动NewTask,运行结果以下:blog
NewTask运行结果:索引
Work运行结果:rabbitmq
Work2运行结果:队列
咱们发现,消息生产者发送了6条消息,消费者work和work2分别分到了3个消息,并且是循环轮流分发到的,这种分发的方式就是循环分发。
4、消息确认
假如咱们在发送的消息里面添加“sleep"
//须要发送的消息列表 public static final String[] msgs = {"sleep", "task 1", "task 2", "task 3", "task 4", "task 5", "task 6"};
根据代码中的实现,这个sleep要耗时1分钟,万一在这1分钟以内,工做进程崩溃了或者被kill了,会发生什么状况呢?根据上面的代码:
//4.do consumer boolean autoAck = true; channel.basicConsume(NewTask.QUEUE_NAME, autoAck, consumer);
自动确认为true,每次RabbitMQ向消费者发送消息以后,会自动发确认消息(我工做你放心,不会有问题),这个时候消息会当即从内存中删除。若是工做者挂了,那将会丢失它正在处理和未处理的全部工做,并且这些工做还不能再交由其余工做者处理,这种丢失属于客户端丢失。
咱们来验证下,和刚才的步骤同样执行程序:
1.NewTask的控制台打印结果: ** new task ****:sleep ** new task ****:task 1 ** new task ****:task 2 ** new task ****:task 3 ** new task ****:task 4 ** new task ****:task 5 ** new task ****:task 6 2.Work的控制台打印结果: **** deal task begin :sleep 3.Work2的控制台打印结果: **** deal task begin :task 1 **** deal task finish :task 1 **** deal task begin :task 3 **** deal task finish :task 3 **** deal task begin :task 5 **** deal task finish :task 5
根据上面的内容,消息生产者发送了7条消息, work2消费了一、三、5 三条,那剩下的sleep、二、四、6 这四条消息确定是work来处理,只是sleep耗时一分钟 ,时间差后面的还没来得及处理,这个时候咱们kill掉work,去看下RabbitMQ 管理页面,没有未处理的消息,消息随着work被kill也跟着丢失了。
是否是很可怕?
为了应对这种状况,RabbitMQ支持消息确认。消费者处理完消息以后,会发送一个确认消息告诉RabbitMQ,消息处理完了,你能够删掉它了。
代码修改(Work.java和Work2.java同步修改):1.将自动确认改成false,2.消息处理以后再经过channel.basicAck进行消息确认
修改完后,执行程序:
1.NewTask的控制台打印结果: ** new task ****:sleep ** new task ****:task 1 ** new task ****:task 2 ** new task ****:task 3 ** new task ****:task 4 ** new task ****:task 5 ** new task ****:task 6 2.Work的控制台打印结果: **** deal task begin :sleep 3.Work2的控制台打印结果: **** deal task begin :task 1 **** deal task finish :task 1 **** deal task begin :task 3 **** deal task finish :task 3 **** deal task begin :task 5 **** deal task finish :task 5
而后kill掉work,去看RabbitMQ管理页面,会发现有4条未确认:
再去看下work2的控制台,work2将work未处理完和将来得及处理的消息都给处理了:
等work2处理完后,你再去看RabbitMQ管理页面,会发现页面的消息数值也都变成0 了。
5、公平分发
按照上面那种循环分发的方式,每一个消费者会分到相同数量的任务,这样会有一个问题:假若有一些task很是耗时,以前的任务尚未完成,后面又来了那么多任务,来不及处理,那咋办? 有的消费者忙的不可开交,有的消费者却很快处理完事情而后无所事事浪费资源,那咋整?答案就是:公平分发。 怎么实现呢?
发生上述问题的缘由就是RabbitMQ收到消息后就当即分发出去,而没有确认各个工做者未返回确认的消息数量。所以咱们可使用basicQos
方法,并将参数prefetchCount
设为1,告诉RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样RabbitMQ就不会轮流分发了,而是寻找空闲的工做者进行分发。
代码修改(work和Work2同步修改):
执行代码:
1.NewTask的控制台打印结果: ** new task ****:sleep ** new task ****:task 1 ** new task ****:task 2 ** new task ****:task 3 ** new task ****:task 4 ** new task ****:task 5 ** new task ****:task 6 2.Work的控制台打印结果: **** deal task begin :sleep **** deal task finish :sleep 3.Work2的控制台打印结果: **** deal task begin :task 1 **** deal task finish :task 1 **** deal task begin :task 2 **** deal task finish :task 2 **** deal task begin :task 3 **** deal task finish :task 3 **** deal task begin :task 4 **** deal task finish :task 4 **** deal task begin :task 5 **** deal task finish :task 5 **** deal task begin :task 6 **** deal task finish :task 6
Work只处理了sleep,Work2处理了一、二、三、四、五、6 这个六条消息。
6、消息持久化
上面说到消息确认的时候,提到了工做者被kill的状况。那若是RabbitMQ被stop掉了呢?咱们来看下:
此次只启动Work和NewTask,不启动Work2,全部消息都交给Work来处理,控制台打印信息:
1.NewTask的控制台打印结果: ** new task ****:sleep ** new task ****:task 1 ** new task ****:task 2 ** new task ****:task 3 ** new task ****:task 4 ** new task ****:task 5 ** new task ****:task 6 2.Work的控制台打印结果: **** deal task begin :sleep
在work处理sleep的过程当中,咱们停掉RabbitMQ服务
而后从新start服务并执行rabbitmq-plugins enable rabbitmq_management命令,而后查看管理页面:
你会发现,全部消息都将被清空了。这种丢失属于服务端丢失。
所以须要将消息进行持久化来应对这种状况。
持久化须要作两件事情:
队列持久化,在声明队列的时候,将第二个参数设为true
另外,因为RabbitMQ不容许从新定义已经存在的队列,不然就会报错(上一篇博客中已经提到过了),所以咱们将此次的队列名改下:
消息持久化,在发送消息的时候,将第三个参数设为2
而后运行代码,在work处理sleep的时候将服务停掉,并从新启动且执行rabbitmq-plugins enable rabbitmq_management命令,而后查看管理页面:
一共7条消息,未确认的1条(sleep)和ready的6条(一、二、三、四、五、6)。消息被保存了下来。
从新启动Work,全部消息被消费: