kafka学习(8)如何保证数据有序,处理数据丢失和数据重复消费

首先了解一下kafka消息生产者和消费者是如何处理消息的

生产者发送消息有两种方式,同步(sync)和异步 (async) 

Kafka 消息发送分同步 (sync)、异步 (async) 两种方式,默认使用同步方式,可经过java

producer.type=sync 同步模式 
producer.type=async 异步模式 redis

//不提供回调
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
//提供回调
public Future<RecordMetadata> send(ProducerRecord<K, V> record, new Callback(
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // 发送失败
        if (exception != null) {
        // 同步发送,须要设置阻塞时间,否则会一直阻塞
        producer.send(proRecord).get(timeout);
        
        // 发送成功
        }else{
        
        }
}
))
bootstrap.servers=${KAFKA_SERVER_IN}
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.unionpay.cloudatlas.galaxy.common.protocol.kafka.RecordKryoSerializer
#max time to wait,default to 60s
max.block.ms=10000
batch.size=65536
buffer.memory=134217728
retries=3

经过上述代码和配置能够看出apache

  • 最大block事件为1000ms,也就是10s
  • buffer配置的较大,为134M
  • 生产者先是异步发送,若是发送失败,则执行一次同步发送

问题定位bootstrap

  • 在异步发送的的回调里使用了同步的方式再次发送,因为kafka producer的同步发送是阻塞等待,且使用的是不带超时时间的无限期等待(future.get()中未指定超时时间),所以当不被唤醒时会一直wai下去
  • kafka生产者的IO线程(实际执行数据发送的线程)是单线程模型,且回调函数是在IO线程中执行的,所以回调函数的阻塞会直接致使IO线程阻塞,因而生产者缓冲区的数据没法被发送
  • kafka生产者还在不断的被应用调用,所以缓冲区一直累积并增大,当缓冲区满的时候,生产者线程会被阻塞,最大阻塞时间为max.block.time,若是改时间到达以后仍是没法将数据塞入缓冲区,则会抛出一个异常,所以日志中看到达到10s以后,打印出异常栈
  • 因为使用了get没有指定超时时间,且该await一直没法被唤醒,所以这种状况会一直持续,在没有人工干预的状况下,永远不会发送成功

生产建议api

  • kafka生产者推荐使用异步方式发送,而且提供回调以响应发送成功或者失败
  • 若是须要使用future.get的方式模拟同步发送,则须要在get里加上合适的超时时间,避免由于不可预知的外部因素致使线程没法被唤醒,即便用Future.get(long timeout)的api而不是不带超时参数的Future.get()
  • 不要在异步回调中执行阻塞操做或者耗时比较久的操做,若是有必要能够考虑交给另外一个线程(池)去作



做者:sheen口开河
连接:https://www.jianshu.com/p/45258f744425
来源:简书
简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。网络

同步异步

kafka同步生产者:这个生产者写一条消息的时候,它就立马发送到某个分区去。follower还须要从leader拉取消息到本地,follower再向leader发送确认,leader再向客户端发送确认。因为这一套流程以后,客户端才能获得确认,因此很慢。
kafka异步生产者:这个生产者写一条消息的时候,先是写到某个缓冲区,这个缓冲区里的数据还没写到broker集群里的某个分区的时候,它就返回到client去了。虽然效率快,可是不能保证消息必定被发送出去了。async

 

经过 request.required.acks 属性进行配置:值可设为 0, 1, -1(all)    -1 和 all 等同函数

0 表明:不等待 broker 的 ack,这一操做提供了一个最低的延迟,broker 一接收到尚未写入磁盘就已经返回,当 broker 故障时有可能丢失数据;性能

1 表明:producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,若是在 follower 同步成功以前 leader 故障,那么将会丢失数据;

-1 表明:producer 等待 broker 的 ack,partition 的 leader 和 follower 所有落盘成功后才返回 ack,数据通常不会丢失,延迟时间长可是可靠性高;可是这样也不能保证数据不丢失,好比当 ISR 中只有 leader 时( ISR 中的成员因为某些状况会增长也会减小,最少就只剩一个 leader),这样就变成了 acks = 1 的状况;

还有第四种,部分保存策略。须要配置。

消费者消息消息的方式,高级api和低级api

另一个就是使用高级消费者存在数据丢失的隐患: 消费者读取完成,高级消费者 API 的 offset 已经提交,可是尚未处理完成DB操做,消费者就挂掉了,此时 offset 已经更新,没法再消费以前丢失的数据. 解决办法消费者使用低级api,读取到消息后,处理完成,在手动commit给kafka,这样子一旦处理失败,offset不会提交,下次还能够继续消费。

消息丢失场景

1.acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等状况时,消息可能丢失;
2.acks=一、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;

如何防止数据丢失

生产者:同步发送消息,且消息配置为-1或all,leader分区和全部follwer都写到磁盘里。

             异步模式下,为防止缓冲区满,能够在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态。可是处于阻塞状态会影响其余正常性能。

生产者:手动提交,即读取到消息后,确认消息消费完毕,才手动提交offset。可是要避免逻辑处理时间过长,致使链接超时,会使消息重复消费。

消息重复的缘由

acks = -1 的状况下,数据发送到 leader 后 ,部分 ISR 的副本同步,leader 此时挂掉。好比 follower1 和 follower2 都有可能变成新的 leader, producer 端会获得返回异常,producer 端会从新发送数据,数据可能会重复

另外, 在高阶消费者中,offset 采用自动提交的方式, 自动提交时,假设 1s 提交一次 offset 的更新,设当前 offset = 10,当消费者消费了 0.5s 的数据,offset 移动了 15,因为提交间隔为 1s,所以这一offset 的更新并不会被提交,这时候咱们写的消费者挂掉,重启后,消费者会去 ZooKeeper 上获取读取位置,获取到的 offset 仍为10,它就会重复消费. 解决办法使用低级消费者.

消息重复解决方案

针对消息重复:将消息的惟一标识保存到外部介质中,每次消费时判断是否处理过便可。好比redis中

消息可使用惟一id标识

生产者(ack=all 表明至少成功发送一次)

消费者 (offset手动提交,业务逻辑成功处理后,提交offset)

落表(主键或者惟一索引的方式,避免重复数据)

业务逻辑处理(选择惟一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)

相关文章
相关标签/搜索