This article looks at the relationship between the topicCountMap in the Kafka 0.8 API and a topic's partitions.
Physically, a topic is divided into one or more partitions; each partition corresponds to a directory on disk, and that directory holds all of the partition's messages and index files.
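For instance, on a broker whose log.dirs points at /tmp/kafka-logs (an assumption; this is a common default), the 6-partition topic used later in this article would show up on disk roughly as:

/tmp/kafka-logs/topic20170921-0/00000000000000000000.log
/tmp/kafka-logs/topic20170921-0/00000000000000000000.index
/tmp/kafka-logs/topic20170921-1/...
...
/tmp/kafka-logs/topic20170921-5/...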
When a Kafka producer sends a message, if the message carries a key, the key is hashed and the message is routed to the resulting partition; if there is no key, the partition is picked by an incrementing counter (round-robin).
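A minimal sketch of that routing rule (a hypothetical helper written for illustration, not the actual kafka.producer.DefaultPartitioner source):

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionRouting {
    static final AtomicInteger counter = new AtomicInteger();

    // picks the partition for a message, mirroring the behaviour described above
    static int partitionFor(byte[] key, int numPartitions) {
        if (key != null) {
            // keyed message: hash the key onto a partition (masking keeps the value non-negative)
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
        // no key: rotate a counter across the partitions
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}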
Adding or removing a consumer, broker, or partition triggers a rebalance, and after the rebalance the partitions assigned to each consumer can change.
For example, if one consumer is removed, the rebalance remaps the partition assignments of the remaining consumers.
The topicCountMap tells Kafka how many threads our consumer process will use to consume a given topic: the map key is the topic name, and the value is the number of consuming threads for that topic.
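In code this is just a Map handed to createMessageStreams. A minimal fragment (assuming the connector and imports are set up as in the full NativeConsumer below):

// ask the connector for 3 consuming threads (streams) for one topic
static List<KafkaStream<byte[], byte[]>> openStreams(ConsumerConnector consumerConnector) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("topic20170921", 3);
    Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            consumerConnector.createMessageStreams(topicCountMap);
    return streams.get("topic20170921"); // 3 KafkaStreams, one per thread
}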
Suppose a topic has 6 partitions and two consumers are started, each with a topicCount of 3; observation shows that every consuming thread in both consumers is busy.
If each consumer's topicCount is raised to 4, all 4 threads of the first-started consumer run, but only 2 threads of the later-started consumer run; the other 2 are blocked.
In other words, the actual number of consuming threads = number of consumer instances × topicCount per consumer. If this value > the number of partitions, some consuming threads are surplus and block.
If this value <= the number of partitions, every consuming thread is consuming.
Therefore, when deploying consumers in a distributed setup, keep number of consumer instances × topicCount per consumer <= the topic's partition count.
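Stated as a one-line check (a hypothetical helper, only to make the arithmetic explicit):

// true  => every consuming thread will own at least one partition
// false => (instances * threadsPerInstance - partitions) threads block idle
static boolean allThreadsBusy(int consumerInstances, int threadsPerConsumer, int partitions) {
    return consumerInstances * threadsPerConsumer <= partitions;
}

With the 6-partition topic above, allThreadsBusy(2, 3, 6) is true, while allThreadsBusy(2, 4, 6) is false: 2 of the 8 threads block.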
Create the test topic with 6 partitions:

sh kafka-topics.sh --create --topic topic20170921 --replication-factor 1 --partitions 6 --zookeeper localhost:2181

Check the consumer group's offsets and partition owners:

sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test-group-0921
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class NativeConsumer {

    // fixed pool that runs the consuming threads; sized above the topicCount used here
    ExecutorService pool = Executors.newFixedThreadPool(10);

    public void exec(String topic, String zk, int consumerCount, String group) throws UnsupportedEncodingException {
        Properties props = new Properties();
        props.put("zookeeper.connect", zk);
//        props.put("auto.offset.reset", "smallest");
        props.put("group.id", group);
        props.put("zookeeper.session.timeout.ms", "10000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "10000");
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
        ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
        ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        // ask Kafka for consumerCount streams (consuming threads) for this topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, consumerCount);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);

        // drain each returned stream in its own thread
        consumerMap.get(topic).stream().forEach(stream -> {
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        System.out.println(Thread.currentThread().getName() + ":" + new String(it.next().message()));
                    }
                }
            });
        });
    }
}
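Note that each KafkaStream's iterator blocks in hasNext() until a message arrives; a stream that owns no partition after assignment simply never returns from hasNext(). That is exactly the blocked-thread behaviour observed above when instances × topicCount exceeds the partition count.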
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NativeProducer {

    public void produce(String topic, String brokerAddr) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddr);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            int totalCountOfSentMessages = 0;
            long totalSendTime = 0;
            long timeOfLastUpdate = 0;
            int countOfMessagesInSec = 0;
            for (int i = 0; i < 1000000; i++) {
                // TODO: keys must differ; with identical keys every message lands in
                // the same partition and the consumers cannot scale out
                byte[] dataKey = SerializationUtils.serialize(UUID.randomUUID().toString());
                byte[] dataValue = SerializationUtils.serialize(UUID.randomUUID().toString());
                ProducerRecord<byte[], byte[]> producerRecord =
                        new ProducerRecord<>(topic, dataKey, dataValue);

                long sendingStartTime = System.currentTimeMillis();
                // sync send: block until the broker acknowledges the record
                producer.send(producerRecord).get();
                Thread.sleep(100);

                long currentTime = System.currentTimeMillis();
                long sendTime = currentTime - sendingStartTime;
                totalSendTime += sendTime;
                totalCountOfSentMessages++;
                countOfMessagesInSec++;
                if (currentTime - timeOfLastUpdate > TimeUnit.SECONDS.toMillis(1)) {
                    System.out.println("Average send time: " + ((double) totalSendTime / totalCountOfSentMessages) + " ms.");
                    System.out.println("Count of messages in second: " + countOfMessagesInSec);
                    timeOfLastUpdate = currentTime;
                    countOfMessagesInSec = 0;
                }
            }
        }
    }
}
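Two details worth noting in this producer: it sends synchronously (producer.send(...).get()) and sleeps 100 ms between records, so throughput is deliberately throttled for the experiment; and it uses a fresh random UUID as the key of every record, so messages spread across all 6 partitions instead of piling onto one.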
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ExecutionException;

import org.junit.Test;

// enclosing JUnit test class; the class name here is assumed
public class NativeKafkaTest {

    String zkAddr = "localhost:2181";
    String topic = "topic20170921"; // created above with 6 partitions
    String brokerAddr = "localhost:9092";
    String group = "test-group-0921";

    @Test
    public void testConsumer1() throws InterruptedException {
        NativeConsumer nativeConsumer = new NativeConsumer();
        try {
            nativeConsumer.exec(topic, zkAddr, 4, group);
        } catch (UnsupportedEncodingException e1) {
            e1.printStackTrace();
        }
        Thread.sleep(100000);
    }

    @Test
    public void testConsumer2() throws InterruptedException {
        NativeConsumer nativeConsumer = new NativeConsumer();
        try {
            nativeConsumer.exec(topic, zkAddr, 4, group);
        } catch (UnsupportedEncodingException e1) {
            e1.printStackTrace();
        }
        Thread.sleep(100000);
    }

    @Test
    public void testProducer() throws UnsupportedEncodingException, InterruptedException {
        NativeProducer producer = new NativeProducer();
        try {
            producer.produce(topic, brokerAddr);
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
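Running testConsumer1 and testConsumer2 at the same time puts 2 instances × 4 threads = 8 consuming threads into group test-group-0921 against 6 partitions, reproducing the scenario above: all 4 threads of the first instance consume, while 2 threads of the second instance block.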