是否真正的存在数据丢失问题,好比有不少时候多是其余同事操做了测试环境,因此首先确保数据没有第三方干扰。缓存
理清你的业务流程,数据流向,数据究竟是在什么地方丢失的数据,在kafka 以前的环节或者kafka以后的流程丢失?好比kafka的数据是由flume提供的,也许是flume丢失了数据,kafka 天然就没有这一部分数据。安全
如何发现有数据丢失,又是如何验证的。从业务角度考虑,例如:教育行业,每一年高考后数据量巨大,可是却反常的比高考前还少,或者源端数据量和目的端数据量不符网络
定位数据是否在kafka以前就已经丢失还事消费端丢失数据的session
若是auto.commit.enable=true,当consumer fetch了一些数据但尚未彻底处理掉的时候,恰好到commit interval出发了提交offset操做,接着consumer crash掉了。这时已经fetch的数据尚未处理完成但已经被commit掉,所以没有机会再次被处理,数据丢失。架构
网络负载很高或者磁盘很忙写入失败的状况下,没有自动重试重发消息。没有作限速处理,超出了网络带宽限速。kafka必定要配置上消息重试的机制,而且重试的时间间隔必定要长一些,默认1秒钟并不符合生产环境(网络中断时间有可能超过1秒)。app
若是磁盘坏了,会丢失已经落盘的数据异步
单批数据的长度超过限制会丢失数据,报kafka.common.MessageSizeTooLargeException异常 解决:ide
Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer. Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated). Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer. Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
partition leader在未完成副本数follows的备份时就宕机的状况,即便选举出了新的leader可是已经push的数据由于未备份就丢失了! kafka是多副本的,当你配置了同步复制以后。多个副本的数据都在PageCache里面,出现多个副本同时挂掉的几率比1个副本挂掉的几率就很小了。(官方推荐是经过副原本保证数据的完整性的)oop
kafka的数据一开始就是存储在PageCache上的,按期flush到磁盘上的,也就是说,不是每一个消息都被存储在磁盘了,若是出现断电或者机器故障等,PageCache上的数据就丢失了。 能够经过log.flush.interval.messages和log.flush.interval.ms来配置flush间隔,interval大丢的数据多些,小会影响性能但在0.8版本,能够经过replica机制保证数据不丢,代价就是须要更多资源,尤为是磁盘资源,kafka当前支持GZip和Snappy压缩,来缓解这个问题 是否使用replica取决于在可靠性和资源代价之间的balance性能
同时kafka也提供了相关的配置参数,来让你在性能与可靠性之间权衡(通常默认):
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.flush.scheduler.interval.ms = 3000
设计上保证数据的可靠安全性,依据分区数作好数据备份,设立副本数等。 push数据的方式:同步异步推送数据:权衡安全性和速度性的要求,选择相应的同步推送仍是异步推送方式,当发现数据有问题时,能够改成同步来查找问题。
flush是kafka的内部机制,kafka优先在内存中完成数据的交换,而后将数据持久化到磁盘.kafka首先会把数据缓存(缓存到内存中)起来再批量flush. 能够经过log.flush.interval.messages和log.flush.interval.ms来配置flush间隔
能够经过replica机制保证数据不丢. 代价就是须要更多资源,尤为是磁盘资源,kafka当前支持GZip和Snappy压缩,来缓解这个问题 是否使用replica(副本)取决于在可靠性和资源代价之间的balance(平衡)
broker到 Consumer kafka的consumer提供两种接口.
high-level版本已经封装了对partition和offset的管理,默认是会按期自动commit offset,这样可能会丢数据的
low-level版本本身管理spout线程和partition之间的对应关系和每一个partition上的已消费的offset(按期写到zk) 而且只有当这个offset被ack后,即成功处理后,才会被更新到zk,因此基本是能够保证数据不丢的即便spout线程crash(崩溃),重启后仍是能够从zk中读到对应的offset
kafka不像hadoop更致力于处理大量级数据,kafka的消息队列更擅长于处理小数据。针对具体业务而言,如果源源不断的push大量的数据(eg:网络爬虫),能够考虑消息压缩。可是这也必定程度上对CPU形成了压力,仍是得结合业务数据进行测试选择
topic设置多分区,分区自适应所在机器,为了让各分区均匀分布在所在的broker中,分区数要大于broker数。分区是kafka进行并行读写的单位,是提高kafka速度的关键。
broker能接收消息的最大字节数的设置必定要比消费端能消费的最大字节数要小,不然broker就会由于消费端没法使用这个消息而挂起。
broker可赋值的消息的最大字节数设置必定要比能接受的最大字节数大,不然broker就会由于数据量的问题没法复制副本,致使数据丢失
关闭自动更新offset,等到数据被处理后再手动跟新offset。 在消费前作验证前拿取的数据是不是接着上回消费的数据,不正确则return先行处理排错。 通常来讲zookeeper只要稳定的状况下记录的offset是没有问题,除非是多个consumer group 同时消费一个分区的数据,其中一个先提交了,另外一个就丢失了。
问题: kafka的数据一开始就是存储在PageCache上的,按期flush到磁盘上的,也就是说,不是每一个消息都被存储在磁盘了,若是出现断电或者机器故障等,PageCache上的数据就丢失了。
这个是总结出的到目前为止没有发生丢失数据的状况
//producer用于压缩数据的压缩类型。默认是无压缩。正确的选项值是none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越好 props.put("compression.type", "gzip"); //增长延迟 props.put("linger.ms", "50"); //这意味着leader须要等待全部备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。, props.put("acks", "all"); //无限重试,直到你意识到出现了问题,设置大于0的值将使客户端从新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不一样。容许重试将潜在的改变数据的顺序,若是这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 props.put("retries ", MAX_VALUE); props.put("reconnect.backoff.ms ", 20000); props.put("retry.backoff.ms", 20000); //关闭unclean leader选举,即不容许非ISR中的副本被选举为leader,以免数据丢失 props.put("unclean.leader.election.enable", false); //关闭自动提交offset props.put("enable.auto.commit", false); 限制客户端在单个链接上可以发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求以前client不能再向同一个broker发送请求。注意:设置此参数是为了不消息乱序 props.put("max.in.flight.requests.per.connection", 1);
强行kill线程,致使消费后的数据,offset没有提交,partition就断开链接。好比,一般会遇到消费的数据,处理很耗时,致使超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有必定概率offset没提交,会致使重平衡后重复消费。 若是在close以前调用了consumer.unsubscribe()则有可能部分offset没提交,下次重启会重复消费
kafka数据重复 kafka设计的时候是设计了(at-least once)至少一次的逻辑,这样就决定了数据多是重复的,kafka采用基于时间的SLA(服务水平保证),消息保存必定时间(一般为7天)后会被删除 kafka的数据重复通常状况下应该在消费者端,这时log.cleanup.policy = delete使用按期删除机制