一. 使用RabbitMQ的好处
1.解耦,系统A在代码中直接调用系统B和系统C的代码,若是未来D系统接入,系统A还须要修改代码,过于麻烦!
2.异步,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
3.削峰,并发量大的时候,全部的请求直接怼到数据库,形成数据库链接异常
2、RabbitMQ 中的 broker 是指什么?cluster 又是指什么?java
3、RabbitMQ 概念里的 channel、exchange 和 queue 是逻辑概念,仍是对应着进程实体?分别起什么做用?node
4、vhost 是什么?起什么做用?面试
5、消息基于什么传输?redis
6、消息如何分发?数据库
7、消息怎么路由?安全
8、什么是元数据?元数据分为哪些类型?包括哪些内容?与 cluster 相关的元数据有哪些?元数据是如何保存的?元数据在 cluster 中是如何分布的?并发
9、在单 node 系统和多 node 构成的 cluster 系统中声明 queue、exchange ,以及进行 binding 会有什么不一样?异步
答:当你在单 node 上声明 queue 时,只要该 node 上相关元数据进行了变动,你就会获得 Queue.Declare-ok 回应;而在 cluster 上声明 queue ,则要求 cluster 上的所有 node 都要进行元数据成功更新,才会获得 Qu6eue.Declare-ok 回应。另外,若 node 类型为 RAM node 则变动的数据仅保存在内存中,若类型为 disk node 则还要变动保存在磁盘上的数据。ide
10、如何确保消息正确地发送至RabbitMQ?性能
11、如何确保消息接收方消费了消息?
接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不一样操做)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。这里并无用到超时机制,RabbitMQ仅经过Consumer的链接中断来确认是否须要从新发送消息。也就是说,只要链接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。
下面罗列几种特殊状况:
12、如何避免消息重复投递或重复消费?
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,做为去重和幂等的依据(消息投递失败并重传),避免重复的消息进入队列;在消息消费时,要求消息体中必需要有一个bizId(对于同一业务全局惟一,如支付ID、订单ID、帖子ID等)做为去重和幂等的依据,避免同一条消息被重复消费。
这个问题针对业务场景来答分如下几点:
十3、如何解决丢数据的问题?
1.生产者丢数据
生产者的消息没有投递到MQ中怎么办?从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事物(channel.txSelect()),而后发送消息,若是发送过程当中出现什么异常,事物就会回滚(channel.txRollback()),若是发送成功则提交事物(channel.txCommit())。
然而缺点就是吞吐量降低了。所以,按照博主的经验,生产上用confirm模式的居多。一旦channel进入confirm模式,全部在该信道上面发布的消息都将会被指派一个惟一的ID(从1开始),一旦消息被投递到全部匹配的队列以后,rabbitMQ就会发送一个Ack给生产者(包含消息的惟一ID),这就使得生产者知道消息已经正确到达目的队列了.若是rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你能够进行重试操做。
2.消息队列丢数据
处理消息队列丢数据的状况,通常是开启持久化磁盘的配置。这个持久化配置能够和confirm机制配合使用,你能够在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,若是消息持久化磁盘以前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
①、将queue的持久化标识durable设置为true,则表明是一个持久的队列
②、发送消息的时候将deliveryMode=2
这样设置之后,rabbitMQ就算挂了,重启后也能恢复数据。在消息尚未持久化到硬盘时,可能服务已经死掉,这种状况能够经过引入mirrored-queue即镜像队列,但也不能保证消息百分百不丢失(整个集群都挂掉)
3.消费者丢数据
启用手动确认模式能够解决这个问题
①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直处处理成功。不会丢失消息,即使服务挂掉,没有处理完成的消息会重回队列,可是异常会让消息不断重试。
②手动确认模式,若是消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其余消费者;若是监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,而后一直抛异常;若是对异常进行了捕获,可是没有在finally里ack,也会一直重复发送消息(重试机制)。
③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会当即在队列移除,不管客户端异常仍是断开,只要发送完就移除,不会重发。
十4、死信队列和延迟队列的使用
死信消息:
消息被拒绝(Basic.Reject或Basic.Nack)而且设置 requeue 参数的值为 false
消息过时了
队列达到最大的长度
过时消息:
在 rabbitmq 中存在2种方可设置消息的过时时间,第一种经过对队列进行设置,这种设置后,该队列中全部的消息都存在相同的过时时间,第二种经过对消息自己进行设置,那么每条消息的过时时间都不同。若是同时使用这2种方法,那么以过时时间小的那个数值为准。当消息达到过时时间尚未被消费,那么那个消息就成为了一个 死信 消息。
队列设置:在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒
单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒
延时队列:在rabbitmq中不存在延时队列,可是咱们能够经过设置消息的过时时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
有了以上的基础知识,咱们完成如下需求:
需求:用户在系统中建立一个订单,若是超过期间用户没有进行支付,那么自动取消订单。
分析:
一、上面这个状况,咱们就适合使用延时队列来实现,那么延时队列如何建立
二、延时队列能够由 过时消息+死信队列 来时间
三、过时消息经过队列中设置 x-message-ttl 参数实现
四、死信队列经过在队列申明时,给队列设置 x-dead-letter-exchange 参数,而后另外申明一个队列绑定x-dead-letter-exchange对应的交换器。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个接收被删除的消息的交换机和队列 String EXCHANGE_DEAD_NAME = "exchange.dead"; String QUEUE_DEAD_NAME = "queue_dead"; channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null); channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead"); String EXCHANGE_NAME = "exchange.fanout"; String QUEUE_NAME = "queue_name"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); Map<String, Object> arguments = new HashMap<String, Object>(); // 统一设置队列中的全部消息的过时时间 arguments.put("x-message-ttl", 30000); // 设置超过多少毫秒没有消费者来访问队列,就删除队列的时间 arguments.put("x-expires", 20000); // 设置队列的最新的N条消息,若是超过N条,前面的消息将从队列中移除掉 arguments.put("x-max-length", 4); // 设置队列的内容的最大空间,超过该阈值就删除以前的消息 arguments.put("x-max-length-bytes", 1024); // 将删除的消息推送到指定的交换机,通常x-dead-letter-exchange和x-dead-letter-routing-key须要同时设置 arguments.put("x-dead-letter-exchange", "exchange.dead"); // 将删除的消息推送到指定的交换机对应的路由键 arguments.put("x-dead-letter-routing-key", "routingkey.dead"); // 设置消息的优先级,优先级大的优先被消费 arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); String message = "Hello RabbitMQ: "; for(int i = 1; i <= 5; i++) { // expiration: 设置单条消息的过时时间 AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder() .priority(i).expiration( i * 1000 + ""); channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8")); } channel.close(); connection.close();
十5、使用了消息队列会有什么缺点?
1.系统可用性下降:你想啊,原本其余系统只要运行好好的,那你的系统就是正常的。如今你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。所以,系统可用性下降
2.系统复杂性增长:要多考虑不少方面的问题,好比一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。所以,须要考虑的东西更多,系统复杂性增大。
————————————————
版权声明:本文为CSDN博主「jeffrey_ding」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处连接及本声明。
原文连接:https://blog.csdn.net/jerryDzan/java/article/details/89183625