实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。html
实现推模式推荐的方式是继承 DefaultConsumer 基类,也能够使用Spring AMQP的 SimpleMessageListenerContainer 。java
推模式是最经常使用的,可是有些状况下推模式并不适用的,好比说:sql
因为某些限制,消费者在某个条件成立时才能消费消息架构
须要批量拉取消息进行处理并发
实现拉模式分布式
RabbitMQ的Channel提供了 basicGet 方法用于拉取消息。ide
/** * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get} * @see com.rabbitmq.client.AMQP.Basic.Get * @see com.rabbitmq.client.AMQP.Basic.GetOk * @see com.rabbitmq.client.AMQP.Basic.GetEmpty * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @return a {@link GetResponse} containing the retrieved message data * @throws java.io.IOException if an error is encountered */ GetResponse basicGet(String queue, boolean autoAck) throws IOException;
basicGet 返回 GetResponse 类。高并发
public class GetResponse { private final Envelope envelope; private final BasicProperties props; private final byte[] body; private final int messageCount; // ...
rabbitmq-client版本4.0.3性能
使用 basicGet 拉取消息须要注意:学习
basicGet DefaultConsumer
示例代码:
private void consume(Channel channel) throws IOException, InterruptedException { while (true) { if (!isConditionSatisfied()) { TimeUnit.MILLISECONDS.sleep(1); continue; } GetResponse response = channel.basicGet(CAOSH_TEST_QUEUE, false); if (response == null) { TimeUnit.MILLISECONDS.sleep(1); continue; } String data = new String(response.getBody()); logger.info("Get message <= {}", data); channel.basicAck(response.getEnvelope().getDeliveryTag(), false); } }
批量拉取消息
RabbitMQ支持客户端批量拉取消息,客户端能够连续调用 basicGet 方法拉取多条消息,处理完成以后一次性ACK。须要注意:
basicGet basicAck
示例代码:
String bridgeQueueName = extractorProperties.getBridgeQueueName(); int batchSize = extractorProperties.getBatchSize(); List<GetResponse> responseList = Lists.newArrayListWithCapacity(batchSize); long tag = 0; while (responseList.size() < batchSize) { GetResponse getResponse = channel.basicGet(bridgeQueueName, false); if (getResponse == null) { break; } responseList.add(getResponse); tag = getResponse.getEnvelope().getDeliveryTag(); } if (responseList.isEmpty()) { TimeUnit.MILLISECONDS.sleep(1); } else { logger.info("Get <{}> responses this batch", responseList.size()); // handle messages channel.basicAck(tag, true); }
关于QueueingConsumer
QueueingConsumer 在客户端本地使用 BlockingQueue 缓冲消息,其nextDelivery方法也能够用于实现拉模式(其本质上是 BlockingQueue.take ),可是 QueueingConsumer 如今已经标记为Deprecated。
欢迎工做一到五年的Java工程师朋友们加入Java架构开发: 855835163 群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交代!