如何肯定Kafka的分区数、key和consumer线程数

【原创】如何肯定Kafka的分区数、key和consumer线程数

 
在Kafak中国社区的qq群中,这个问题被说起的比例是至关高的,这也是Kafka用户最常碰到的问题之一。本文结合Kafka源码试图对该问题相关的因素进行探讨。但愿对你们有所帮助。
 
怎么肯定分区数?
    “我应该选择几个分区?”——若是你在Kafka中国社区的群里,这样的问题你会常常碰到的。不过有些遗憾的是,咱们彷佛并无很权威的答案可以解答这样的问题。其实这也不奇怪,毕竟这样的问题一般都是没有固定答案的。Kafka官网上标榜本身是"high-throughput distributed messaging system",即一个高吞吐量的分布式消息引擎。那么怎么达到高吞吐量呢?Kafka在底层摒弃了Java堆缓存机制,采用了操做系统级别的页缓存,同时将随机写操做改成顺序写,再结合Zero-Copy的特性极大地改善了IO性能。可是,这只是一个方面,毕竟单机优化的能力是有上限的。如何经过水平扩展甚至是线性扩展来进一步提高吞吐量呢? Kafka就是使用了分区(partition),经过将topic的消息打散到多个分区并分布保存在不一样的broker上实现了消息处理(不论是producer仍是consumer)的高吞吐量。
    Kafka的生产者和消费者均可以多线程地并行操做,而每一个线程处理的是一个分区的数据。所以分区其实是调优Kafka并行度的最小单元。对于producer而言,它其实是用多个线程并发地向不一样分区所在的broker发起Socket链接同时给这些分区发送消息;而consumer呢,同一个消费组内的全部consumer线程都被指定topic的某一个分区进行消费(具体如何肯定consumer线程数目咱们后面会详细说明)。因此说,若是一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。
    但分区是否越多越好呢?显然也不是,由于每一个分区都有本身的开销:
1、客户端/服务器端须要使用的内存就越多
    先说说客户端的状况。Kafka 0.8.2以后推出了Java版的全新的producer,这个producer有个参数batch.size,默认是16KB。它会为每一个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个可以提高性能的设计。不过很显然,由于这个参数是分区级别的,若是分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存须要占用约157MB的内存。而consumer端呢?咱们抛开获取数据所需的内存不说,只说线程的开销。若是仍是假设有10000个分区,同时consumer线程数要匹配分区数(大部分状况下是最佳的消费吞吐量配置)的话,那么在consumer client就要建立10000个线程,也须要建立大约10000个Socket去获取分区数据。这里面的线程切换的开销自己已经不容小觑了。
    服务器端的开销也不小,若是阅读Kafka源码的话能够发现,服务器端的不少组件都在内存中维护了分区级别的缓存,好比controller,FetcherManager等,所以分区数越多,这种缓存的成本越久越大。
2、文件句柄的开销
    每一个分区在底层文件系统都有属于本身的一个目录。该目录下一般会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每一个broker都保存这两个文件句柄(file handler)。很明显,若是分区数越多,所须要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。
3、下降高可用性
    Kafka经过副本(replica)机制来保证高可用。具体作法就是为每一个分区保存若干个副本(replica_factor指定副本数)。每一个副本保存在不一样的broker上。期中的一个副本充当leader 副本,负责处理producer和consumer请求。其余副本充当follower角色,由Kafka controller负责保证与leader的同步。若是leader所在的broker挂掉了,contorller会检测到而后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分状况下可能只是几毫秒级别。但若是你有10000个分区,10个broker,也就是说平均每一个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller须要当即对这1000个分区进行leader选举。比起不多的分区leader选举而言,这必然要花更长的时间,而且一般不是线性累加的。若是这个broker还同时是controller状况就更糟了。
  说了这么多“废话”,不少人确定已经不耐烦了。那你说到底要怎么肯定分区数呢?答案就是:视状况而定。基本上你仍是须要经过一系列实验和测试来肯定。固然测试的依据应该是吞吐量。虽然LinkedIn这篇文章作了Kafka的基准测试,但它的结果其实对你意义不大,由于不一样的硬件、软件、负载状况测试出来的结果必然不同。我常常碰到的问题相似于,官网说每秒能到10MB,为何个人producer每秒才1MB? —— 且不说硬件条件,最后发现他使用的消息体有1KB,而官网的基准测试是用100B测出来的,所以根本没有可比性。不过你依然能够遵循必定的步骤来尝试肯定分区数:建立一个只有1个分区的topic,而后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位能够是MB/s。而后假设总的目标吞吐量是Tt,那么分区数 =  Tt / max(Tp, Tc)
    Tp表示producer的吞吐量。测试producer一般是很容易的,由于它的逻辑很是简单,就是直接发送消息到Kafka就行了。Tc表示consumer的吞吐量。测试Tc一般与应用的关系更大, 由于Tc的值取决于你拿到消息以后执行什么操做,所以Tc的测试一般也要麻烦一些。
    另外,Kafka并不能真正地作到线性扩展(其实任何系统都不能),因此你在规划你的分区数的时候最好多规划一下,这样将来扩展时候也更加方便。
 
消息-分区的分配
默认状况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions,以下图所示:
def partition(key: Any, numPartitions: Int): Int = {
    Utils.abs(key.hashCode) % numPartitions
}

  这就保证了相同key的消息必定会被路由到相同的分区。若是你没有指定key,那么Kafka是如何肯定这条消息去往哪一个分区的呢?html

复制代码
if(key == null) {  // 若是没有指定key
        val id = sendPartitionPerTopicCache.get(topic)  // 先看看Kafka有没有缓存的现成的分区Id
        id match {
          case Some(partitionId) =>  
            partitionId  // 若是有的话直接使用这个分区Id就行了
          case None => // 若是没有的话,
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)  //找出全部可用分区的leader所在的broker
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size  // 从中随机挑一个
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用
            partitionId
        }
      }
复制代码

  能够看出,Kafka几乎就是随机找一个分区发送无key的消息,而后把这个分区号加入到缓存中以备后面直接使用——固然了,Kafka自己也会清空该缓存(默认每10分钟或每次请求topic元数据时)算法

如何设定consumer线程数
    我我的的观点,若是你的分区数是N,那么最好线程数也保持为N,这样一般可以达到最大的吞吐量。超过N的配置只是浪费系统资源,由于多出的线程不会被分配到任何分区。让咱们来看看具体Kafka是如何分配的。
    topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程能够消费多个分区的数据,好比Kafka提供的ConsoleConsumer,默认就只是一个线程来消费全部分区的数据。——其实ConsoleConsumer可使用通配符的功能实现同时消费多个topic数据,但这和本文无关。
    再讨论分配策略以前,先说说KafkaStream——它是consumer的关键类,提供了遍历方法用于consumer程序调用实现数据的消费。其底层维护了一个阻塞队列,因此在没有新消息到来时,consumer是处于阻塞状态的,表现出来的状态就是consumer程序一直在等待新消息的到来。——你固然能够配置成带超时的consumer,具体参看参数consumer.timeout.ms的用法。
    下面说说Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,假设你有10个分区,P0 ~ P9,consumer线程数是3, C0 ~ C2,那么每一个线程都分配哪些分区呢?
 
C0 消费分区 0, 1, 2, 3
C1 消费分区 4, 5, 6
C2 消费分区 7, 8, 9
  
具体算法就是:
复制代码
val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每一个consumer至少保证消费的分区数
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 还剩下多少个分区须要单独分配给开头的线程们
...
for (consumerThreadId <- consumerThreadIdSet) {   // 对于每个consumer线程
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)  //算出该线程在全部线程中的位置,介于[0, n-1]
        assert(myConsumerPosition >= 0)
// startPart 就是这个线程要消费的起始分区数
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
// nParts 就是这个线程总共要消费多少个分区
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
...
}
复制代码

针对于这个例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart为10%3=1,说明每一个线程至少保证3个分区,还剩下1个分区须要单独分配给开头的若干个线程。这就是为何C0消费4个分区,后面的2个线程每一个消费3个分区,具体过程详见下面的Debug截图信息:缓存

 ctx.myTopicThreadIds
nPartsPerConsumer = 10 / 3  = 3
nConsumersWithExtraPart = 10 % 3 = 1
第一次:
myConsumerPosition = 1
startPart = 1 * 3 + min(1, 1) = 4 ---也就是从分区4开始读
nParts = 3 + (if (1 + 1 > 1) 0 else 1) = 3 读取3个分区, 即4,5,6
第二次:
myConsumerPosition = 0
startPart = 3 * 0 + min(1, 0) =0  --- 从分区0开始读
nParts = 3 + (if (0 + 1 > 1) 0 else 1) = 4 读取4个分区,即0,1,2,3
第三次:
myConsumerPosition = 2
startPart = 3 * 2 + min(2, 1) = 7 --- 从分区7开始读
nParts = 3 + if (2 + 1 > 1) 0 else 1) = 3 读取3个分区,即7, 8, 9
至此10个分区都已经分配完毕
 
说到这里,常常有个需求就是我想让某个consumer线程消费指定的分区而不消费其余的分区。坦率来讲,目前Kafka并无提供自定义分配策略。作到这点很难,但仔细想想,也许咱们指望Kafka作的事情太多了,毕竟它只是个消息引擎,在Kafka中加入消息消费的逻辑也许并非Kafka该作的事情。
相关文章
相关标签/搜索