kafka使用分区将topic的消息打散到多个分区分布保存在不一样的broker上,实现了producer和consumer消息处理的高吞吐量。Kafka的producer和consumer均可以多线程地并行操做,而每一个线程处理的是一个分区的数据。所以分区其实是调优Kafka并行度的最小单元。对于producer而言,它其实是用多个线程并发地向不一样分区所在的broker发起Socket链接同时给这些分区发送消息;而consumer,同一个消费组内的全部consumer线程都被指定topic的某一个分区进行消费。javascript
因此说,若是一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。java
分区是否越多越好呢?显然也不是,由于每一个分区都有本身的开销:python
1、客户端/服务器端须要使用的内存就越多 Kafka0.8.2以后,在客户端producer有个参数batch.size,默认是16KB。它会为每一个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个可以提高性能的设计。不过很显然,由于这个参数是分区级别的,若是分区数越多,这部分缓存所需的内存占用也会更多。假设你有10000个分区,按照默认设置,这部分缓存须要占用约157MB的内存。而consumer端呢?咱们抛开获取数据所需的内存不说,只说线程的开销。若是仍是假设有10000个分区,同时consumer线程数要匹配分区数(大部分状况下是最佳的消费吞吐量配置)的话,那么在consumer client就要建立10000个线程,也须要建立大约10000个Socket去获取分区数据。这里面的线程切换的开销自己已经不容小觑了。
服务器端的开销也不小,若是阅读Kafka源码的话能够发现,服务器端的不少组件都在内存中维护了分区级别的缓存,好比controller,FetcherManager等,所以分区数越多,这种缓存的成本就越大。
2、文件句柄的开销 每一个分区在底层文件系统都有属于本身的一个目录。该目录下一般会有两个文件: base_offset.log和base_offset.index。Kafak的controller和ReplicaManager会为每一个broker都保存这两个文件句柄(file handler)。很明显,若是分区数越多,所须要保持打开状态的文件句柄数也就越多,最终可能会突破你的ulimit -n的限制。
3、下降高可用性 Kafka经过副本(replica)机制来保证高可用。具体作法就是为每一个分区保存若干个副本(replica_factor指定副本数)。每一个副本保存在不一样的broker上。期中的一个副本充当leader 副本,负责处理producer和consumer请求。其余副本充当follower角色,由Kafka controller负责保证与leader的同步。若是leader所在的broker挂掉了,contorller会检测到而后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分状况下可能只是几毫秒级别。但若是你有10000个分区,10个broker,也就是说平均每一个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller须要当即对这1000个分区进行leader选举。比起不多的分区leader选举而言,这必然要花更长的时间,而且一般不是线性累加的。若是这个broker还同时是controller状况就更糟了。缓存
能够遵循必定的步骤来尝试肯定分区数:建立一个只有1个分区的topic,而后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位能够是MB/s。而后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)服务器
说明:Tp表示producer的吞吐量。测试producer一般是很容易的,由于它的逻辑很是简单,就是直接发送消息到Kafka就行了。Tc表示consumer的吞吐量。测试Tc一般与应用的关系更大, 由于Tc的值取决于你拿到消息以后执行什么操做,所以Tc的测试一般也要麻烦一些。多线程
默认状况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions:并发
def partition(key: Any, numPartitions: Int): Int = { Utils.abs(key.hashCode) % numPartitions }
这保证了相同key的消息必定会被路由到相同的分区。dom
若是你没有指定key,那么Kafka是如何肯定这条消息去往哪一个分区的呢?性能
if(key == null) { // 若是没有指定key val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有没有缓存的现成的分区Id id match { case Some(partitionId) => partitionId // 若是有的话直接使用这个分区Id就行了 case None => // 若是没有的话, val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出全部可用分区的leader所在的broker if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size // 从中随机挑一个 val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用 partitionId } }
不指定key时,Kafka几乎就是随机找一个分区发送无key的消息,而后把这个分区号加入到缓存中以备后面直接使用——固然了,Kafka自己也会清空该缓存(默认每10分钟或每次请求topic元数据时)。测试
topic下的一个分区只能被同一个consumer group下的一个consumer线程来消费,但反之并不成立,即一个consumer线程能够消费多个分区的数据,好比Kafka提供的ConsoleConsumer,默认就只是一个线程来消费全部分区的数据。
即分区数决定了同组消费者个数的上限
因此,若是你的分区数是N,那么最好线程数也保持为N,这样一般可以达到最大的吞吐量。超过N的配置只是浪费系统资源,由于多出的线程不会被分配到任何分区。
Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。
当如下事件发生时,Kafka 将会进行一次分区分配:
将分区的全部权从一个消费者移到另外一个消费者称为从新平衡(rebalance),如何rebalance就涉及到本文提到的分区分配策略。
下面咱们将详细介绍 Kafka 内置的两种分区分配策略。本文假设咱们有个名为 T1 的主题,其包含了10个分区,而后咱们有两个消费者(C1,C2)
来消费这10个分区里面的数据,并且 C1 的 num.streams = 1,C2 的 num.streams = 2。
Range策略是对每一个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。在咱们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C2-1。而后将partitions的个数除于消费者线程的总数来决定每一个消费者线程消费几个分区。若是除不尽,那么前面几个消费者线程将会多消费一个分区。在咱们的例子里面,咱们有10个分区,3个消费者线程, 10 / 3 = 3,并且除不尽,那么消费者线程 C1-0 将会多消费一个分区,因此最后分区分配的结果看起来是这样的:
假如咱们有11个分区,那么最后分区分配的结果看起来是这样的:
假如咱们有2个主题(T1和T2),分别有10个分区,那么最后分区分配的结果看起来是这样的:
能够看出,C1-0 消费者线程比其余消费者线程多消费了2个分区,这就是Range strategy的一个很明显的弊端。
使用RoundRobin策略有两个前提条件必须知足:
因此这里假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工做原理:将全部主题的分区组成 TopicAndPartition 列表,而后对 TopicAndPartition 列表按照 hashCode 进行排序,看下面的代码应该会明白:
val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) => info("Consumer %s rebalancing the following partitions for topic %s: %s" .format(ctx.consumerId, topic, partitions)) partitions.map(partition => { TopicAndPartition(topic, partition) }) }.toSeq.sortWith((topicPartition1, topicPartition2) => { /* * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending * up on one consumer (if it has a high enough stream count). */ topicPartition1.toString.hashCode < topicPartition2.toString.hashCode })
最后按照round-robin风格将分区分别分配给不一样的消费者线程。
在这个的例子里面,假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,咱们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
多个主题的分区分配和单个主题相似。遗憾的是,目前咱们还不能自定义分区分配策略,只能经过partition.assignment.strategy参数选择 range 或 roundrobin。