每一个Consumer 进程都会划归到一个逻辑的Consumer Group中,逻辑的订阅者是Consumer Group。因此一条message能够被多个订阅message 所在的topic的每个Consumer Group,也就好像是这条message被广播到每一个Consumer Group同样。而每一个Consumer Group中,相似于一个Queue(JMS中的Queue)的概念差很少,即一条消息只会被Consumer Group中的一个Consumer消费。apache
其实上面所说的订阅关系还不够明确,其实topic中的partition被分配到某个consumer上,也就是某个consumer订阅了某个partition。 再重复一下:consumer订阅的是partition,而不是message。因此在同一时间点上,订阅到同一个partition的consumer必然属于不一样的Consumer Group。bootstrap
在官方网站上,给出了这样一张图:缓存
一个kafka cluster中的某个topic,有4个partition。有两个consumer group (A and B)订阅了该topic。 Consumer Group A有2个partition:p0、p1,Consumer Group B有4个partition:c3,c4,c5,c6。通过分区分配后,consumer与partition的订阅关系以下:安全
Topic 中的4个partition在Consumer Group A中的分配状况以下: C1 订阅p0,p3 C2 订阅p1,p2 Topic 中的4个partition在Consumer Group B中的分配状况以下: C3 订阅p0 C4 订阅p3 C5 订阅p1 C6 订阅p2
另外要知道的是,partition分配的工做实际上是在consumer leader中完成的。网络
Consumer Group与Consumer的关系是动态维护的:session
当一个Consumer 进程挂掉 或者是卡住时,该consumer所订阅的partition会被从新分配到该group内的其它的consumer上。当一个consumer加入到一个consumer group中时,一样会从其它的consumer中分配出一个或者多个partition 到这个新加入的consumer。多线程
当启动一个Consumer时,会指定它要加入的group,使用的是配置项:group.id。fetch
为了维持Consumer 与 Consumer Group的关系,须要Consumer周期性的发送heartbeat到coordinator(协调者,在早期版本,以zookeeper做为协调者。后期版本则以某个broker做为协调者)。当Consumer因为某种缘由不能发Heartbeat到coordinator时,而且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。而这个过程,被称为rebalance。网站
那么如今有这样一个问题:若是一个consumer 进程一直在周期性的发送heartbeat,可是它就是不消费消息,这种状态称为livelock状态。那么在这种状态下,它所订阅的partition不消息是否就一直不能被消费呢?spa
Coordinator 协调者,协调consumer、broker。早期版本中Coordinator,使用zookeeper实现,可是这样作,rebalance的负担过重。为了解决scalable的问题,再也不使用zookeeper,而是让每一个broker来负责一些group的管理,这样consumer就彻底再也不依赖zookeeper了。
从Consumer的实现来看,在执行poll或者是join group以前,都要保证已链接到Coordinator。链接到coordinator的过程是:
1)链接到最后一次链接的broker(若是是刚启动的consumer,则要根据配置中的borker)。它会响应一个包含coordinator信息(host, port等)的response。
2)链接到coordinator。
Consumer Group 管理中,也是须要coordinator的参与。一个Consumer要join到一个group中,或者一个consumer退出时,都要进行rebalance。进行rebalance的流程是:
1)会给一个coordinator发起Join请求(请求中要包括本身的一些元数据,例如本身感兴趣的topics)
2)Coordinator 根据这些consumer的join请求,选择出一个leader,并通知给各个consumer。这里的leader是consumer group 内的leader,是由某个consumer担任,不要与partition的leader混淆。
3)Consumer leader 根据这些consumer的metadata,从新为每一个consumer member从新分配partition。分配完毕经过coordinator把最新分配状况同步给每一个consumer。
4)Consumer拿到最新的分配后,继续工做。
在Kafka partition中,每一个消息有一个惟一标识,即partition内的offset。每一个consumer group中的订阅到某个partition的consumer在从partition中读取数据时,是依次读取的。
上图中,Consumer A、B分属于不用的Consumer Group。Consumer B读取到offset =11,Consumer A读取到offset=9 。这个值表示Consumer Group中的某个Consumer 在下次读取该partition时会从哪一个offset的 message开始读取,即 Consumer Group A 中的Consumer下次会从offset = 9 的message 读取, Consumer Group B 中的Consumer下次会从offset = 11 的message 读取。
这里并无说是Consumer A 下次会从offset = 9 的message读取,缘由是Consumer A可能会退出Group ,而后Group A 进行rebalance,即从新分配分区。
Consumer读取partition中的数据是经过调用发起一个fetch请求来执行的。而从KafkaConsumer来看,它有一个poll方法。可是这个poll方法只是可能会发起fetch请求。缘由是:Consumer每次发起fetch请求时,读取到的数据是有限制的,经过配置项max.partition.fetch.bytes来限制的。而在执行poll方法时,会根据配置项个max.poll.records来限制一次最多pool多少个record。
那么就可能出现这样的状况: 在知足max.partition.fetch.bytes限制的状况下,假如fetch到了100个record,放到本地缓存后,因为max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就须要执行7次才能将这一次经过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。
在consumer中,还有另一个配置项:max.poll.interval.ms ,它表示最大的poll数据间隔,若是超过这个间隔没有发起pool请求,但heartbeat仍旧在发,就认为该consumer处于 livelock状态。就会将该consumer退出consumer group。因此为了避免使Consumer 本身被退出,Consumer 应该不停的发起poll(timeout)操做。而这个动做 KafkaConsumer Client是不会帮咱们作的,这就须要本身在程序中不停的调用poll方法了。
当一个consumer因某种缘由退出Group时,进行从新分配partition后,同一group中的另外一个consumer在读取该partition时,怎么可以知道上一个consumer该从哪一个offset的message读取呢?也是是如何保证同一个group内的consumer不重复消费消息呢?上面说了一次走网络的fetch请求会拉取到必定量的数据,可是这些数据尚未被消息完毕,Consumer就挂掉了,下一次进行数据fetch时,是否会从上次读到的数据开始读取,而致使Consumer消费的数据丢失吗?
为了作到这一点,当使用完poll从本地缓存拉取到数据以后,须要client调用commitSync方法(或者commitAsync方法)去commit 下一次该去读取 哪个offset的message。
而这个commit方法会经过走网络的commit请求将offset在coordinator中保留,这样就可以保证下一次读取(不论进行了rebalance)时,既不会重复消费消息,也不会遗漏消息。
对于offset的commit,Kafka Consumer Java Client支持两种模式:由KafkaConsumer自动提交,或者是用户经过调用commitSync、commitAsync方法的方式完成offset的提交。
自动提交的例子:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
手动提交的例子:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } }
在手动提交时,须要注意的一点是:要提交的是下一次要读取的offset,例如:
try { while(running) { // 取得消息 ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); // 根据分区来遍历数据: for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); // 数据处理 for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } // 取得当前读取到的最后一条记录的offset long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // 提交offset,记得要 + 1 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); }
KafkaProducer是线程安全的,上一节已经了解到。但Consumer却没有设计成线程安全的。当用户想要在在多线程环境下使用kafkaConsumer时,须要本身来保证synchronized。若是没有这样的保证,就会抛出ConcurrentModificatinException的。
当你想要关闭Consumer或者为也其它的目的想要中断Consumer的处理时,能够调用consumer的wakeup方法。这个方法会抛出WakeupException。
例如:
public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // Handle new records } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } }
在kafka 0.9+使用Java Consumer替代了老版本的scala Consumer。新版的配置以下:
·bootstrap.servers
在启动consumer时配置的broker地址的。不须要将cluster中全部的broker都配置上,由于启动后会自动的发现cluster全部的broker。
它配置的格式是:host1:port1;host2:port2…
·key.descrializer、value.descrializer
Message record 的key, value的反序列化类。
·group.id
用于表示该consumer想要加入到哪一个group中。默认值是 “”。
·heartbeat.interval.ms
心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是肯定consumer存活,加入或者退出group的有效手段。
这个值必须设置的小于session.timeout.ms,由于:
当Consumer因为某种缘由不能发Heartbeat到coordinator时,而且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。
一般设置的值要低于session.timeout.ms的1/3。
默认值是:3000 (3s)
·session.timeout.ms
Consumer session 过时时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
其默认值是:10000 (10 s)
·enable.auto.commit
Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。
默认值是true。
·auto.commit.interval.ms
自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)
·auto.offset.reset
这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(若是一个record被删除,就确定不存在了)时,该如何处理。它有4种处理方式:
1) earliest:自动重置到最先的offset。
2) latest:看上去重置到最晚的offset。
3) none:若是边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。
4) 若是不是上述3种,只抛出异常给consumer。
默认值是latest。
·connections.max.idle.ms
链接空闲超时时间。由于consumer只与broker有链接(coordinator也是一个broker),因此这个配置的是consumer到broker之间的。
默认值是:540000 (9 min)
·fetch.max.wait.ms
Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。
·fetch.min.bytes
当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。若是broker中数据量不够的话会wait,直到数据大小知足这个条件。
取值范围是:[0, Integer.Max],默认值是1。
默认值设置为1的目的是:使得consumer的请求可以尽快的返回。
·fetch.max.bytes
一次fetch请求,从一个broker中取得的records最大大小。若是在从topic中第一个非空的partition取消息时,若是取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片状况下,只会返回这一条record。
broker、topic都会对producer发给它的message size作限制。因此在配置这值时,能够参考broker的message.max.bytes 和 topic的max.message.bytes的配置。
取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)
·max.partition.fetch.bytes
一次fetch请求,从一个partition中取得的records最大大小。若是在从topic中第一个非空的partition取消息时,若是取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片状况下,只会返回这一条record。
broker、topic都会对producer发给它的message size作限制。因此在配置这值时,能够参考broker的message.max.bytes 和 topic的max.message.bytes的配置。
·max.poll.interval.ms
前面说过要求程序中不间断的调用poll()。若是长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。
·max.poll.records
Consumer每次调用poll()时取到的records的最大数。
·receive.buffer.byte
Consumer receiver buffer (SO_RCVBUF)的大小。这个值在建立Socket链接时会用到。
取值范围是:[-1, Integer.MAX]。默认值是:65536 (64 KB)
若是值设置为-1,则会使用操做系统默认的值。
·request.timeout.ms
请求发起后,并不必定会很快接收到响应信息。这个配置就是来配置请求超时时间的。默认值是:305000 (305 s)
·client.id
Consumer进程的标识。若是设置一我的为可读的值,跟踪问题会比较方便。
·interceptor.classes
用户自定义interceptor。
·metadata.max.age.ms
Metadata数据的刷新间隔。即使没有任何的partition订阅关系变动也行执行。
范围是:[0, Integer.MAX],默认值是:300000 (5 min)