初识kafka中的生产者与消费者

发送生产消息的大体流程:服务器

1. 建立生产者对象,生产者发送包装消息的ProducerRecord
2. 生产者经过send方法发送消息
3. 消息被序列化
4. 消息计算出分区
5. 根据分区消息被分配到指定主题和分区的批次中
6. 批量发送到broker
7. broker判断是否消息失败,成功则直接返回元数据【可选】,失败判断是否重试,对应作相应处理

如何建立生产者对象?

必须指定3个属性:broker地址,key的序列化方式,value的序列化方式。其它可选参数,包括重试次数,内存缓冲大小,每次发送消息的批次大小,是否压缩等等

Avro序列化简介

它是一种与语言无关的序列化格式。数据经过schema来定义,若是出现读的schema与写的shema不一致的时候,不会抛出遗产,而选择返回默认值。使用的时候,在注册表中注册一个schema,消息字段schema的标识,而后存放到broker中,消费者使用标识符从注册表中拉取schema进行解析获得结果
网络


如何发送消息?

1. 同步方式:构建消息的封装ProducerRecord,经过生产者的send方法发送便可,能够用Future的方式接收返回的RecordMetadata
2. 异步方式:同步发消息若是服务器之间通讯的时间是10ms,那么1s只能发100个消息,所以不等待的方式(异步)能够节省时间,增长吞吐
3. 发送并忘记:只管发送,不处理任何返回值

发送消息的过程当中出了异常怎么办?

kafka异常基本有两类,一是可以重试的方式,好比网络链接段了,一是不会重连,好比消息太大,会直接抛异常,对于异步来说,能够经过使用回调函数来处理期间出现的异常


代码上如何建立消费者并订阅主题?

使用Propertites[包含 server,key.deserializer和value.deserializer]初始化 KafkaConsumer,经过consumer.subscribe便可订阅主题,主题能够是一个列表或者是一表达式

异步

代码上消费者是如何获取数据的?

轮询。消费者订阅了主题后,轮询中处理全部细节,包括群组协调、分区再平衡、发送心跳和获取数据函数

如何优雅退出轮询?添加shutdownhook,在钩子里头调用消费者的wakeup方法,这样若是读取poll,会抛出wakeup异常,而后调用close方法,保证最后的提交都已经完成,而且告知群组协调器,本身要离开群组,而后就触发了再均衡


消费者和线程之间的关系是什么?

一个群组里面有多个消费者,一个消费者只有一个线程
spa


为何kafka可以从上次断开的地方再开始读取消息?

kafka对每一个分区都有一个偏移量,来跟踪当前消息消费到哪儿去了,若是配置自动提交(更新分区当前位置),默认每5s就上报一次从poll中获取的收到的最大偏移量。可是这种自动方式若是在小于默认的时间以内发生了再均衡,会照成消息重复消费

线程

想本身提交偏移量,避免自动提交存在的问题怎么办? 1. 同步提交 [commitSync()],提交最后一次的偏移量。只要不是不可恢复的问题,就会一直重试,可是在broker对提交作出反应前,会一直阻塞,有可能成为吞吐量的瓶颈  ; 2. 异步提交[commitAsync()],提交最后一次的偏移量。不重试,若是异步提交出现问题,能够经过回调来观察
某些操做我必定要成功,可是又不想每次阻塞,怎么办? 混用同步提交和异步提交。在消息处理的时候异步提交,若是出了问题就catch住,而后同步提交
同步提交和异步提交都只能对最后一次进行提交,我想更频繁的,更自助的控制好提交的频率,怎么作? 用map存储每一个分区的偏移量,而后根据本身的需求,在读取消息后,异步提交整个map
相关文章
相关标签/搜索