1.大致的一个业务需求:linux
Logstash吐出来的消息做为kafka producer,每小时大约有百万条记录,kafka consumer 使用springboot 配置kafka,根据业务需求按照topic 日期 小时 这种结构 写入到Hdfsspring
2.遇到的问题:1.开始的时候每条记录写入 设置到 hdfs的create 以及append ,每秒大约能够消费 1000条记录,这种消费速度形成kafka消息的堆积,LAG一直很大,特别是有的topic能够到达数万条;2.在某一天,Kafka进程忽然死掉,也没有报错日志。springboot
3.解决问题:服务器
Kafka进程忽然死掉, 在linux服务器上 使用命令 app
nohup top -p kafka pid -b >> out.txt & fetch
将服务器的内存信息等 输出到 指定文件中,发如今kafka进程挂掉的前一秒,free 可以使用的内存还有150m 。初步判断是服务器内存不足。而后 看另外一台的服务器 buff/cache 内存很高。而后查资料释放这台服务器的cache内存。将kafak consumer迁移到这台服务器,目前在观察;后续输出kafka consumer的堆栈信息优化
Kafka consumer 业务逻辑进行优化,每次拉取 1000条数据,手动提交offset 以及 按批写入Hdfs。目前的问题是,在 kafka的offset 的LAG很小或者为0的时候,每次拉取的数据不固定,基本不超过500,针对该问题 正在处理中spa
4.总结:观察 进程的占用服务器的资源; Java进程的堆栈信息;对于写Hdfs 单次写入 和批量写入的速度对比;Kafka Consumer的配置以及坑点日志
spring.kafka.consumer.auto-commit-interval-ms=3000
spring.kafka.consumer.max-poll-records=1200 //最大拉取消息的条数,当该条不知足的时候,根据spring.kafka.consumer.fetch-min-size来拉取[目前看来这个还有问题]
spring.kafka.consumer.fetch-min-size=900000 //最小拉取的字节数
spring.kafka.consumer.auto-offset-reset=earliest //针对没有提交的offset的consumer 会从头开始消费;已有提交的offset,从当前的offset开始消费