本示例程序所有来自rabbitmq官方示例程序,rabbitmq-demo;
官方共有6个demo,针对不一样的语言(如 C#,Java,Spring-AMQP等),都有不一样的示例程序;
本示例程序主要是Spring-AMQP的参考示例,若是须要其余语言的参考示例,能够参考官网;html
4种不一样的Exchange,对routingKey的解释都不相同;
对routingKey的不一样解释,决定了Exchange路由Message到Queue的不一样方案;java
aaa.bbb.xxx
、*.ccc.dd.#
,相似正则表达式匹配;<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring.profiles.active=hello-world, sender, receiver
Work queues官方示例
application.properties配置github
spring.profiles.active=work-queues, sender, receiver #spring.profiles.active=work-queues, sender #spring.profiles.active=work-queues, receiver
详细描述参见:单生产者-多消费者详细正则表达式
Publish/Subscribe官方示例
spring
application.properties配置缓存
spring.profiles.active=pub-sub, receiver , sender
详细描述参见:发布/订阅详细架构
Routing官方示例app
a message goes to the queues whose binding key
exactly matches the routing key
of the message;(相等时才路由)异步
两个Queue使用相同的BingingKey(black) ==> 效果相似于:发布/订阅模式(demo3);
application.properties配置
pring.profiles.active=routing, receiver , sender
详细描述参见:发布/订阅详细
*.aaa.bbb.#
;*
: 能够匹配一个word;#
: 能够匹配0个或多个words;application.properties配置
pring.profiles.active=topics, receiver , sender
详细描述参见:Topics
application.properties配置
spring.profiles.active=rpc,server #spring.profiles.active=rpc,client
详细描述参见:RPC
消费者注册后,rabbitmq将消息交付给消费者时,都会带有一个“Delivery Tags”,这个是惟一的ID标识,id以整数的递增的方式实现。
缺点:
批量确认
;delete
,不支持批量确认
;假设:在Channel(ch)上有5,6,7,8这4个delivery tags未确认;
delivery_tag=8 & multiple=true
: 则5,6,7,8这4个tags都将被确认;delivery_tag=8 & multiple=false
:则只有8被确认,而5,6,7将不会被被确认;异步
完成的,手动确认也是异步
的;但愿控制未被确认消息的size,防止无界的缓存
;prefetch count
:使用basic.qos
方法设置该值能够控制未被确认消息的max size;basic.qos
方法有效,对basic.get
方法无效;假设:在Channel(Ch)上有5,6,7,8共4个未被确认
的消息,且ch的prefetch count=4
;
结果:rabbitmq将不会再交付任何消息到该Channel上,除非有消息被确认;
prefetch
:提升向消费者传递消息的速度;应避免:
自动确认模式
;手动确认模式
+ 无限制的prefetch
;结论:
状况1
和状况2
均可能致使交付但将来得及处理
的Message增长,增大RAM的消耗;推荐值:
prefetch
: 100~300,能够有效提升吞吐量,并避免RAM消耗过多的风险;当消息发送给消费端后,若是出现以下状况,则消息会从新reQueue
,会被再次发送;
消费端没法对同一个消息确认超过一次,当超过一次以后,将抛出Channel error: PRECONDITION_FAILED - unknown delivery tag XXXX
delivery tag
;每一个消息单独确认
和批量消息确认
;prefetchCount
:能够控制消息端的吞吐量,避免消费端消费过慢,产生RAM大量消耗;TCP链接中断
或消费端挂掉
,都会引发消息从新入队列,从新消费(手动消息确认时);确认
,不然会抛出异常;在 AMQP 0-9-1中,保证消息不丢失的惟一方法,就是使用事务;
confirm.select
: 应用于Channel时,表示使用确认模式
;事务
和确认模式
没法共存:两者只能选择其一;confirm.select
;broker
发送basic.ack
来确认Message已被处理;delivery-tag
: 消息序列,具备惟一性;multiple=true
: 用于设置批量消息确认
;confirmed(OK)
,要么被nack(fail)
,且only once;Java示例:(发送端发送大量messages,使用确认模式)
程序-确认模式
异常状况时,服务端没法处理消息,则broker
发送basic.nack
来进行否认确认
;
basic.ack
应答;异步处理应答
、批量发送消息
;当使用异步发送和持久化消息时,broker对消息的确认顺序
可能和发送者的消息发送顺序
不一致;
Delivery tag is a 64 bit long value, and thus its maximum value is 9223372036854775807.Since delivery tags are scoped per channel, it is very unlikely that a publisher or consumer will run over this value in practice.