咱们知道Kafka 的消息经过topic进行分类。topic能够被分为若干个partition来存储消息。消息以追加的方式写入partition,而后以先入先出的顺序读取。java
下面是topic和partition的关系图:spa
咱们通常会在server.conf中经过num.partitions参数指定建立topic时包含多少个partition。默认是num.partitions=1。代理
既然一个topic有多个partition,那么消息是怎么样分配到partition的呢?code
生产者生产一个消息send到topic分区器,分区器会根据消息里面的分区参数key值把消息分到对应的partition。这里就像咱们快递代发网点同样,快递代发网点能够代理不少种快递公司,若是要寄快递者P(生产者)指定用什么快递公司,代发网点人员C(分区器)就会把该物品M(消息)归类到指定的快递公司区域存放。若是P不要求具体的快递公司寄件,那么就由C随意分配快递公司(哈哈,那就要看这个家伙的心情了,心情好点给你一个顺丰比较快到达,心情很差时就GG吧)。server
下面是Kafka对消息分配分区 DefaultPartitioner.java 类的核心代码:blog
1 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 2 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 3 int numPartitions = partitions.size(); 4 if (keyBytes == null) { 5 int nextValue = counter.getAndIncrement(); 6 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); 7 if (availablePartitions.size() > 0) { 8 int part = Utils.toPositive(nextValue) % availablePartitions.size(); 9 return availablePartitions.get(part).partition(); 10 } else { 11 // no partitions are available, give a non-available partition 12 return Utils.toPositive(nextValue) % numPartitions; 13 } 14 } else { 15 // hash the keyBytes to choose a partition 16 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; 17 } 18 }
第四、7行:若是没有指定key值而且可用分区个数大于0时,在就可用分区中作轮询决定改消息分配到哪一个partition。rem
第四、10行:若是没有指定key值而且没有可用分区时,在全部分区中轮询决定改消息分配到哪一个partition。get
第14行:若是指定key值,对key作hash分配到指定的partition。hash
因此当同一个key的消息会被分配到同一个partition中。消息在同一个partition处理的顺序是FIFO,这就保证了消息的顺序性。it