消息队列的消费幂等性如何保证

什么是幂等?

任意屡次执行所产生的影响均与一次执行的影响相同就能够称为幂等java

什么是消息幂等?

当出现消费者对某条消息重复消费的状况时,重复消费的结果与消费一次的结果是相同的,而且屡次消费并未对业务系统产生任何负面影响git

为何咱们要保证幂等性,不保证幂等性,会不会有问题?

这个问题其实无法准确回答。回答这个问题的根源得从业务场景上进行分析。好比正常业务状况下,咱们是不容许同个订单重复支付,这种业务场景咱们就须要确保幂等性。再好比日志记录,这种业务场景,咱们可能就不须要作幂等判断。github

所以是否要保证幂等性,得基于业务进行考量redis

消息队列的消费幂等性如何保证?

无法保证。前面说了要保证幂等性,得基于业务场景进行考量。消息队列他自己就不是给你用来作业务幂等性用的。若是你要实现业务幂等性,靠消息队列是无法帮你完成的,你本身得根据自身业务场景,来实现幂等。spring

经常使用的业务幂等性保证方法

一、利用数据库的惟一约束实现幂等

好比将订单表中的订单编号设置为惟一索引,建立订单时,根据订单编号就能够保证幂等数据库

二、去重表

这个方案本质也是根据数据库的惟一性约束来实现。其实现大致思路是:首先在去重表上建惟一索引,其次操做时把业务表和去重表放在同个本地事务中,若是出现重现重复消费,数据库会抛惟一约束异常,操做就会回滚apache

三、利用redis的原子性

每次操做都直接set到redis里面,而后将redis数据定时同步到数据库中bootstrap

四、多版本(乐观锁)控制

此方案多用于更新的场景下。其实现的大致思路是:给业务数据增长一个版本号属性,每次更新数据前,比较当前数据的版本号是否和消息中的版本一致,若是不一致则拒绝更新数据,更新数据的同时将版本号+1springboot

五、状态机机制

此方案多用于更新且业务场景存在多种状态流转的场景服务器

六、token机制

生产者发送每条数据的时候,增长一个全局惟一的id,这个id一般是业务的惟一标识,好比订单编号。在消费端消费时,则验证该id是否被消费过,若是还没消费过,则进行业务处理。处理结束后,在把该id存入redis,同时设置状态为已消费。若是已经消费过了,则不进行处理。

演示

例子使用springboot2加kafka来演示一下使用token机制如何实现消费端幂等

一、application.yml
spring:
  redis:
    host: localhost
    port: 6379
    # 链接超时时间(毫秒)
    timeout: 10000
    jedis:
      pool:
        # 链接池中的最大空闲链接
        max-idle: 8
        # 链接池中的最小空闲链接
        min-idle: 10
        # 链接池最大链接数(使用负值表示没有限制)
        max-active: 100
        # 链接池最大阻塞等待时间(使用负值表示没有限制)
        max-wait: -1
    password:
  kafka:
    # 以逗号分隔的地址列表,用于创建与Kafka集群的初始链接(kafka 默认的端口号为9092)
    bootstrap-servers: localhost:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #当有多个消息须要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: com.github.lybgeek.kafka.serialization.ObjectSerializer
      # acks=0 : 生产者在成功写入消息以前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当全部参与复制的节点所有收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 须要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的状况下该做何处理:
      # latest(默认值)在偏移量无效的状况下,消费者将从最新的记录开始读取数据(在消费者启动以后生成的记录)
      # earliest :在偏移量无效的状况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了不出现重复数据和数据丢失,能够把它设置为false,而后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: com.github.lybgeek.kafka.serialization.ObjectDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 1
      #listner负责ack,每调用一次,就当即commit
      ack-mode: manual_immediate
二、实现kafka的自定义序列和反序列

:kakfa默认的序列化和反序列方式是StringSerializer和StringDeserializer。咱们要改形成支持对象的序列化和反序列化

a、序列化

public class ObjectSerializer implements Serializer<Object> {


    @Override
    public byte[] serialize(String topic, Object object) {
        return BeanUtils.serialize(object);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

b、反序列化

public class ObjectDeserializer implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] bytes) {
        return BeanUtils.deserialize(bytes);
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }
}
三、消息对象
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDTO<T> implements Serializable {

    private String messageId;


    private T data;
}
四、生产者

:本例子简单模拟生产者屡次生产同个消息,进而达到屡次消费的效果

@Slf4j
@Component
public class KafkaProducer implements CommandLineRunner {


    @Autowired
    private KafkaTemplate kafkaTemplate;

    private int threadNum = 2;

    private ExecutorService executorService = Executors.newFixedThreadPool(threadNum);

    private CountDownLatch countDownLatch = new CountDownLatch(threadNum);


    @Override
    public void run(String... args) throws Exception {
          send();
    }


    private void send(){
        for(int i = 0; i < threadNum; i++){
            executorService.submit(()->{
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                   log.error(e.getMessage(),e);
                }
                String messageId = "b14701b8-4b08-4bbd-8a4e-70f76a432e99";

                MessageDTO messageDTO = MessageDTO.builder().messageId(messageId).data("hello").build();
                kafkaTemplate.send(Constant.TOPIC,messageDTO);
            });

            countDownLatch.countDown();
        }

    }
}
五、消费者
@Component
@Slf4j
public class KafkaConsumer {

    @Autowired
    private RedisUtils redisUtils;

    @KafkaListener(id = "msgId",topics = {Constant.TOPIC})
    public void receive(ConsumerRecord<String, MessageDTO<String>> record,Acknowledgment ack){

        boolean isRepeateConsume = checkRepeateConsume(record.value().getMessageId());
        if(isRepeateConsume){
            log.error("重复消费。。。。");
            //手工确认
            ack.acknowledge();
            return;
        }


       doBiz(record,ack);
    }

    private boolean checkRepeateConsume(String messageId){
        Object consumeStatus = redisUtils.get(messageId);
        /**
         * 一、若是redis存在消息ID,且消费状态为已消费,则说明是重复消费,此时消费端丢弃该消息
         */
        if(Objects.nonNull(consumeStatus) && "已消费".equals(consumeStatus.toString())){
           // log.error("重复消费。。。。");
            return true;
        }

        /**
         * 二、若是redis不存在消息id,或者状态不是已消费,则从业务方面进行判重
         *
         *  业务判重的能够考虑以下方法:
         *  若是该业务是存在状态流转,则采用状态机策略进行判重。
         *  若是该业务不是状态流转类型,则在新增时,根据业务设置一个惟一的属性,好比根据订单编号的惟一性;
         *  更新时,能够采用多版本策略,在须要更新的业务表上加上版本号
         */
        return checkRepeateByBiz(messageId);
    }



    /**
     * 模拟业务消费
     * @param messageDTO
     * @param ack
     */
    private void doBiz(ConsumerRecord<String, MessageDTO<String>> record,Acknowledgment ack){
        System.out.println("------模拟业务处理-----------");
        System.out.println("--------执行业务处理:"+record.value()+"------------");
        System.out.println("--------------一、业务处理完毕-----------");
        try {
            redisUtils.setEx(record.value().getMessageId(), "已消费",12, TimeUnit.HOURS);
            System.out.println("-------------二、业务处理完毕后,把全局ID存入redis,并设置值为已消费");
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("----------三、业务处理完毕后,消费端手工确认");
        //手工确认
        ack.acknowledge();

    }

}
六、效果
2020-08-09 16:25:32.701  INFO 9552 --- [    msgId-0-C-1] io.lettuce.core.KqueueProvider           : Starting without optional kqueue library
------模拟业务处理-----------
--------执行业务处理:MessageDTO(messageId=b14701b8-4b08-4bbd-8a4e-70f76a432e99, data=hello)------------
--------------一、业务处理完毕-----------
-------------二、业务处理完毕后,把全局ID存入redis,并设置值为已消费
----------三、业务处理完毕后,消费端手工确认
2020-08-09 16:25:36.021 ERROR 9552 --- [    msgId-0-C-1] c.g.l.kafka.consumer.KafkaConsumer       : 重复消费。。。。

总结

消息队列无法帮你作到消费端的幂等性,消费端的幂等性得基于业务场景进行实现。不过消息队列必须得保证消息不能丢,至少保证被消费一次,否则消息都丢了,没数据搞啥业务幂等。在实现消费端处理业务时,要确保消费端是采用手工确认应答机制,而不是自动应答机制。这样可以确保消费端一旦业务处理失败,生产者还能再次发送同个消息给消费端

demo连接

https://github.com/lyb-geek/s...
相关文章
相关标签/搜索