kafka 幂等生产者及事务(kafka0.11以后版本新特性)

1. 幂等性设计
1.1 引入目的
生产者重复生产消息。生产者进行retry会产生重试时,会重复产生消息。有了幂等性以后,在进行retry重试时,只会生成一个消息。java

1.2 幂等性实现
1.2.1 PID 和 Sequence Number
为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。mysql

PID。每一个新的Producer在初始化的时候会被分配一个惟一的PID,这个PID对用户是不可见的。
Sequence Numbler。(对于每一个PID,该Producer发送数据的每一个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number
Broker端在缓存中保存了这seq number,对于接收的每条消息,若是其序号比Broker缓存中序号大于1则接受它,不然将其丢弃。spring

这样就能够实现了消息重复提交了。可是,只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义。不能保证同一个Producer一个topic不一样的partion幂等。sql

标准实现数据库

 

 

发生重试时apache

 

 

实现幂等以后bootstrap

 

 

 发生重试时api

 

 

1.2.2  生成PID的流程缓存

//在执行建立事务时
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//会建立一个Sender,并启动线程,执行以下run方法
Sender{
    void run(long now) {
        if (transactionManager != null) {
            try {
                 ........
                if (!transactionManager.isTransactional()) {
                    // 为idempotent producer生成一个producer id
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {

1.3.演示实例
enable.idempotence,须要设置为ture,此时就会默认把acks设置为all,因此不须要再设置acks属性了。session

private Producer buildIdempotProducer(){
         // create instance for properties to access producer configs
        Properties props = new Properties(); 
        // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
        props.put("bootstrap.servers", "localhost:9092"); 
        props.put("enable.idempotence",true); 
        //If the request fails, the producer can automatically retry,
        props.put("retries", 3); 
        //Reduce the no of requests less than 0
        props.put("linger.ms", 1); 
        //The buffer.memory controls the total amount of memory available to the producer for buffering.
        props.put("buffer.memory", 33554432); 
        // Kafka消息是以键值对的形式发送,须要设置key和value类型序列化器
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); 
        Producer<String, String> producer = new KafkaProducer<String, String>(props);        
        return producer;
}
//发送消息    
public void produceIdempotMessage(String topic, String message) {
        // 建立Producer
        Producer producer = buildIdempotProducer();
        // 发送消息
        producer.send(new ProducerRecord<String, String>(topic, message));
        producer.flush();
}

此时,由于咱们并无配置transaction.id属性,因此不能使用事务相关API,如:producer.initTransactions();

不然会出现以下错误:

Exception in thread “main” java.lang.IllegalStateException: Transactional method invoked on a non-transactional producer.

    at org.apache.kafka.clients.producer.internals.TransactionManager.ensureTransactional(TransactionManager.java:777)

    at org.apache.kafka.clients.producer.internals.TransactionManager.initializeTransactions(TransactionManager.java:202)

    at org.apache.kafka.clients.producer.KafkaProducer.initTransactions(KafkaProducer.java:544)

2.事务
2.1 事务属性
事务属性是2017年Kafka 0.11.0.0引入的新特性。相似于数据库事务,只是这里的数据源是Kafka,kafka事务属性是指一系列的生产者生产消息和消费者提交偏移量的操做在一个事务,或者说是是一个原子操做),同时成功或者失败。

注意:在理解消息的事务时,一直处于一个错误理解就是以下代码中,把操做db的业务逻辑跟操做消息当成是一个事务。

其实这个是有问题的,操做DB数据库的数据源是DB,消息数据源是kfaka,这是彻底不一样两个数据,一种数据源(如mysql,kafka)对应一个事务。

因此它们是两个独立的事务:kafka事务指kafka一系列 生产、消费消息等操做组成一个原子操做;db事务是指操做数据库的一系列增删改操做组成一个原子操做。

void  kakfa_in_tranction(){
  // 1.kafa的操做:读取消息或者生产消息
 kafkaOperation(); 
   // 2.db操做
  dbOperation()
 
}

2.2 引入目的
在事务属性以前先引入了生产者幂等性,它的做用为:

生产者屡次发送消息能够封装成一个原子操做,要么都成功,要么失败
consumer-transform-producer模式下,由于消费者提交偏移量出现问题,致使在重复消费消息时,生产者重复生产消息。

须要将这个模式下消费者提交偏移量操做和生成者一系列生成消息的操做封装成一个原子操做。
消费者提交偏移量致使重复消费消息的场景:消费者在消费消息完成提交偏移量o2以前挂掉了(假设它最近提交的偏移量是o1),此时执行再均衡时,其它消费者会重复消费消息(o1到o2之间的消息)。

2.3 操做的API

  //producer提供的事务方法
   /**
     * 初始化事务。须要注意的有:
     * 一、前提
     * 须要保证transation.id属性被配置。
     * 二、这个方法执行逻辑是:
     *   (1)Ensures any transactions initiated by previous instances of the producer with the same
     *      transactional.id are completed. If the previous instance had failed with a transaction in
     *      progress, it will be aborted. If the last transaction had begun completion,
     *      but not yet finished, this method awaits its completion.
     *    (2)Gets the internal producer id and epoch, used in all future transactional
     *      messages issued by the producer.
     *
     */
    public void initTransactions();
 
    /**
     * 开启事务
     */
    public void beginTransaction() throws ProducerFencedException ;
 
    /**
     * 为消费者提供的在事务内提交偏移量的操做
     */
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                         String consumerGroupId) throws ProducerFencedException ;
 
    /**
     * 提交事务
     */
    public void commitTransaction() throws ProducerFencedException;
 
    /**
     * 放弃事务,相似回滚事务的操做
     */
    public void abortTransaction() throws ProducerFencedException ;

2.4 演示实例
在一个原子操做中,根据包含的操做类型,能够分为三种状况:

a) 只有Producer生产消息;
b) 消费消息和生产消息并存,这个是事务场景中最经常使用的状况,就是咱们常说的“consume-transform-produce ”模式
c) 只有consumer消费消息,
前两种状况是事务引入的场景,最后一种状况没有使用价值(跟使用手动提交效果同样)。

2.4.1 属性配置说明
使用kafka的事务api时的一些注意事项:

a) 须要消费者的自动模式设置为false,而且不能再手动的执行consumer#commitSync或者consumer#commitAsyc
b) 生产者配置transaction.id属性
c) 生产者不须要再配置enable.idempotence,由于若是配置了transaction.id,则此时enable.idempotence会被设置为true
d) 消费者须要配置Isolation.level。在consume-trnasform-produce模式下使用事务时,必须设置为READ_COMMITTED。

2.4.2 只有写

    /**
     * 在一个事务只有生产消息操做
     */
    public void onlyProduceInTransaction() {
        Producer producer = buildProducer(); 
        // 1.初始化事务
        producer.initTransactions(); 
        // 2.开启事务
        producer.beginTransaction();
 
        try {
            // 3.kafka写操做集合
            // 3.1 do业务逻辑 
            // 3.2 发送消息
            producer.send(new ProducerRecord<String, String>("test", "transaction-data-1")); 
            producer.send(new ProducerRecord<String, String>("test", "transaction-data-2"));
            // 3.3 do其余业务逻辑,还能够发送其余topic的消息。
 
            // 4.事务提交
            producer.commitTransaction(); 
 
        } catch (Exception e) {
            // 5.放弃事务
            producer.abortTransaction();
        } 
    }
    /**
     * 须要:
     * 一、设置transactional.id
     * 二、设置enable.idempotence
     * @return
     */
    private Producer buildProducer() { 
        // create instance for properties to access producer configs
        Properties props = new Properties(); 
        // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
        props.put("bootstrap.servers", "localhost:9092"); 
        // 设置事务id
        props.put("transactional.id", "first-transactional"); 
        // 设置幂等性
        props.put("enable.idempotence",true); 
        //Set acknowledgements for producer requests.
        props.put("acks", "all"); 
        //If the request fails, the producer can automatically retry,
        props.put("retries", 1); 
        //Specify buffer size in config,这里不进行设置这个属性,若是设置了,还须要执行producer.flush()来把缓存中消息发送出去
        //props.put("batch.size", 16384); 
        //Reduce the no of requests less than 0
        props.put("linger.ms", 1); 
        //The buffer.memory controls the total amount of memory available to the producer for buffering.
        props.put("buffer.memory", 33554432); 
        // Kafka消息是以键值对的形式发送,须要设置key和value类型序列化器
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer"); 
        Producer<String, String> producer = new KafkaProducer<String, String>(props); 
        return producer;
    }

2.4.3 消费-生产并存

    /** 
     * 在一个事务内,即有生产消息又有消费消息,即常说的Consume-tansform-produce模式
     */
    public void consumeTransferProduce() {
        // 1.构建上产者
        Producer producer = buildProducer();
        // 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操做
        producer.initTransactions();
        // 3.构建消费者和订阅主题
        Consumer consumer = buildConsumer();
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // 4.开启事务
            producer.beginTransaction();
            // 5.1 接受消息
            ConsumerRecords<String, String> records = consumer.poll(500);
            try {
                // 5.2 do业务逻辑;
                System.out.println("customer Message---");
                Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
                for (ConsumerRecord<String, String> record : records) {
                    // 5.2.1 读取消息,并处理消息。print the offset,key and value for the consumer records.
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
 
                    // 5.2.2 记录提交的偏移量
                    commits.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset()));
 
                    // 6.生产新的消息。好比外卖订单状态的消息,若是订单成功,则须要发送跟商家结转消息或者派送员的提成消息
                    producer.send(new ProducerRecord<String, String>("test", "data2"));
                }
 
                // 7.提交偏移量
                producer.sendOffsetsToTransaction(commits, "group0323");
 
                // 8.事务提交
                producer.commitTransaction();
 
            } catch (Exception e) {
                // 7.放弃事务
                producer.abortTransaction();
            }
        }
    }
    /**
     * 须要:
     * 一、关闭自动提交 enable.auto.commit
     * 二、isolation.level为read_committed
     * 并且在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
     * @return
     */
    public Consumer buildConsumer() {
        Properties props = new Properties();
        // bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开
        props.put("bootstrap.servers", "localhost:9092");
        // 消费者群组
        props.put("group.id", "group0323");
        // 设置隔离级别
        props.put("isolation.level","read_committed");
        // 关闭自动提交
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer
                <String, String>(props);
        return consumer;
    }


2.4.4 只有读

    /**
     * 在一个事务只有消费消息操做
     * 这种操做其实没有什么意义,跟使用手动提交效果同样,没法保证消费消息操做和提交偏移量操做在一个事务。
     */
    public void onlyConsumeInTransaction() {
        Producer producer = buildProducer();
        // 1.初始化事务
        producer.initTransactions();
        // 2.开启事务
        producer.beginTransaction();
        // 3.kafka读消息的操做集合
        Consumer consumer = buildConsumer();
        while (true) {
            // 3.1 接受消息
            ConsumerRecords<String, String> records = consumer.poll(500);
 
            try {
                // 3.2 do业务逻辑;
                System.out.println("customer Message---");
                Map<TopicPartition, OffsetAndMetadata> commits = Maps.newHashMap();
                for (ConsumerRecord<String, String> record : records) {
                    // 3.2.1 处理消息 print the offset,key and value for the consumer records.
                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
 
                    // 3.2.2 记录提交偏移量
                    commits.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset()));
                }
 
                // 4.提交偏移量
                producer.sendOffsetsToTransaction(commits, "group0323");
 
                // 5.事务提交
                producer.commitTransaction();
 
            } catch (Exception e) {
                // 6.放弃事务
                producer.abortTransaction();
            }
        }
 
    }

3 幂等性和事务性的关系
3.1 二者关系
事务属性实现前提是幂等性,即在配置事务属性transaction id时,必须还得配置幂等性;可是幂等性是能够独立使用的,不须要依赖事务属性。

幂等性引入了Porducer ID
事务属性引入了Transaction Id属性。
使用场景

enable.idempotence = true,transactional.id不设置:只支持幂等性。
enable.idempotence = true,transactional.id设置:支持事务属性和幂等性
enable.idempotence = false,transactional.id不设置:没有事务属性和幂等性的kafka
enable.idempotence = false,transactional.id设置:没法获取到PID,此时会报错

3.2 tranaction id 、producerId 和 epoch
一个app有一个tid,同一个应用的不一样实例PID是同样的,只是epoch的值不一样。如:

同一份代码运行两个实例,分步执行以下:在实例1没有进行提交事务前,开始执行实例2的初始化事务

step1  实例1-初始化事务。的打印出对应productId和epoch,信息以下:

[2018-04-21 20:56:23,106] INFO [TransactionCoordinator id=0] Initialized transactionalId first-transactional with producerId 8000 and producer epoch 123 on partition __transaction_state-12 (kafka.coordinator.transaction.TransactionCoordinator)

step2 实例1-发送消息。

step3 实例2-初始化事务。初始化事务时的打印出对应productId和epoch,信息以下:

18-04-21 20:56:48,373] INFO [TransactionCoordinator id=0] Initialized transactionalId first-transactional with producerId 8000 and producer epoch 124 on partition __transaction_state-12 (kafka.coordinator.transaction.TransactionCoordinator)

step4  实例1-提交事务,此时报错

org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.

我今天使用Flink-connector-kafka-0.11时,遇到这个现象

 

step5 实例2-提交事务

为了不这种错误,同一个事务ID,只有保证以下顺序epch小producer执行init-transaction和committransaction,而后epoch较大的procuder才能开始执行init-transaction和commit-transaction,以下顺序:

有了transactionId后,Kafka可保证:

跨Session的数据幂等发送。当具备相同Transaction ID的新的Producer实例被建立且工做时,旧的且拥有相同Transaction ID的Producer将再也不工做【上面的实例能够验证】。

kafka保证了关联同一个事务的全部producer(一个应用有多个实例)必须按照顺序初始化事务、和提交事务,不然就会有问题,这保证了同一事务ID中消息是有序的(不一样实例得按顺序建立事务和提交事务)。

3.3 事务最佳实践-单实例的事务性
经过上面实例中能够看到kafka是跨Session的数据幂等发送,即若是应用部署多个实例时常会遇到上面的问题“org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.”,必须保证这些实例生成者的提交事务顺序和建立顺序保持一致才能够,不然就没法成功。其实,在实践中,咱们更多的是如何实现对应用单实例的事务性。能够经过spring-kafaka实现思路来学习,即每次建立生成者都设置一个不一样的transactionId的值,以下代码:

在spring-kafka中,对于一个线程建立一个producer,事务提交以后,还会关闭这个producer并清除,后续同一个线程或者新的线程从新执行事务时,此时就会从新建立producer。

public class ProducerFactoryUtils{
/**
     * Obtain a Producer that is synchronized with the current transaction, if any.
     * @param producerFactory the ConnectionFactory to obtain a Channel for
     * @param <K> the key type.
     * @param <V> the value type.
     * @return the resource holder.
     */
    public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(
            final ProducerFactory<K, V> producerFactory) {
 
        Assert.notNull(producerFactory, "ProducerFactory must not be null");
 
        // 1.对于每个线程会生成一个惟一key,而后根据key去查找resourceHolder
        @SuppressWarnings("unchecked")
        KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
                .getResource(producerFactory);
        if (resourceHolder == null) {
            // 2.建立一个消费者
            Producer<K, V> producer = producerFactory.createProducer();
            // 3.开启事务
            producer.beginTransaction();
            resourceHolder = new KafkaResourceHolder<K, V>(producer);
            bindResourceToTransaction(resourceHolder, producerFactory);
        }
        return resourceHolder;
    }
}
//建立消费者代码
public class DefaultKafkaProducerFactory{
    protected Producer<K, V> createTransactionalProducer() {
        Producer<K, V> producer = this.cache.poll();
        if (producer == null) {
            Map<String, Object> configs = new HashMap<>(this.configs);
            // 对于每一次生成producer时,都设置一个不一样的transactionId
            configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                    this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
            producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
            // 1.初始化话事务。
            producer.initTransactions();
            return new CloseSafeProducer<K, V>(producer, this.cache);
        }
        else {
            return producer;
        }
    }
}

3.4 Consume-transform-Produce的流程

 

流程1 :查找Tranaction Corordinator。

Producer向任意一个brokers发送 FindCoordinatorRequest请求来获取Transaction Coordinator的地址。

流程2:初始化事务 initTransaction

Producer发送InitpidRequest给事务协调器,获取一个Pid。InitpidRequest的处理过程是同步阻塞的,一旦该调用正确返回,Producer就能够开始新的事务。

TranactionalId经过InitpidRequest发送给Tranciton Corordinator,而后在Tranaciton Log中记录这<TranacionalId,pid>的映射关系。

除了返回PID以外,还具备以下功能:

对PID对应的epoch进行递增,这样能够保证同一个app的不一样实例对应的PID是同样的,可是epoch是不一样的。
回滚以前的Producer未完成的事务(若是有)。
流程3: 开始事务beginTransaction

执行Producer的beginTransacion(),它的做用是Producer在本地记录下这个transaction的状态为开始状态。

注意:这个操做并无通知Transaction Coordinator。

流程4: Consume-transform-produce loop

流程4.0: 经过Consumtor消费消息,处理业务逻辑

流程4.1: producer向TransactionCordinantro发送AddPartitionsToTxnRequest

在producer执行send操做时,若是是第一次给<topic,partion>发送数据,此时会向Trasaction Corrdinator发送一个AddPartitionsToTxnRequest请求,

Transaction Corrdinator会在transaction log中记录下tranasactionId和<topic,partion>一个映射关系,并将状态改成begin。AddPartionsToTxnRequest的数据结构以下:

      AddPartitionsToTxnRequest => TransactionalId PID Epoch [Topic [Partition]]
      TransactionalId => string
      PID => int64
      Epoch => int16
      Topic => string
      Partition => int32

流程4.2:  producer#send发送 ProduceRequst,生产者发送数据,虽然没有尚未执行commit或者absrot,可是此时消息已经保存到kafka上,

能够参考以下图断点位置处,此时已经能够查看到消息了,并且即便后面执行abort,消息也不会删除,只是更改状态字段标识消息为abort状态。

 

流程4.3: AddOffsetCommitsToTxnRequest,Producer经过KafkaProducer.sendOffsetsToTransaction 向事务协调器器发送一个AddOffesetCommitsToTxnRequests:

    AddOffsetsToTxnRequest => TransactionalId PID Epoch ConsumerGroupID
    TransactionalId => string
     PID => int64
     Epoch => int16
     ConsumerGroupID => string

在执行事务提交时,能够根据ConsumerGroupID来推断_customer_offsets主题中相应的TopicPartions信息。这样在

流程4.4: TxnOffsetCommitRequest,Producer经过KafkaProducer.sendOffsetsToTransaction还会向消费者协调器Cosumer Corrdinator发送一个TxnOffsetCommitRequest,在主题_consumer_offsets中保存消费者的偏移量信息。

TxnOffsetCommitRequest   => ConsumerGroupID
                            PID
                            Epoch
                            RetentionTime
                            OffsetAndMetadata
  ConsumerGroupID => string
  PID => int64
  Epoch => int32
  RetentionTime => int64
  OffsetAndMetadata => [TopicName [Partition Offset Metadata]]
    TopicName => string
    Partition => int32
    Offset => int64
    Metadata => string

流程5: 事务提交和事务终结(放弃事务),经过生产者的commitTransaction或abortTransaction方法来提交事务和终结事务,这两个操做都会发送一个EndTxnRequest给Transaction Coordinator。

流程5.1:EndTxnRequest。Producer发送一个EndTxnRequest给Transaction Coordinator,而后执行以下操做:

Transaction Coordinator会把PREPARE_COMMIT or PREPARE_ABORT 消息写入到transaction log中记录
执行流程5.2
执行流程5.3
流程5.2:WriteTxnMarkerRequest

WriteTxnMarkersRequest => [CoorinadorEpoch PID Epoch Marker [Topic [Partition]]]
 CoordinatorEpoch => int32
 PID => int64
 Epoch => int16
 Marker => boolean (false(0) means ABORT, true(1) means COMMIT)
 Topic => string
 Partition => int32

对于Producer生产的消息。Tranaction Coordinator会发送WriteTxnMarkerRequest给当前事务涉及到每一个<topic,partion>的leader,leader收到请求后,会写入一个COMMIT(PID) 或者 ABORT(PID)的控制信息到data log中
对于消费者偏移量信息,若是在这个事务里面包含_consumer-offsets主题。Tranaction Coordinator会发送WriteTxnMarkerRequest给Transaction Coordinartor,Transaction Coordinartor收到请求后,

会写入一个COMMIT(PID) 或者 ABORT(PID)的控制信息到 data log中
流程5.3:Transaction Coordinator会将最终的COMPLETE_COMMIT或COMPLETE_ABORT消息写入Transaction Log中以标明该事务结束。

只会保留这个事务对应的PID和timstamp。而后把当前事务其余相关消息删除掉,包括PID和tranactionId的映射关系。

3.4.1 文件类型和查看命令

 kafka文件主要包括broker的data(主题:test)、事务协调器对应的transaction_log(主题:__tranaction_state)、偏移量信息(主题:_consumer_offsets)三种类型。

以下图

这三种文件类型其实都是topic的分区,因此对于每个目录都包含*.log、*.index、*.timeindex、*.txnindex文件(仅这个文件是为了实现事务属性引入的)。

查看文件内容:

bin/kafka-run-class.sh kafka.tools.DumpLogSegments   –files /kafka-logs/firtstopic-0/00000000000000000002.log   –print-data-log
相关文章
相关标签/搜索