RabbitMQ 从入门到精通(二)

1. 消息如何保障百分之百的投递成功?

什么是生产端的可靠性投递?redis

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认应答
  • 完善的进行消息补偿机制

若是想保障消息百分百投递成功,只作到前三步不必定可以保障。有些时候或者说有些极端状况,好比生产端在投递消息时可能就失败了,或者说生产端投递了消息,MQ也收到了,MQ在返回确认应答时,因为网络闪断致使生产端没有收到应答,此时这条消息就不知道投递成功了仍是失败了,因此针对这些状况咱们须要作一些补偿机制。算法

 

1.1 方案一:消息落库,对消息状态进行打标

 

  1. 进行数据的入库,好比咱们要发送一条订单消息,首先得把业务数据也就是订单信息存库,而后生成一条消息,把消息也进行入库,这条消息应该包含消息状态属性 Create_Date(建立时间),并设置初始标志 好比0,表示消息建立成功,正在发送中
  2. 首先要保证第一步消息都存储成功了,没有出现任何异常状况,而后生产端再进行消息发送。若是失败了就进行快速失败机制
  3. MQ把消息收到的结果应答(confirm)给生产端
  4. 生产端有一个Confirm Listener,去异步的监听Broker回送的响应,从而判断消息是否投递成功,若是成功,去数据库查询该消息,并将消息状态更新为1,表示消息投递成功
     
    假设第二步OK了,在第三步回送响应时,网络忽然出现了闪断,致使生产端的Listener就永远收不到这条消息的confirm应答了,也就是说这条消息的状态就一直为0了数据库

  5. 此时咱们须要设置一个规则,好比说消息在入库时候设置一个临界值timeout,5分钟以后若是仍是0的状态那就须要把消息抽取出来。这里咱们使用的是分布式定时任务,去定时抓取DB中距离消息建立时间超过5分钟的且状态为0的消息。
  6. 把抓取出来的消息进行从新投递(Retry Send),也就是从第二步开始继续往下走
  7. 固然有些消息可能就是因为一些实际的问题没法路由到Broker,好比routingKey设置不对,对应的队列被误删除了,那么这种消息即便重试屡次也仍然没法投递成功,因此须要对重试次数作限制,好比限制3次,若是投递次数大于三次,那么就将消息状态更新为2,表示这个消息最终投递失败。网络

 

针对这种状况如何去作补偿呢,能够有一个补偿系统去查询这些最终失败的消息,而后给出失败的缘由,固然这些可能都须要人工去操做。并发

第一种可靠性投递,在高并发的场景下是否适合?异步

对于第一种方案,咱们须要作两次数据库的持久化操做,在高并发场景下显然数据库存在着性能瓶颈。其实在咱们的核心链路中只须要对业务进行入库就能够了,消息就不必先入库了,咱们能够作消息的延迟投递,作二次确认,回调检查。分布式

固然这种方案不必定能保障百分百投递成功,可是基本上能够保障大概99.9%的消息是OK的,有些特别极端的状况只能是人工去作补偿了,或者使用定时任务去作均可以。ide

 

1.2 方案二:消息的延迟投递,作二次确认,回调检查

 

Upstream Service上游服务也就是生产端,Downstream service下游服务也就是消费端,Callback service就是回调服务。高并发

 

  1. 先将业务消息进行入库,而后生产端将消息发送出去
  2. 在发送消息以后,紧接着生产端再次发送一条消息(Second Send Delay Check),即延迟消息投递检查,这里须要设置一个延迟时间,好比5分钟以后进行投递。
  3. 消费端去监听指定队列,将收到的消息进行处理。
  4. 处理完成以后,发送一个confirm消息,也就是回送响应,可是这里响应不是正常的ACK,而是从新生成一条消息,投递到MQ中。
  5. 上面的Callback service是一个单独的服务,其实它扮演了第一种方案的存储消息的DB角色,它经过MQ去监听下游服务发送的confirm消息,若是Callback service收到confirm消息,那么就对消息作持久化存储,即将消息持久化到DB中。
  6. 5分钟以后延迟消息发送到MQ了,而后Callback service仍是去监听延迟消息所对应的队列,收到Check消息后去检查DB中是否存在消息,若是存在,则不须要作任何处理,若是不存在或者消费失败了,那么Callback service就须要主动发起RPC通讯给上游服务,告诉它延迟检查的这条消息我没有找到,你须要从新发送,生产端收到信息后就会从新查询业务消息而后将消息发送出去。

这么作的目的是少作了一次DB的存储,在高并发场景下,最关心的不是消息100%投递成功,而是必定要保证性能,保证能抗得住这么大的并发量。因此能节省数据库的操做就尽可能节省,能够异步的进行补偿。

 

其实在主流程里面是没有这个Callback service的,它属于一个补偿的服务,整个核心链路就是生产端入库业务消息,发送消息到MQ,消费端监听队列,消费消息。其余的步骤都是一个补偿机制。

第二种方案也是互联网大厂更为经典和主流的解决方案。可是若对性能要求不是那么高,第一种方案要更简单

 

2. 幂等性

2.1 幂等性是什么?

简单来讲就是用户对于同一操做发起的一次请求或者屡次请求的结果是一致的。

咱们能够借鉴数据库的乐观锁机制来举个例子:

  • 首先为表添加一个版本字段version

  • 在执行更新操做前呢,会先去数据库查询这个version

  • 而后执行更新语句,以version做为条件,例如:

    UPDATE T_REPS SET COUNT = COUNT -1,VERSION = VERSION + 1 WHERE VERSION = 1

  • 若是执行更新时有其余人先更新了这张表的数据,那么这个条件就不生效了,也就不会执行操做了,经过这种乐观锁的机制来保障幂等性。

 

2.2 消息端幂等性保障

重复消费问题:

当消费者消费完消息时,在给生产端返回ack时因为网络中断,致使生产端未收到确认信息,该条消息会从新发送并被消费者消费,但实际上该消费者已成功消费了该条消息,这就是重复消费问题。

 

2.2.1 惟一ID+指纹码机制

惟一ID:业务表惟一的主键,如商品ID

指纹码:为了区别每次正常操做的码,每次操做时生成指纹码;能够用时间戳+业务编号或者标志位(具体视业务场景而定)

 

  • 惟一ID+指纹码机制,利用数据库主键去重
  • SELECT COUNT(1) FROM T_ORDER WHERE ID = 惟一ID and IS_CONSUM= 指纹码
  • 好处:实现简单
  • 坏处:高并发下有数据库写入的性能瓶颈
  • 解决方案:根据ID进行分库分表算法路由

整个思路就是首先咱们须要根据消息生成一个全局惟一的ID,而后还须要加上一个指纹码。这个指纹码它并不必定是系统去生成的,而是一些外部的规则或者内部的业务规则去拼接,它的目的就是为了保障此次操做是绝对惟一的。

将ID + 指纹码拼接好的值做为数据库主键,就能够进行去重了。即在消费消息前呢,先去数据库查询这条消息的指纹码标识是否存在,没有就执行insert操做,若是有就表明已经被消费了,就不须要管了。

 

2.2.2 利用Redis的原子性去实现

这里只提用Redis的原子性去解决MQ幂等性重复消费的问题

注意:MQ的幂等性问题 根本在于的是生产端未正常接收ACK,多是网络抖动、网络中断致使

 

个人方案:

MQ消费端在消费开始时 将 ID放入到Redis的BitMap中,MQ生产端每次生产数据时,从Redis的BitMap对应位置若不能取出ID,则生产消息发送,不然不进行消息发送。

可是有人可能会说,万一消费端,生产端Redis命令执行失败了怎么办,虽然又出现重复消费又出现Redis非正常执行命令的可能性极低,可是万一呢?

OK,咱们能够在Redis命令执行失败时,将消息落库,每日用定时器,对这种极特殊的消息进行处理。

 

3. Confirm机制

3.1 如何理解?

  • 消息的确认,是指生产者投递消息后,若是Broker收到消息,则会给咱们生产者一个应答
  • 生产者进行接收应答,用来肯定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递

的核心保障

 

确认机制流程图

生产端发送消息到Broker,而后Broker接收到了消息后,进行回送响应,生产端有一个Confirm Listener,去监听应答,固然这个操做是异步进行的,生产端将消息发送出去就能够不用管了,让内部监听器去监听Broker给咱们的响应。

 

3.2 怎么实现?

  • 第一步,在channel上开启确认模式:channel.confirmSelect()
  • 第二步,在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行从新发送、或记录日志等后续处理!
public class Producer {
    public static void main(String[] args) throws Exception {
        
        //建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        
        //获取Connection
        Connection connection = connectionFactory.newConnection();
        
        //经过connection建立一个新的Channel
        Channel channel = connection.createChannel();
        
        //指定咱们的消息投递模式
        channel.confirmSelect();
        
        String exchangeName = "test_confirm_exchange";
        String routingkey = "confirm.save";
        
        //发送一条信息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingkey, null, msg.getBytes());
        
        //添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            
            @Override
            public void handleNack(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("-------no ack!---------");
            }
            
            @Override
            public void handleAck(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("--------ack!----------");
            }
        });
    }
}

 

public class Consumer {
    public static void main(String[] args) throws Exception{
        //建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        
        //获取Connection
        Connection connection = connectionFactory.newConnection();
        
        //经过connection建立一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_confirm_exchange";
        String routingkey = "confirm.#";
        String queueName = "test_confirm_queue"; 
        
        //声明交换机和队列 而后进行绑定和 设置 最后制定路由key
        channel.exchangeDeclare(exchangeName, "topic",true);
        channel.queueDeclare(queueName, true, false, false, null);
        
        channel.queueBind(queueName, exchangeName, routingkey);
        
        //建立消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true,queueingConsumer); 
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消费端:" + msg);
        }
    }
}

 

运行说明

先启动消费端,访问管控台:http://ip:15672,检查Exchange和Queue是否设置OK,而后启动生产端,消息被消费端消费,生产端也成功监听到了ACK响应。

 

4. Return机制

4.1 如何理解?

  • Return Listener 用于处理一些不可路由的消息!
  • 咱们的消息生产者,经过指定一个Exchange 和Routingkey,把消息送达到某一个队列中去, 而后咱们的消费者监听队列,进行消费处理操做!
  • 可是在某些状况下,若是咱们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候若是咱们须要监听这种不可达的消息,就要使用Return Listener!

 

4.2 如何实现?

  1. 添加return监听:addReturnListener,生产端去监听这些不可达的消息,作一些后续处理,好比说,记录下消息日志,或者及时去跟踪记录,有可能从新设置一下就行了
  2. 发送消息时,设置Mandatory:若是为true,则监听器会接收到路由不可达的消息,而后进行后续处理,若是为false,那么broker端自动删除该消息!

 

public class ReturnProducer {
     public static void main(String[] args) throws Exception {
            //1 建立ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.244.11");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setHandshakeTimeout(20000);
            //2 获取Connection
            Connection connection = connectionFactory.newConnection();
            //3 经过Connection建立一个新的Channel
            Channel channel = connection.createChannel();
            
            String exchange = "test_return_exchange";
            //String routingKey = "return.save";
            String routingKeyError = "abc.save";
            
            String msg = "Hello RabbitMQ Return Message";
            //添加return监听
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange,
                        String routingKey, BasicProperties properties, byte[] body) throws IOException {
                    //replyCode:响应码    replyText:响应信息
                    System.err.println("---------handle  return----------");
                    System.err.println("replyCode: " + replyCode);
                    System.err.println("replyText: " + replyText);
                    System.err.println("exchange: " + exchange);
                    System.err.println("routingKey: " + routingKey);
                    //System.err.println("properties: " + properties);
                    System.err.println("body: " + new String(body));
                }

                
            });
            //5 发送一条消息,第三个参数mandatory:必须设置为true
            channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
        }
}

 

public class ReturnConsumer {
    
    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 获取Connection
        Connection connection = connectionFactory.newConnection();
        //3 经过Connection建立一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";
        //4 声明交换机和队列,而后进行绑定设置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 建立消费者 
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费者: " + msg);
        }
    }
}

 

运行说明

先启动消费端,访问管控台:http://ip:15672,检查Exchange和Queue是否设置OK,而后启动生产端。
因为生产端设置的是一个错误的路由key,因此消费端没有任何打印,而生产端打印了以下内容

若是咱们将 Mandatory 属性设置为false,对于不可达的消息会被Broker直接删除,那么生产端就不会进行任何打印了。若是咱们的路由key设置为正确的,那么消费端可以正确消费,生产端也不会进行任何打印。

相关文章
相关标签/搜索