这节咱们主要讲RabbitMQ的分发,由生产者发布一个任务,多个接受者去获取任务来进行加工处理。java
一个队列的优势就是很容易处理并行化的工做能力,可是若是咱们积累了大量的工做,咱们就须要更多的工做者来处理,这里就要采用分布机制了。数据库
咱们建立一个新的生产者NewTask服务器
package com.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); //分发消息 for(int i=0; i<10; i++){ String message = "Hello RabbitMQ" + i; //MessageProperties.PERSISTENT_TEXT_PLAIN设置持久化 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println("NewTask send :" + message); } channel.close(); connection.close(); } }
而后咱们建立一个Work1和Work2去接收任务,其中Work1和Work2代码同样。负载均衡
package com.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP; public class Work1 { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); System.out.println("Worker1 Waiting for messages"); //每次从队列获取的数量 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ String message = new String(body, "UTF-8"); System.out.println("Worker1 Received:" + message); try{ Thread.sleep(1000); // 暂停1秒钟 }catch(Exception e){ //此操做中的全部异常将被丢弃 channel.abort(); }finally{ System.out.println("Worker1 Done"); //消息处理完成后手工确认,即下面的basicConsume第二个参数要为false。 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; //消息完成确认 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } }
package com.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Work2 { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null); System.out.println("Worker2 Waiting for messages"); //每次从队列获取的数量 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ String message = new String(body, "UTF-8"); System.out.println("Worker2 Received:" + message); try{ Thread.sleep(1000); // 暂停1秒钟 }catch(Exception e){ //此操做中的全部异常将被丢弃 channel.abort(); }finally{ System.out.println("Worker2 Done"); //消息处理完成后手工确认,即下面的basicConsume第二个参数要为false。 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; //消息完成确认 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } }
运行结果:ide
NewTask:spa
Work1:3d
Work2:code
channel.basicQos(1);保证一次只分发一个,默认状况下,RabbitMQ将队列消息随机分配给每一个消费者,这时可能出现消息调度不均衡的问题。例若有两台消费者服务器,一个服务器可能很是繁忙,消息不断,另一个却很清闲,没有什么负载。RabbitMQ不会主动介入这些状况,仍是会随机调度消息到每台服务器。这是由于RabbitMQ此时只负责调度消息,不会根据ACK的反馈机制来分析那台服务器返回反馈慢,是否是处理不过来啊?blog
为了解决这个问题,咱们可使用channel.basicQos(1)这个设置。这个设置告诉RabbitMQ,不要一次将多个消息发送给一个消费者。这样作的好处是只有当消费者处理完成当前消息并反馈后,才会收到另一条消息或任务。这样就避免了负载不均衡的事情了。rabbitmq
autoAck是否自动回复,若是为true的话,每次生产者只要发送信息就会从内存中删除,那么若是消费者程序异常退出,那么就没法获取数据,咱们固然是不但愿出现这样的状况,因此才去手动回复,每当消费者收到并处理信息而后在通知生成者。最后从队列中删除这条信息。若是消费者异常退出,若是还有其余消费者,那么就会把队列中的消息发送给其余消费者,若是没有,等消费者启动时候再次发送。所以一旦将autoAck关闭以后,必定要记得处理完消息以后,向服务器确认消息。不然服务器将会一直转发该消息。若是忘记了向服务器确认处理完消息的话,队列中的信息会一直存在。好比将NewTask运行一次,Work1中注释掉channel.basicAck(envelope.getDeliveryTag(), false); 而且一次获取10条信息,那么每运行一次Work1都会收到队列task_queue的消息。
所以忘记确认 忘记经过basicAck返回确认信息是常见的错误。这个错误很是严重,将致使消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,并且RabbitMQ也不会主动删除这些被退回的消息。
可是除了设置ack手动回复之外,仍是不够的,若是RabbitMQ-Server忽然挂掉了,那么尚未被读取的消息仍是会丢失 ,因此咱们可让消息持久化。 只须要在定义Queue时,设置持久化消息就能够了,方法以下:
boolean durable = true; channel.queueDeclare(channelName, durable, false, false, null);
这样设置以后,服务器收到消息后就会马上将消息写入到硬盘,就能够防止忽然服务器挂掉,而引发的数据丢失了。可是服务器若是刚收到消息,还没来得及写入到硬盘,就挂掉了,这样仍是没法避免消息的丢失。
由于RabbitMQ不作实时当即的磁盘同步(fsync)。这种状况下,对于持久化要求不是特别高的简单任务队列来讲,仍是能够知足的。若是须要更强大的保证,那么你能够考虑使用生产者确认反馈机制。
注意,服务器重启后这条队列颇有可能会报错,由于已经定义的队列,再次定义是无效的,这就是幂次原理。RabbitMQ不容许从新定义一个已有的队列信息,也就是说不容许修改已经存在的队列的参数。若是你非要这样作,只会返回异常。
所以一个快速有效的方法就是从新声明另外一个名称的队列,不过这须要修改生产者和消费者的代码,因此,在开发时,最好是将队列名称放到配置文件中。这时,即便RabbitMQ服务器重启,新队列中的消息也不会丢失。
1:不要一次将多个消息给一个消费者,采用负载均衡。
2:channel.basicConsume()里的ack参数。当从队列当中取出一个消息的时候,RabbitMQ须要应用显式地回馈说已经获取到了该消息。若是一段时间内不回馈,RabbitMQ会将该消息从新分配给另一个绑定在该队列上的消费者。另外一种状况是消费者断开链接,可是获取到的消息没有回馈,则RabbitMQ一样从新分配。若是将该参数设置为true,则RabbimtMQ会为下一个AMQP请求添加一个ack属性,告诉AMQP服务器须要等待回馈。否者,不要等待回馈。大多数时候,你也许想要本身手工发送回馈,例如,须要在回馈以前将消息存入数据库。回馈一般是经过调用 channel.basicAck(deliveryTag, multiple)方法。
3:持久化队列。并最好将队列名称写在配置文件中。