Kafka 消息发送分同步 (sync)、异步 (async) 两种方式,默认使用同步方式,可经过java
producer.type=sync 同步模式
producer.type=async 异步模式 redis
//不提供回调 public Future<RecordMetadata> send(ProducerRecord<K, V> record); //提供回调 public Future<RecordMetadata> send(ProducerRecord<K, V> record, new Callback( public void onCompletion(RecordMetadata metadata, Exception exception) { // 发送失败 if (exception != null) { // 同步发送,须要设置阻塞时间,否则会一直阻塞 producer.send(proRecord).get(timeout); // 发送成功 }else{ } } ))
bootstrap.servers=${KAFKA_SERVER_IN} key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=com.unionpay.cloudatlas.galaxy.common.protocol.kafka.RecordKryoSerializer #max time to wait,default to 60s max.block.ms=10000 batch.size=65536 buffer.memory=134217728 retries=3
经过上述代码和配置能够看出apache
问题定位bootstrap
生产建议api
做者:sheen口开河
连接:https://www.jianshu.com/p/45258f744425
来源:简书
简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。网络
同步异步
kafka同步生产者:这个生产者写一条消息的时候,它就立马发送到某个分区去。follower还须要从leader拉取消息到本地,follower再向leader发送确认,leader再向客户端发送确认。因为这一套流程以后,客户端才能获得确认,因此很慢。
kafka异步生产者:这个生产者写一条消息的时候,先是写到某个缓冲区,这个缓冲区里的数据还没写到broker集群里的某个分区的时候,它就返回到client去了。虽然效率快,可是不能保证消息必定被发送出去了。async
经过 request.required.acks 属性进行配置:值可设为 0, 1, -1(all) -1 和 all 等同函数
0 表明:不等待 broker 的 ack,这一操做提供了一个最低的延迟,broker 一接收到尚未写入磁盘就已经返回,当 broker 故障时有可能丢失数据;性能
1 表明:producer 等待 broker 的 ack,partition 的 leader 落盘成功后返回 ack,若是在 follower 同步成功以前 leader 故障,那么将会丢失数据;
-1 表明:producer 等待 broker 的 ack,partition 的 leader 和 follower 所有落盘成功后才返回 ack,数据通常不会丢失,延迟时间长可是可靠性高;可是这样也不能保证数据不丢失,好比当 ISR 中只有 leader 时( ISR 中的成员因为某些状况会增长也会减小,最少就只剩一个 leader),这样就变成了 acks = 1 的状况;
还有第四种,部分保存策略。须要配置。
另一个就是使用高级消费者存在数据丢失的隐患: 消费者读取完成,高级消费者 API 的 offset 已经提交,可是尚未处理完成DB操做,消费者就挂掉了,此时 offset 已经更新,没法再消费以前丢失的数据. 解决办法消费者使用低级api,读取到消息后,处理完成,在手动commit给kafka,这样子一旦处理失败,offset不会提交,下次还能够继续消费。
1.acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等状况时,消息可能丢失;
2.acks=一、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;
生产者:同步发送消息,且消息配置为-1或all,leader分区和全部follwer都写到磁盘里。
异步模式下,为防止缓冲区满,能够在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态。可是处于阻塞状态会影响其余正常性能。
生产者:手动提交,即读取到消息后,确认消息消费完毕,才手动提交offset。可是要避免逻辑处理时间过长,致使链接超时,会使消息重复消费。
acks = -1 的状况下,数据发送到 leader 后 ,部分 ISR 的副本同步,leader 此时挂掉。好比 follower1 和 follower2 都有可能变成新的 leader, producer 端会获得返回异常,producer 端会从新发送数据,数据可能会重复
另外, 在高阶消费者中,offset 采用自动提交的方式, 自动提交时,假设 1s 提交一次 offset 的更新,设当前 offset = 10,当消费者消费了 0.5s 的数据,offset 移动了 15,因为提交间隔为 1s,所以这一offset 的更新并不会被提交,这时候咱们写的消费者挂掉,重启后,消费者会去 ZooKeeper 上获取读取位置,获取到的 offset 仍为10,它就会重复消费. 解决办法使用低级消费者.
针对消息重复:将消息的惟一标识保存到外部介质中,每次消费时判断是否处理过便可。好比redis中
消息可使用惟一id标识
生产者(ack=all 表明至少成功发送一次)
消费者 (offset手动提交,业务逻辑成功处理后,提交offset)
落表(主键或者惟一索引的方式,避免重复数据)
业务逻辑处理(选择惟一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)