1. kafka线上集群环境规划
jvm参数设置
因为kafka是scala语言开发,运行在JVM上,须要对JVM参数合理设置。修改bin/kafka-start-server.sh中的jvm设置,假设机器是32G内存,能够以下设置:redis
export KAFKA_HEAP_OPTS="-Xmx16G -Xms16G -Xmn10G -XX:MetaspaceSize=256M -XX:+UseG1GC -XX:MaxGCPauseMillis=50 -XX:G1HeapRegionSize=16M"
这种大内存的状况通常都要用G1垃圾收集器,由于年轻代内存比较大,用G1能够设置GC最大停顿时间,不至于一次minor gc就花费太长时间,固然,由于像kafka,rocketmq,es这些中间件,写数据到磁盘会用到操做系统的page cache,因此JVM内存不宜分配过大,须要给操做系统的缓存留出几个G。bootstrap
2. 消息丢失问题
kafka在生产端发送消息 和 消费端消费消息 时均可能会丢失一些消息缓存
①:生产者消息丢失
生产者在发送消息时,会有一个ack机制,当ack=0 或者 ack=1时,均可能会丢消息。以下所示:网络
②:消费端消息丢失
消费端丢消息最主要体如今消费端offset的自动提交,若是开启了自动提交,万一消费到数据还没处理完,此时你consumer直接宕机了,未处理完的数据 丢失了,下次也消费不到了,由于offset已经提交完毕,下次会从offset出开始消费新消息。架构
解决办法是采用消费端的手动提交jvm
//手动提交offset /** * 注意若是要使用手动提交offset,须要如下三点 * ①:配置文件配置手动提交方式 * ②:加上参数Acknowledgment ack * ③:方法中使用ack.acknowledge();手动提交 */ ack.acknowledge();
3. 消息重复消费
消息的重复消费在生产端和消费端均可能发生,下面一一讲解:分布式
①:生产端消息重复发送
发送消息若是配置了重试机制,好比因为网络波动,生产者未获得broker收到了消息的响应,就会触发重试机制,3秒后再次发送此消息。broker以前已经收到过这个消息,但生产者因为触发了重试机制,就致使了消息的重复发送。那么broker就会在磁盘缓存多条一样的消息,消费端从broker拉取消息时,就会形成重复消费。ide
注意:kafka新版本已经在broker中保证了接收消息的幂等性(好比2.4版本),只需在生产者加上参数 props.put(“enable.idempotence”, true) 便可,默认是false不开启。工具
新版kafka的broker幂等性具体实现原理:
kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一块儿发送给broker,broker会将PID和Sequence Number跟消息绑定一块儿存起来,下次若是生产者重发相同消息,broker会检查PID和Sequence Number,若是相同不会再接收。性能
①:消费端消息重复消费
对于消费端消息的重复消费问题,若是消费端拉取了一部分数据,消费完毕后,准备执行手动提交(或自动提交)时,消费者挂掉了!此时offset还未提交呢,那么当服务重启时,仍是会拉取相同的一批数据重复处理!形成消息重复消费
不管是生产者仍是消费者的重复消息,通常都会在消费端卡死,作幂等性处理。
幂等性能够用redis的setnx分布式锁来实现。好比操做订单消息,能够把订单id做为key,在消费消息时,经过setnx命令设置一下,offset提交完成后,在redis中删除订单id的key。setnx命令保证一样的订单消息,只有一个能被消费,可有效保证消费的幂等性!
4. 顺序消息
kafka想要保证消息顺序,是须要牺牲必定性能的,方法就是一个消费者,消费一个分区,能够保证消费的顺序性。但也仅限于消费端消费消息的有序性,没法保证生产者发送消息有序。
好比:若是发送端配置了重试机制,kafka不会等以前那条消息彻底发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。发送端消息发送已经乱序,到了消费端消费时,天然没法保证顺序!
若是必定要保证生产-消费全链路消息有序,发送端须要同步发送,ack回调不能设置为0。且只能有一个分区,一个消费者进行消费,但这样明显有悖于kafka的高性能理论!
问题:如何在多个分区中保证消息顺序和消息处理效率呢?
首先使用多个分区,消息能够被发送端发送至多个分区,保证消息发送的效率。而后在消费端在拉消息时使用ConutdownLunch来记录一组有序消息的个数。若是达到个数,说明已拉取到完整的一组有序消息。而后在消费端根据消息序号进行排序,消费端将排好序的消息发到内存队列(能够搞多个),一个内存队列开启一个线程顺序处理消息。便可最大程度上既保证顺序又保证效率!
5. 消息积压
线上有时由于发送方发送消息速度过快,或者消费方处理消息过慢,可能会致使broker积压大量未消费消息。
解决方案:此种状况若是积压了上百万未消费消息须要紧急处理,能够修改消费端程序,让其将收到的消息快速转发到其余topic(能够设置不少分区),而后再启动多个消费者同时消费新主题的不一样分区。如图所示:
因为消息数据格式变更或消费者程序有bug,致使消费者一直消费不成功,也可能致使broker积压大量未消费消息。
解决方案:此种状况能够将这些消费不成功的消息转发到其它队列里去(相似死信队列),后面再慢慢分析死信队列里的消息处理问题。这个死信队列,kafka并无提供,须要整合第三方插件!
5. 延时消息
延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送之后,并不想让消费者马上获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有不少, 好比 :
但kafka没有提供延时消息功能,能够用rocketmq、rabbitmq都作延时消息。若是必定要用kafka实现延时消息呢?
实现思路:发送延时消息时先把消息按照不一样的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个通常不能支持任意时间段的延时),而后经过定时器进行轮训消费这些topic,查看消息是否到期,若是到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来讲就是定时器在一次消费过程当中,对消息的发送时间作判断,看下是否延迟到对应时间了,若是到了就转发,若是还没到这一次定时任务就能够提早结束了。
6. 消息回溯若是某段时间对已消费消息计算的结果以为有问题,多是因为程序bug致使的计算错误,当程序bug修复后,这时可能须要对以前已消费的消息从新消费,能够指定从多久以前的消息回溯消费,这种能够用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费,完成消息的回溯消费!
7. kafka高性能的缘由
高性能缘由以下:
分布式存储架构
磁盘顺序读写
kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
读写数据的批量batch处理以及压缩传输
数据传输的零拷贝
kafka相对于rocketMQ、rabbitMQ来讲,与它们最大的区别就是分布式存储,这也是kafka高性能的最主要缘由。使用分布式存储理念,一个主题下多个分区,同时能够被多个消费者和生产者去使用,也增长了接受消息和消费消息的能力!
但分区数也并非越多越好,若是没法肯定开多少分区,可使用kafka压测工具本身测试分区数不一样,各类状况下的吞吐量来决定分区数
# 往test里发送一百万消息,每条设置1KB # throughput 用来进行限流控制,当设定的值小于 0 时不限流,当设定的值大于 0 时,当发送的吞吐量大于该值时就会被阻塞一段时间 bin/kafka-producer-perf-test.sh --topic test --num-records 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=192.168.65.60:9092 acks=1
分区数到达某个值吞吐量反而开始降低,实际上不少事情都会有一个临界值,当超过这个临界值以后,不少本来符合既定逻辑的走向又会变得不一样。通常状况分区数跟集群机器数量至关就差很少了。