RabbitMQ是用Erlang语言编写的分布式消息中间件,经常用在大型网站中做为消息队列来使用,主要目的是各个子系统之间的解耦和异步处理。消息中间件的基本模型是典型的生产者-消费者模型,生产者发送消息到消息队列,消费者监听消息队列,收到消息后消费处理。java
在使用RabbitMQ作消息分发时,主要有三个概念要注意:Exchange,RoutingKey,Queue。spring
Exchange能够理解为交换器,RoutingKey能够理解为路由,Queue做为真实存储消息的队列和某个Exchange绑定,具体如何路由到感兴趣的Queue则由Exchange的三种模式决定:多线程
Exchange为fanout时,生产者往此Exchange发送的消息会发给每一个和其绑定的Queue,此时RoutingKey并不起做用;Exchange为topic时,生产者能够指定一个支持通配符的RoutingKey(如demo.*)发向此Exchange,凡是Exchange上RoutingKey知足此通配符的Queue就会收到消息;direct类型的Exchange是最直接最简单的,生产者指定Exchange和RoutingKey,而后往其发送消息,消息只能被绑定的知足RoutingKey的Queue接受消息。(一般若是不指定RoutingKey的具体名字,那么默认的名字实际上是Queue的名字)并发
在一般的使用中(Java项目),咱们通常会结合spring-amqp框架来使用RabbitMQ,spring-amqp底层调用RabbitMQ的java client来和Broker交互,好比咱们会用以下配置来创建RabbitMQ的链接池、声明Queue以及指明监听者的监听行为:框架
<rabbit:connection-factory id="connectionFactory" /> <!-- template非必须,主要用于生产者发送消息--> <rabbit:template id="template" connection-factory="connectionFactory" /> <rabbit:queue name="remoting.queue" /> <rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3"> <rabbit:listener ref="listener" queue-names="remoting.queue" /> </rabbit:listener-container>
listener-container能够设置消费者在监听Queue的时候的各类参数,其中concurrency和prefetch是本篇文章比较关心的两个参数,如下是spring-amqp文档的解释:异步
prefetchCount(prefetch)
The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode
is NONE. This will be increased, if necessary, to match the txSizesocket
concurrentConsumers(concurrency)分布式
The number of concurrent consumers to initially start for each listener.ide
简单解释下就是concurrency设置的是对每一个listener在初始化的时候设置的并发消费者的个数,prefetch是每次从一次性从broker里面取的待消费的消息的个数,上面的配置在监控后台看到的效果以下:函数
图中能够看出有两个消费者同时监听Queue,可是注意这里的消息只有被一个消费者消费掉就会自动ack,另一个消费者就不会再获取到此消息,Prefetch Count为配置设置的值3,意味着每一个消费者每次会预取3个消息准备消费。每一个消费者对应的listener有个Exclusive参数,默认为false, 若是设置为true,concurrency就必须设置为1,即只能单个消费者消费队列里的消息,适用于必须严格执行消息队列的消费顺序(先进先出)。
这里concurrency的实现方式不看源码也能猜到,确定是用多线程的方式来实现的,此时同一进程下打开的本地端口都是56278.下面看看listener-contaner对应的org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer的源码:
protected int initializeConsumers() { int count = 0; synchronized (this.consumersMonitor) { if (this.consumers == null) { this.cancellationLock.reset(); this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers); for (int i = 0; i < this.concurrentConsumers; i++) { BlockingQueueConsumer consumer = createBlockingQueueConsumer(); this.consumers.put(consumer, true); count++; } } } return count; }
container启动的时候会根据设置的concurrency的值(同时不超过最大值)建立n个BlockingQueueConsumer。
protected void doStart() throws Exception { //some code synchronized (this.consumersMonitor) { int newConsumers = initializeConsumers(); //some code Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>(); for (BlockingQueueConsumer consumer : this.consumers.keySet()) { AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); processors.add(processor); this.taskExecutor.execute(processor); } //some code } }
在doStart()方法中调用initializeConsumers来初始化全部的消费者,AsyncMessageProcessingConsumer做为真实的处理器包装了BlockingQueueConsumer,而AsyncMessageProcessingConsumer其实实现了Runnable接口,由this.taskExecutor.execute(processor)来启动消费者线程。
private final class AsyncMessageProcessingConsumer implements Runnable { private final BlockingQueueConsumer consumer; private final CountDownLatch start; private volatile FatalListenerStartupException startupException; private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) { this.consumer = consumer; this.start = new CountDownLatch(1); } //some code @Override public void run() { //some code } }
那么prefetch的值意味着什么呢?其实从名字上大体能看出,BlockingQueueConsumer内部应该维护了一个阻塞队列BlockingQueue,prefetch应该是这个阻塞队列的长度,看下BlockingQueueConsumer内部有个queue,这个queue不是对应RabbitMQ的队列,而是Consumer本身维护的内存级别的队列,用来暂时存储从RabbitMQ中取出来的消息:
private final BlockingQueue<Delivery> queue; public BlockingQueueConsumer(ConnectionFactory connectionFactory, MessagePropertiesConverter messagePropertiesConverter, ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode, boolean transactional, int prefetchCount, boolean defaultRequeueRejected, Map<String, Object> consumerArgs, boolean exclusive, String... queues) { //some code this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount); }
BlockingQueueConsumer的构造函数清楚说明了每一个消费者内部的队列大小就是prefetch的大小。
前面说过,设置并发的时候,要考虑具体的业务场景,对那种对消息的顺序有苛刻要求的场景不适合并发消费,而对于其余场景,好比用户注册后给用户发个提示短信,是不太在乎哪一个消息先被消费,哪一个消息后被消费,由于每一个消息是相对独立的,后注册的用户先收到短信也并无太大影响。
设置并发消费除了能提升消费的速度,还有另一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,可是新来的消息仍然能够投递给其余消费者消费,这种状况顶多会致使prefetch个数目的消息消费有问题,而不至于单消费者状况下整个RabbitMQ的队列会由于一个消息有问题而所有堵死。全部在合适的业务场景下,须要合理设置concurrency和prefetch值。
转自:https://www.jianshu.com/p/04a1d36f52ba