客户端用法:java
kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); // 决定一个topic启动几个线程去拉取数据,即生成几个KafkaStream; Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(threads)); Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(topic); // 本质是调用了 ZookeeperConsumerConnector val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
queued.max.message.chunks
。消费者线程数量决定阻塞队列的个数。所以,分析Consumer,主要是分析ZookeeperConsumerConnector
。代码里面,有两个类,它们是什么关系呢?程序员
0.8.0 和 0.8.2.1
ZookeeperConsumerConnector
的源码不同,下面以0.8.2.1源码为主来分析,也就是从这个版本开始,能够将Offset存在Kafka的Broker中。(关注实现思想,忽略细节。)api
一个Consumer会建立一个ZookeeperConsumerConnector,表明一个消费者进程.缓存
createMessageStreams
建立消息流,反序列化messagesyncedRebalance
方法,去rebalance
消费者。以一段代码来讲明,消费的topic 12 partition,分配在3台broker机器上。数据结构
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("test-string-topic", new Integer(2)); //value表示consumer thread线程数量 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
下面是分配的状况:架构
consumer线程 | Blocking Queue | partitions |
---|---|---|
consumer thread1 | blockingQueue1 | 0,1,2,3,4,5 |
consumer thread2 | blockingQueue2 | 6,7,8,9,10,11 |
fetch线程 | partitions |
---|---|
fetch thread1 | 0,3,6,9 |
fetch thread2 | 1,4,7,10 |
fetch thread3 | 2,5,8,11 |
用户的consumer thread就使用2个BlockingQueue的数据进行处理;因此通常会使用2个consumer thread去消费这2个BlockingQueue数据。负载均衡
代码上调用:syncedRebalance方法在内部会调用def rebalance(cluster: Cluster): Boolean方法,去执行操做。
性能
topicRegistry=getCurrentConsumerPartitionInfo
最后,对每一个broker建立一个FetcherRunnable线程,并启动它。这个fetcher线程负责从Broker上不断获取数据,对每一个partition分别建立FetchRequest,最后把数据插入BlockingQueue的操做。fetch
KafkaStream
对ConsumerIterator
作了进一步的封装,咱们调用stream的next
方法就能够取到数据了(内部经过调用ConsumerIterator
的next
方法实现)线程
ConsumerIterator
的实现可能会形成数据的重复发送(这要看生产者如何生产数据),FetchedDataChunk
是一个数据集合,它内部会包含不少数据块,一个数据块可能包含多条消息,但同一个数据块中的消息只有一个offset,因此当一个消息块有多条数据,处理完部分数据发生异常时,消费者从新去取数据,就会再次取得这个数据块,而后消费过的数据就会被从新消费。
问题: Kafka中由Consumer维护消费状态,当Consumer消费消息时,支持2种模式commit消费状态,分别为当即commit和周期commit。前者会致使性能低下,作到消息投递刚好一次,但不多使用,后者性能高,一般用于实际应用,但极端条件下没法保证消息不丢失。
解决方案(这个问题太极端状况,不推荐,长个知识)
变成下面的: