目录java
什么是生产端的可靠性投递?redis
若是想保障消息百分百投递成功,只作到前三步不必定可以保障。有些时候或者说有些极端状况,好比生产端在投递消息时可能就失败了,或者说生产端投递了消息,MQ也收到了,MQ在返回确认应答时,因为网络闪断致使生产端没有收到应答,此时这条消息就不知道投递成功了仍是失败了,因此针对这些状况咱们须要作一些补偿机制。算法
(confirm)
给生产端生产端有一个Confirm Listener
,去异步的监听Broker
回送的响应,从而判断消息是否投递成功,若是成功,去数据库查询该消息,并将消息状态更新为1,表示消息投递成功
假设第二步OK了,在第三步回送响应时,网络忽然出现了闪断,致使生产端的Listener就永远收不到这条消息的confirm应答了,也就是说这条消息的状态就一直为0了数据库
(Retry Send)
,也就是从第二步开始继续往下走固然有些消息可能就是因为一些实际的问题没法路由到Broker,好比routingKey设置不对,对应的队列被误删除了,那么这种消息即便重试屡次也仍然没法投递成功,因此须要对重试次数作限制,好比限制3次,若是投递次数大于三次,那么就将消息状态更新为2,表示这个消息最终投递失败。网络
针对这种状况如何去作补偿呢,能够有一个补偿系统去查询这些最终失败的消息,而后给出失败的缘由,固然这些可能都须要人工去操做。并发
第一种可靠性投递,在高并发的场景下是否适合?异步
对于第一种方案,咱们须要作两次数据库的持久化操做,在高并发场景下显然数据库存在着性能瓶颈。其实在咱们的核心链路中只须要对业务进行入库就能够了,消息就不必先入库了,咱们能够作消息的延迟投递,作二次确认,回调检查。分布式
固然这种方案不必定能保障百分百投递成功,可是基本上能够保障大概99.9%的消息是OK的,有些特别极端的状况只能是人工去作补偿了,或者使用定时任务去作均可以。ide
Upstream Service
上游服务也就是生产端,Downstream service
下游服务也就是消费端,Callback service
就是回调服务。高并发
(Second Send Delay Check)
,即延迟消息投递检查,这里须要设置一个延迟时间,好比5分钟以后进行投递。confirm
消息,也就是回送响应,可是这里响应不是正常的ACK,而是从新生成一条消息,投递到MQ中。Callback service
是一个单独的服务,其实它扮演了第一种方案的存储消息的DB角色,它经过MQ去监听下游服务发送的confirm
消息,若是Callback service
收到confirm
消息,那么就对消息作持久化存储,即将消息持久化到DB中。Callback service
仍是去监听延迟消息所对应的队列,收到Check消息后去检查DB中是否存在消息,若是存在,则不须要作任何处理,若是不存在或者消费失败了,那么Callback service
就须要主动发起RPC通讯给上游服务,告诉它延迟检查的这条消息我没有找到,你须要从新发送,生产端收到信息后就会从新查询业务消息而后将消息发送出去。这么作的目的是少作了一次DB的存储,在高并发场景下,最关心的不是消息100%投递成功,而是必定要保证性能,保证能抗得住这么大的并发量。因此能节省数据库的操做就尽可能节省,能够异步的进行补偿。
其实在主流程里面是没有这个Callback service的,它属于一个补偿的服务,整个核心链路就是生产端入库业务消息,发送消息到MQ,消费端监听队列,消费消息。其余的步骤都是一个补偿机制。
第二种方案也是互联网大厂更为经典和主流的解决方案。可是若对性能要求不是那么高,第一种方案要更简单
简单来讲就是用户对于同一操做发起的一次请求或者屡次请求的结果是一致的。
咱们能够借鉴数据库的乐观锁机制来举个例子:
首先为表添加一个版本字段version
在执行更新操做前呢,会先去数据库查询这个version
而后执行更新语句,以version做为条件,例如:
UPDATE T_REPS SET COUNT = COUNT -1,VERSION = VERSION + 1 WHERE VERSION = 1
若是执行更新时有其余人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操做了,经过这种乐观锁的机制来保障幂等性。
重复消费问题:
当消费者消费完消息时,在给生产端返回ack时因为网络中断,致使生产端未收到确认信息,该条消息会从新发送并被消费者消费,但实际上该消费者已成功消费了该条消息,这就是重复消费问题。
惟一ID:业务表惟一的主键,如商品ID
指纹码:为了区别每次正常操做的码,每次操做时生成指纹码;能够用时间戳+业务编号或者标志位(具体视业务场景而定)
整个思路就是首先咱们须要根据消息生成一个全局惟一的ID,而后还须要加上一个指纹码。这个指纹码它并不必定是系统去生成的,而是一些外部的规则或者内部的业务规则去拼接,它的目的就是为了保障此次操做是绝对惟一的。
将ID + 指纹码拼接好的值做为数据库主键,就能够进行去重了。即在消费消息前呢,先去数据库查询这条消息的指纹码标识是否存在,没有就执行insert操做,若是有就表明已经被消费了,就不须要管了。
这里只提用Redis的原子性去解决MQ幂等性重复消费的问题
注意:MQ的幂等性问题 根本在于的是生产端未正常接收ACK,多是网络抖动、网络中断致使
个人方案:
MQ消费端在消费开始时 将 ID放入到Redis的BitMap中,MQ生产端每次生产数据时,从Redis的BitMap对应位置若不能取出ID,则生产消息发送,不然不进行消息发送。
可是有人可能会说,万一消费端,生产端Redis命令执行失败了怎么办,虽然又出现重复消费又出现Redis非正常执行命令的可能性极低,可是万一呢?
OK,咱们能够在Redis命令执行失败时,将消息落库,每日用定时器,对这种极特殊的消息进行处理。
的核心保障
确认机制流程图
生产端发送消息到Broker,而后Broker接收到了消息后,进行回送响应,生产端有一个Confirm Listener
,去监听应答,固然这个操做是异步进行的,生产端将消息发送出去就能够不用管了,让内部监听器去监听Broker给咱们的响应。
channel.confirmSelect()
addConfirmListener
,监听成功和失败的返回结果,根据具体的结果对消息进行从新发送、或记录日志等后续处理!public class Producer { public static void main(String[] args) throws Exception { //建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //获取Connection Connection connection = connectionFactory.newConnection(); //经过connection建立一个新的Channel Channel channel = connection.createChannel(); //指定咱们的消息投递模式 channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingkey = "confirm.save"; //发送一条信息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingkey, null, msg.getBytes()); //添加一个确认监听 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("-------no ack!---------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("--------ack!----------"); } }); } }
public class Consumer { public static void main(String[] args) throws Exception{ //建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //获取Connection Connection connection = connectionFactory.newConnection(); //经过connection建立一个新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String routingkey = "confirm.#"; String queueName = "test_confirm_queue"; //声明交换机和队列 而后进行绑定和 设置 最后制定路由key channel.exchangeDeclare(exchangeName, "topic",true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingkey); //建立消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true,queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("消费端:" + msg); } } }
运行说明
先启动消费端,访问管控台:http://ip:15672,检查Exchange和Queue是否设置OK,而后启动生产端,消息被消费端消费,生产端也成功监听到了ACK响应。
Return Listener
用于处理一些不可路由的消息!Return Listener
!
addReturnListener
,生产端去监听这些不可达的消息,作一些后续处理,好比说,记录下消息日志,或者及时去跟踪记录,有可能从新设置一下就行了Mandatory
:若是为true,则监听器会接收到路由不可达的消息,而后进行后续处理,若是为false,那么broker端自动删除该消息!
public class ReturnProducer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 获取Connection Connection connection = connectionFactory.newConnection(); //3 经过Connection建立一个新的Channel Channel channel = connection.createChannel(); String exchange = "test_return_exchange"; //String routingKey = "return.save"; String routingKeyError = "abc.save"; String msg = "Hello RabbitMQ Return Message"; //添加return监听 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, BasicProperties properties, byte[] body) throws IOException { //replyCode:响应码 replyText:响应信息 System.err.println("---------handle return----------"); System.err.println("replyCode: " + replyCode); System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); //System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); //5 发送一条消息,第三个参数mandatory:必须设置为true channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); } }
public class ReturnConsumer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 获取Connection Connection connection = connectionFactory.newConnection(); //3 经过Connection建立一个新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "return.#"; String queueName = "test_return_queue"; //4 声明交换机和队列,而后进行绑定设置路由Key channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5 建立消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费者: " + msg); } } }
运行说明
先启动消费端,访问管控台:http://ip:15672,检查Exchange和Queue是否设置OK,而后启动生产端。
因为生产端设置的是一个错误的路由key,因此消费端没有任何打印,而生产端打印了以下内容
若是咱们将 Mandatory
属性设置为false,对于不可达的消息会被Broker直接删除,那么生产端就不会进行任何打印了。若是咱们的路由key设置为正确的,那么消费端可以正确消费,生产端也不会进行任何打印。