发送生产消息的大体流程:服务器
1. 建立生产者对象,生产者发送包装消息的ProducerRecord它是一种与语言无关的序列化格式。数据经过schema来定义,若是出现读的schema与写的shema不一致的时候,不会抛出遗产,而选择返回默认值。使用的时候,在注册表中注册一个schema,消息字段schema的标识,而后存放到broker中,消费者使用标识符从注册表中拉取schema进行解析获得结果
网络
使用Propertites[包含 server,key.deserializer和value.deserializer]初始化 KafkaConsumer,经过consumer.subscribe便可订阅主题,主题能够是一个列表或者是一表达式
异步
轮询。消费者订阅了主题后,轮询中处理全部细节,包括群组协调、分区再平衡、发送心跳和获取数据函数
如何优雅退出轮询?添加shutdownhook,在钩子里头调用消费者的wakeup方法,这样若是读取poll,会抛出wakeup异常,而后调用close方法,保证最后的提交都已经完成,而且告知群组协调器,本身要离开群组,而后就触发了再均衡
一个群组里面有多个消费者,一个消费者只有一个线程
spa
kafka对每一个分区都有一个偏移量,来跟踪当前消息消费到哪儿去了,若是配置自动提交(更新分区当前位置),默认每5s就上报一次从poll中获取的收到的最大偏移量。可是这种自动方式若是在小于默认的时间以内发生了再均衡,会照成消息重复消费
线程
想本身提交偏移量,避免自动提交存在的问题怎么办?
1. 同步提交 [commitSync()],提交最后一次的偏移量。只要不是不可恢复的问题,就会一直重试,可是在broker对提交作出反应前,会一直阻塞,有可能成为吞吐量的瓶颈 ;
2. 异步提交[commitAsync()],提交最后一次的偏移量。不重试,若是异步提交出现问题,能够经过回调来观察
某些操做我必定要成功,可是又不想每次阻塞,怎么办?
混用同步提交和异步提交。在消息处理的时候异步提交,若是出了问题就catch住,而后同步提交
同步提交和异步提交都只能对最后一次进行提交,我想更频繁的,更自助的控制好提交的频率,怎么作?
用map存储每一个分区的偏移量,而后根据本身的需求,在读取消息后,异步提交整个map