为了更好的实现负载均衡和消息的顺序性,kafka的producer在分发消息时能够经过分发策略发送给指定的partition。实现分发的程序是须要制定消息的key值,而kafka经过key进行策略分发。 java
为了更好的弄清楚相关原理,咱们从kafka自己提供的分发函数分析:
源代码以下: 负载均衡
private[kafka] class DefaultPartitioner[T] extends Partitioner[T] { private val random = new java.util.Random def partition(key: T, numPartitions: Int): Int = { if(key == null) { println("key is null") random.nextInt(numPartitions) } else { println("key is "+ key + " hashcode is "+key.hashCode) math.abs(key.hashCode) % numPartitions } } }
上述类对key进行了模版封装,所以key 能够提供Int,String等类型。
其中numPartitions是来自ZKBrokerPartitionInfo生成的数据,具体代码是: dom
val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)(Tips:从上面能够看到,咱们能够更多的扩展分区信息,多多利用zookeeper提供的信息,好比sortedBrokerPartitions等等)
不少时候咱们可能要本身实现一个分区函数,具体的使用方式就是:
函数
private Properties props = new Properties(); ... props.put("partitioner.class","***/***/TestPartition");//必定要写对路径和partitioner.class
具体的实现代码就是改装自DefaultPartitioner的java实现方式,一并贴上: spa
public class TestPartition implements Partitioner<String>{ public int partition(String key,int numPartitions) { //System.out.println("Fuck!!!!"); System.out.print("partitions number is "+numPartitions+" "); if (key == null) { Random random = new Random(); System.out.println("key is null "); return random.nextInt(numPartitions); } else { int result = Math.abs(key.hashCode())%numPartitions; //很奇怪, //hashCode 会生成负数,奇葩,因此加绝对值 System.out.println("key is "+ key+ " partitions is "+ result); return result; } } }而发送消息使用方式:
List<String> messages = new java.util.ArrayList<String>(); String messageString = "test-message"+Integer.toString(messageNo); messages.add(messageString); //producer.send(new ProducerData<String, String>(topic,"test_key", messageStr)); ProducerData<String, String> data = new ProducerData<String, String>(topic, messageString, messages); producer.send(data);kafka官方文档中直接使用
ProducerData<String, String> data = new ProducerData<String, String>(topic, “xxx”, "XXX"); producer.send(data);可是我没有实现,第三个参数用String会报错。