工做队列:一个生产者对应多个消费者,生产者直接将消息发送到rabbitmq的队列之中。java
消息分配模式:公平分配ide
队列会先给每一个消费者轮流发送一条信息,消费者接收到信息并对之处理。若是不反馈处理结果,队列就不会再发送信息给该消费者,而是发送给其余已经处理完一条信息并反馈的消费者。code
简而言之,哪一个消费者的处理信息效率更高,则收到的信息也越多。blog
生产者:rabbitmq
package com.example.demo.queue.workfair; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "work_fair_queue"; public static void main(String[] args) { Connection connection = null; Channel channel = null; try { connection = ConnectionUtil.getConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 限定:发送一条信息给消费者A,消费者A未反馈处理结果以前,不会再次发送信息给消费者A channel.basicQos(1); String msg = "msg from producer:"; for(int i=0;i<10;i++) { msg = "msg from producer :" + i; System.out.println("send msg : "+msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } } catch (Exception e) { e.printStackTrace(); } finally { // 关闭通道 try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } // 关闭链接 try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
消费者1:队列
package com.example.demo.queue.workfair; import java.io.IOException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer01 { // 队列名称 private static final String QUEUE_NAME="work_fair_queue"; public static void main(String[] args) { try { // 获取链接 Connection connection = ConnectionUtil.getConnection(); // 建立通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 限定:发送一条信息给消费者A,消费者A未反馈处理结果以前,不会再次发送信息给消费者A channel.basicQos(1); // 定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("[1]:receive msg:"+msg); try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("[1]:deal msg successful."); } // 反馈消息处理完毕 channel.basicAck(envelope.getDeliveryTag(), false); } }; boolean autoAck = false;// 取消自动反馈 // 接收信息 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭通道 // try { // channel.close(); // } catch (IOException e) { // e.printStackTrace(); // } catch (TimeoutException e) { // e.printStackTrace(); // } // 关闭链接 // try { // connection.close(); // } catch (IOException e) { // e.printStackTrace(); // } } } }
消费者2:get
package com.example.demo.queue.workfair; import java.io.IOException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer02 { // 队列名称 private static final String QUEUE_NAME="work_fair_queue"; public static void main(String[] args) { try { // 获取链接 Connection connection = ConnectionUtil.getConnection(); // 建立通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 限定:发送一条信息给消费者A,消费者A未反馈处理结果以前,不会再次发送信息给消费者A channel.basicQos(1); // 定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("[2]:receive msg:"+msg); try { Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("[2]:deal msg successful."); } // 反馈消息处理完毕 channel.basicAck(envelope.getDeliveryTag(), false); } }; boolean autoAck = false;// 取消自动反馈 // 接收信息 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭通道 // try { // channel.close(); // } catch (IOException e) { // e.printStackTrace(); // } catch (TimeoutException e) { // e.printStackTrace(); // } // 关闭链接 // try { // connection.close(); // } catch (IOException e) { // e.printStackTrace(); // } } } }
执行顺序:it
先执行两个消费者(Consumer01,Consumer02)类的main方法,再执行生产者(Producer)的main方法便可io
顺序反过来亦可,可是可能会影响到公平分配的效果。class
生产者终端:
消费者1终端:
消费者2终端:
实现公平分配关键点:
// 限定:发送一条信息给消费者A,消费者A未反馈处理结果以前,不会再次发送信息给消费者A channel.basicQos(1);
boolean autoAck = false;// 取消自动反馈 // 接收信息 channel.basicConsume(QUEUE_NAME, autoAck, consumer);
// 反馈消息处理完毕 channel.basicAck(envelope.getDeliveryTag(), false);