当你编写kafka Producer时, 会生成KeyedMessage对象。程序员
KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)
这里的key值能够为空,在这种状况下, kafka会将这个消息发送到哪一个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:缓存
The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.服务器
可是这句话至关的误导人。
从字面上来说,这句话没有问题, 可是这里的随机是指在参数"topic.metadata.refresh.ms"刷新后随机选择一个, 这个时间段内老是使用惟一的分区。 默认状况下每十分钟才可能从新选择一个新的分区。 可是相信大部分的程序员和我同样, 都理解成每一个消息都会随机选择一个分区。
能够查看相关的代码:dom
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = { val numPartitions = topicPartitionList.size if(numPartitions <= 0) throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist") val partition = if(key == null) { // If the key is null, we don't really need a partitioner // So we look up in the send partition cache for the topic to decide the target partition val id = sendPartitionPerTopicCache.get(topic) id match { case Some(partitionId) => // directly return the partitionId without checking availability of the leader, // since we want to postpone the failure until the send operation anyways partitionId case None => val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) partitionId } } else partitioner.partition(key, numPartitions) if(partition < 0 || partition >= numPartitions) throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic + "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]") trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition)) partition }
若是key为null, 它会从sendPartitionPerTopicCache查选缓存的分区, 若是没有,随机选择一个分区,不然就用缓存的分区。socket
LinkedIn工程师Guozhang Wang在邮件列表中解释了这一问题,
最初kafka是按照大部分用户理解的那样每次都随机选择一个分区, 后来改为了按期选择一个分区, 这是为了减小服务器段socket的数量。不过这的确很误导用户,据称0.8.2版本后又改回了每次随机选取。可是我查看0.8.2的代码还没看到改动。ide
因此,若是有可能,仍是为KeyedMessage设置一个key值吧。post
当你编写kafka Producer时, 会生成KeyedMessage对象。this
KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)
这里的key值能够为空,在这种状况下, kafka会将这个消息发送到哪一个分区上呢?依据Kafka官方的文档, 默认的分区类会随机挑选一个分区:code
The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.orm
可是这句话至关的误导人。