最近看了kafka2.4新版本的一些功能特性,不得不说,在kafka2.0之后,kafka自身就比较少推出一些新的feature了,基本都是一些修修补补的东西。却是kafka connect和kafka stream相关的开发工做作的比较多。可能kafka的野心也不局限于要当一个中间件,而是要实现一个流处理系统的生态了。git
此次要介绍的是我以为比较有意思的两个特性,一个是kafka支持从follower副本读取数据,固然这个功能并非为了提供读取性能,后面再详细介绍。另外一个则是新推出的sticky partitioner功能,我猜是从rebalance的StickyAssignor中获得灵感,发现producer的分区策略也能够这样搞,233,这个feature主要做用是提升性能。github
这两个feature都是kafka2.4.0版本推出的,若是想使用这些新feature,那么不妨升级下吧~apache
在早先kafka的设计中,为了使consumer读取数据可以保持一致,是只容许consumer读取leader副本的数据的。即follower replica只是单纯地备份数据的做用。那推出follower replica fetch功能的背景是什么呢?服务器
举个比较常见的场景,kafka存在多个数据中心,不一样数据中心存在于不一样的机房,当其中一个数据中心须要向另外一个数据中心同步数据的时候,因为只能从leader replica消费数据,那么它不得不进行跨机房获取数据,而这些流量带宽一般是比较昂贵的(尤为是云服务器)。即没法利用本地性来减小昂贵的跨机房流量。性能
因此kafka推出这一个功能,就是帮助相似这种场景,节约流量资源。而且这种功能彷佛还能够和新推出的mirror maker2相互配合,实现多个数据源的数据同步,不过我本身还没测试过。测试
要说follower replica fetch,那就不得不先说rack功能,这个是kafka比较早就推出的功能,是Kafka对机架感知提供了的基本支持,能够将其用于控制副本的放置,详细内容能够参阅这篇Kafka机架感知文章。fetch
使用方式,其实就是一个broker端的参数,broker.rack,这个参数能够说明当前broker在哪一个机房。设计
举上面文章中的例子,若是一个数据中心的集群分布以下:
code
那么能够这样配置:orm
这样其实就是至关于给broker打一个标签,当新建topic,好比新建一个两个副本 & 两个分区的topic,kafka至少会自动给rack1或rack2分配所有分区的一个副本。什么,你说要是建立两个分区一个副本的topic该怎么分。。。抱歉,我给不了答案。等你本身实践而后评论跟我说下答案 =。=
OK,上面介绍的rack功能,咱们就能发现,这个其实跟跨机房读数据的这种场景是很搭的。在跨机房多数据中心场景中,若是数据中心A,一个副本放在数据中心B的机房中,只要让数据中心B的consumer可以读数据中心A的那个replica的数据(follower副本)读数据,那不就万事大吉。
社区也是这样想的,因此就推出了这个功能。让消费者能够指定rack id,而后能够不从消费者读取数据。要实现这个目的,须要先配置两个参数:
replica.selector.class
为了支持这个功能,kafka修改了这部分的接口实现,源码中新增一个ReplicaSelector
接口,若是用户有自定义消费策略的需求,也能够继承这个接口实现本身的功能。
目前这个接口有两个实现类,一个是LeaderSelector
,即从leader副本读数据。另外一个则是RackAwareReplicaSelector
,会去到指定的rack id读数据。
client.rack
broker.rack
相同,表示去哪一个rack中获取数据。这个参数只有在上面的replica.selector.class
指定为RackAwareReplicaSelector
且broekr指定了broker.rack
才会生效。
这个功能要测试也挺简单的,能够直接搭建一个两个broker的kafka集群,配置broker.rack,而后使用consumer客户端指定client.rack发送到非leader的节点查数据就好了。另外,可使用这条命令查看网卡流量信息:
sar -n DEV 1 300
从follower replica读取数据确定有问题,最可能的问题就是落后节点的问题,从这样的节点读取数据会面临什么样的状况呢?官方给出了几种场景及解决办法。先看看这张图
主要有四种可能出现问题的状况,咱们分别来看看应该如何解决:
Case 1(uncommitted offset)
这个场景是follower接收到数据但还未committed offset,这个时候,若消费者的offet消费到high watemark到log end offset之间的那段(Case 1黄色那段),会返回空数据,而不是一个错误信息。直到这段内容 committed。
case 2(unavailable offset)
这种场景应该发生于慢节点的状况下,满节点的broker还未接收到实际数据,但已经跟leader通讯知道有部分数据committed了(case 2黄色部分)。当遇到这种状况,consumer 消费到时候,会返回 OFFSET_NOT_AVAILABLE 错误信息。
case 3(offset too small)
这种状况可能出如今消费者指定了 offset 的状况。那么在指定不一样auto.offset.reset
的时候有不一样的状况。
case 4(offset too large)
遇到这种状况,会返回一个 broker 会返回一个 OFFSET_OUT_OF_RANGE 的错误。
但 OFFSET_OUT_OF_RANGE 遇到这种错误的时候也有多种可能,官方给出当 consumer 遇到这种问题的解决思路,
Use the OffsetForLeaderEpoch API to verify the current position with the leader.
kafka producer发送数据并非一个一个消息发送,而是取决于两个producer端参数。一个是linger.ms
,默认是0ms,当达到这个时间后,kafka producer就会马上向broker发送数据。另外一个参数是batch.size
,默认是16kb,当产生的消息数达到这个大小后,就会当即向broker发送数据。
按照这个设计,从直观上思考,确定是但愿每次都尽量填满一个batch再发送到一个分区。但实际决定batch如何造成的一个因素是分区策略(partitioner strategy)。在Kafka2.4版本以前,在producer发送数据默认的分区策略是轮询策略(没指定keyd的状况),这在我之前的文章有说到过详细解析kafka之kafka分区和副本。若是多条消息不是被发送到相同的分区,它们就不能被放入到一个batch中。
因此若是使用默认的轮询partition策略,可能会形成一个大的batch被轮询成多个小的batch的状况。鉴于此,kafka2.4的时候推出一种新的分区策略,即Sticky Partitioning Strategy,Sticky Partitioning Strategy会随机地选择另外一个分区并会尽量地坚持使用该分区——即所谓的粘住这个分区。
鉴于小batch可能致使延时增长,以前对于无Key消息的分区策略效率很低。社区于2.4版本引入了黏性分区策略(Sticky Partitioning Strategy)。该策略是一种全新的策略,可以显著地下降给消息指定分区过程当中的延时。
使用Sticky Partitioner有助于改进消息批处理,减小延迟,并减小broker的负载。
sticky Partitioner实现的代码是在UniformStickyPartitioner
里面。贴下代码看看:
public class UniformStickyPartitioner implements Partitioner { private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache(); public void configure(Map<String, ?> configs) {} public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return stickyPartitionCache.partition(topic, cluster); } public void close() {} public void onNewBatch(String topic, Cluster cluster, int prevPartition) { stickyPartitionCache.nextPartition(topic, cluster, prevPartition); } }
咱们主要关注UniformStickyPartitioner#partition()
方法,能够看到,它是直接经过一个cache类获取相同的分区,这表示新的record会一直发送到同一个分区中,除非生成新的batch,触发了UniformStickyPartitioner#onNewBatch()
方法才会换分区。
能够看看RoundRobinPartitioner#partition()
方法(即轮询分区策略)进行对比,就能发现比较明显的对比。
这个sticky partitioner最大的好处就是性能较好,按照官方给出的测试结果,使用sticky partitioner测试能够减小50%的延时,吞吐也有相对应的提升。我本身测了下数据基本出入不大。
另外说明下,在kafka2.4之后,默认的partitioner分区策略,已经包含了sticky partitioner了,因此升级到kafka2.4之后,并不须要任何修改就能享受到性能到极大提高。这里能够看下kafka2.4版本的策略说明:
/** * The default partitioning strategy: * <ul> * <li>If a partition is specified in the record, use it * <li>If no partition is specified but a key is present choose a partition based on a hash of the key * <li>If no partition or key is present choose the sticky partition that changes when the batch is full. * * See KIP-480 for details about sticky partitioning. */ public class DefaultPartitioner implements Partitioner {
有一点挺奇怪到,在测试过程当中(使用bin/kafka-producer-perf-test.sh测试),发现DefaultPartitioner
的性能要比UniformStickyPartitioner
的性能要好一些,不肯定是什么缘由,知道到小伙伴能够在评论区给出答案:)
参考:
KIP-392: Allow consumers to fetch from closest replica
KIP-480: Sticky Partitioner 以上~