工做队列模式为一个生产者对应多个消费者,可是只有一个消费者得到消息,即一个队列被多个消费者监听,但一条消息只能被其中的一个消费者获取java
代码以下:git
生产者代码:github
public class WorkSend { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //获取链接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //声明通道 Channel channel = connection.createChannel(); //建立队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", "hello", null, "First.".getBytes()); channel.basicPublish("", "hello", null, "Secode..".getBytes()); channel.basicPublish("", "hello", null, "Third....".getBytes()); channel.basicPublish("", "hello", null, "Fourth....".getBytes()); channel.basicPublish("", "hello", null, "Fifth.....".getBytes()); //六、关闭通道和链接 channel.close(); connection.close(); } }
消费者代码spring
public class WorkRecv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //获取链接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //声明通道 Channel channel = connection.createChannel(); //声明队列队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); //channel.basicAck(); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); // DeliverCallback deliverCallback = new DeliverCallback(){ // @Override // public void handle(String consumerTag, Delivery delivery) throws IOException { // String message = new String(delivery.getBody(), "UTF-8"); // System.out.println(" [x] Received '" + message + "'"); // } // }; // // channel.basicConsume(QUEUE_NAME, true, deliverCallback, new CancelCallback(){ // @Override // public void handle(String consumerTag) throws IOException { // // } // }); } private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
1 生产者将消息交个交换机 2 交换机交给绑定的队列 3 队列由多个消费者同时监听,只有其中一个可以获取这一条消息,造成了资源的争抢,谁的资源空闲大,争抢到的可能越大;app
Round-robin dispatching(轮询分发)ide
使用任务队列的优势之一是可以轻松并行工做。若是咱们这里积压了不少的消息,咱们能够增长work的并行度,这样就能够轻松扩展。fetch
默认状况下,RabbitMQ将每一个消息依次发送给下一个消费者。平均而言,每一个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。ui
Message acknowledgment(消息确认)this
使用咱们当前的代码,RabbitMQ一旦向消费者发送了一条消息,便当即将其标记为删除。在这种状况下,若是咱们kill掉一个worker,咱们将丢失正在处理的消息。而且还将丢失全部发送给该特定worker但还没有处理的消息。 为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(告知),告知RabbitMQ特定的消息已被接收并处理,而且RabbitMQ能够自由删除它。 若是消费者死了(其通道已关闭,链接已关闭或TCP链接丢失)而没有发送确认,RabbitMQ将了解消息未彻底处理,并将从新排队。若是同时有其余消费者在线,它将很快将其从新分发给另外一个消费者。这样,您能够确保即便worker偶尔死亡也不会丢失任何消息。 没有任何消息超时;消费者死亡时,RabbitMQ将从新传递消息。即便处理一条消息花费很是很是长的时间也不要紧。 默认状况下,手动消息确认处于打开状态。咱们经过autoAck = false 标志显式关闭了它们。在消息完成投递的时候,手动确认消息投递成功。spa
消费者代码修改以下:
public class ManWorkRecv { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { //获取链接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //声明通道 Channel channel = connection.createChannel(); //声明队列队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(" [x] Done"); //手动确认消息已经成功投递 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; //设置消息手动确认 channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
上述代码能够确保咱们在kill掉一个消费者的状况下,消息不会丢失。
Message durability(消息持久化)
前面已经讲了即便一个消费者退出,消息不会丢失,可是若是RabbitMQ服务退出时,队列和消息仍然会丢失,这是由于 默认队列和消息都是放在内存中的 。为保证消息和队列不丢失,须要把队列和消息设置为持久化 确保RabbitMQ永远不会丢失咱们的队列。声明持久化代码以下:
boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
因为RabbitMQ已经存在一个hello队列,而且RabbitMQ不支持对已经存在的队列进行参数修改,因此须要咱们删除以前建立的队列或者从新建立一个队列。
其次,咱们须要保证消息的持久化,消息持久化设置以下:
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "Fifth.....".getBytes());
这样,咱们就能够保证RabbitMQ服务宕机的状况下,消息和队列都不会丢失。
Fair dispatching(公平分发)
RabbitMQ在进行消息分发的时候,能够设置一次分发给某一个消费者多少条消息,如在消费者端设置prefetchCount=1;以下代码
int prefetchCount = 1 ; channel.basicQos(prefetchCount);
该设置表示,RabbitMQ一次分发给消费者一条消息,在消费者处理并确认上一条消息以前,不会再给这个消费者发送一条新消息,而会将其分发给其余的消费者
SpringBoot实现:
@SpringBootApplication @EnableScheduling public class RabbitAmqpTutorialsApplication { public static void main(String[] args) throws Exception { SpringApplication.run(RabbitAmqpTutorialsApplication.class, args); } }
@Configuration public class Tut2Config { @Bean public Queue hello() { return new Queue("hello"); } /** * 这里是启动两个消费者 */ private static class ReceiverConfig{ @Bean public Tut2Receiver receiver1(){ return new Tut2Receiver(1); } @Bean public Tut2Receiver receiver2(){ return new Tut2Receiver(2); } } @Bean public Tut2Sender sender() { return new Tut2Sender(); } }
//监听队列 @RabbitListener(queues = "hello") public class Tut2Receiver { private final int instance; public Tut2Receiver(int i) { this.instance = i; } @RabbitHandler public void receive(String in) throws InterruptedException { StopWatch watch = new StopWatch(); watch.start(); System.out.println("instance " + this.instance + " [x] Received '" + in + "'"); doWork(in); watch.stop(); System.out.println("instance " + this.instance + " [x] Done in " + watch.getTotalTimeSeconds() + "s"); } private void doWork(String in) throws InterruptedException { for (char ch : in.toCharArray()) { if (ch == '.') { Thread.sleep(1000); } } } }
public class Tut2Sender { @Autowired private RabbitTemplate template; //队列 @Autowired private Queue queue; AtomicInteger dots = new AtomicInteger(0); AtomicInteger count = new AtomicInteger(0); /** * 定时向队列hello发送消息 */ @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send() { StringBuilder builder = new StringBuilder("Hello"); if (dots.incrementAndGet() == 3) { dots.set(1); } for (int i = 0; i < dots.get(); i++) { builder.append('.'); } builder.append(count.incrementAndGet()); String message = builder.toString(); //向队列中发送消息 template.convertAndSend(queue.getName(), message); System.out.println(" [x] Sent '" + message + "'"); } }