在 Kafka 中,消费者一般是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响。Kafka 之因此要引入消费者群组这个概念是由于 Kafka 消费者常常会作一些高延迟的操做,好比把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些状况下,单个消费者没法跟上数据生成的速度。此时能够增长更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是 Kafka 实现横向伸缩的主要手段。java
须要注意的是:同一个分区只能被同一个消费者群组里面的一个消费者读取,不可能存在同一个分区被同一个消费者群里多个消费者共同读取的状况,如图:git
能够看到即使消费者 Consumer5 空闲了,可是也不会去读取任何一个分区的数据,这同时也提醒咱们在使用时应该合理设置消费者的数量,以避免形成闲置和额外开销。github
由于群组里的消费者共同读取主题的分区,因此当一个消费者被关闭或发生崩溃时,它就离开了群组,本来由它读取的分区将由群组里的其余消费者来读取。同时在主题发生变化时 , 好比添加了新的分区,也会发生分区与消费者的从新分配,分区的全部权从一个消费者转移到另外一个消费者,这样的行为被称为再均衡。正是由于再均衡,因此消费费者群组才能保证高可用性和伸缩性。数据库
消费者经过向群组协调器所在的 broker 发送心跳来维持它们和群组的从属关系以及它们对分区的全部权。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。若是消费者中止发送心跳的时间足够长,会话就会过时,群组协调器认为它已经死亡,就会触发再均衡。apache
在建立消费者的时候如下如下三个选项是必选的:bootstrap
除此以外你还须要指明你须要想订阅的主题,可使用以下两个 API :api
最后只须要经过轮询 API(poll
) 向服务器定时请求数据。一旦消费者订阅了主题,轮询就会处理全部的细节,包括群组协调、分区再均衡、发送心跳和获取数据,这使得开发者只须要关注从分区返回的数据,而后进行业务处理。 示例以下:服务器
String topic = "Hello-Kafka"; String group = "group1"; Properties props = new Properties(); props.put("bootstrap.servers", "hadoop001:9092"); /*指定分组 ID*/ props.put("group.id", group); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /*订阅主题 (s)*/ consumer.subscribe(Collections.singletonList(topic)); try { while (true) { /*轮询获取数据*/ ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", record.topic(), record.partition(), record.key(), record.value(), record.offset()); } } } finally { consumer.close(); }
本篇文章的全部示例代码能够从 Github 上进行下载:kafka-basissession
Kafka 的每一条消息都有一个偏移量属性,记录了其在分区中的位置,偏移量是一个单调递增的整数。消费者经过往一个叫做 _consumer_offset
的特殊主题发送消息,消息里包含每一个分区的偏移量。 若是消费者一直处于运行状态,那么偏移量就没有
什么用处。不过,若是有消费者退出或者新分区加入,此时就会触发再均衡。完成再均衡以后,每一个消费者可能分配到新的分区,而不是以前处理的那个。为了可以继续以前的工做,消费者须要读取每一个分区最后一次提交的偏移量,而后从偏移量指定的地方继续处理。 由于这个缘由,因此若是不能正确提交偏移量,就可能会致使数据丢失或者重复出现消费,好比下面状况:异步
Kafka 支持自动提交和手动提交偏移量两种方式。这里先介绍比较简单的自动提交:
只须要将消费者的 enable.auto.commit
属性配置为 true
便可完成自动提交的配置。 此时每隔固定的时间,消费者就会把 poll()
方法接收到的最大偏移量进行提交,提交间隔由 auto.commit.interval.ms
属性进行配置,默认值是 5s。
使用自动提交是存在隐患的,假设咱们使用默认的 5s 提交时间间隔,在最近一次提交以后的 3s 发生了再均衡,再均衡以后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s ,因此在这 3s 内到达的消息会被重复处理。能够经过修改提交时间间隔来更频繁地提交偏移量,减少可能出现重复消息的时间窗,不过这种状况是没法彻底避免的。基于这个缘由,Kafka 也提供了手动提交偏移量的 API,使得用户能够更为灵活的提交偏移量。
用户能够经过将 enable.auto.commit
设为 false
,而后手动提交偏移量。基于用户需求手动提交偏移量能够分为两大类:
而按照 Kafka API,手动提交偏移量又能够分为同步提交和异步提交。
经过调用 consumer.commitSync()
来进行同步提交,不传递任何参数时提交的是当前轮询的最大偏移量。
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*同步提交*/ consumer.commitSync(); }
若是某个提交失败,同步提交还会进行重试,这能够保证数据可以最大限度提交成功,可是同时也会下降程序的吞吐量。基于这个缘由,Kafka 还提供了异步提交的 API。
异步提交能够提升程序的吞吐量,由于此时你能够尽管请求数据,而不用等待 Broker 的响应。代码以下:
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*异步提交并定义回调*/ consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { System.out.println("错误处理"); offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset())); } } }); }
异步提交存在的问题是,在提交失败的时候不会进行自动重试,实际上也不能进行自动重试。假设程序同时提交了 200 和 300 的偏移量,此时 200 的偏移量失败的,可是紧随其后的 300 的偏移量成功了,此时若是重试就会存在 200 覆盖 300 偏移量的可能。同步提交就不存在这个问题,由于在同步提交的状况下,300 的提交请求必须等待服务器返回 200 提交请求的成功反馈后才会发出。基于这个缘由,某些状况下,须要同时组合同步和异步两种提交方式。
注:虽然程序不能在失败时候进行自动重试,可是咱们是能够手动进行重试的,你能够经过一个 Map<TopicPartition, Integer> offsets 来维护你提交的每一个分区的偏移量,而后当失败时候,你能够判断失败的偏移量是否小于你维护的同主题同分区的最后提交的偏移量,若是小于则表明你已经提交了更大的偏移量请求,此时不须要重试,不然就能够进行手动重试。
下面这种状况,在正常的轮询中使用异步提交来保证吞吐量,可是由于在最后即将要关闭消费者了,因此此时须要用同步提交来保证最大限度的提交成功。
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } // 异步提交 consumer.commitAsync(); } } catch (Exception e) { e.printStackTrace(); } finally { try { // 由于即将要关闭消费者,因此要用同步提交保证提交成功 consumer.commitSync(); } finally { consumer.close(); } }
在上面同步和异步提交的 API 中,实际上咱们都没有对 commit 方法传递参数,此时默认提交的是当前轮询的最大偏移量,若是你须要提交特定的偏移量,能够调用它们的重载方法。
/*同步提交特定偏移量*/ commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) /*异步提交特定偏移量*/ commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
须要注意的是,由于你能够订阅多个主题,因此 offsets
中必需要包含全部主题的每一个分区的偏移量,示例代码以下:
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); /*记录每一个主题的每一个分区的偏移量*/ TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset()+1, "no metaData"); /*TopicPartition 重写过 hashCode 和 equals 方法,因此可以保证同一主题和分区的实例不会被重复添加*/ offsets.put(topicPartition, offsetAndMetadata); } /*提交特定偏移量*/ consumer.commitAsync(offsets, null); } } finally { consumer.close(); }
由于分区再均衡会致使分区与消费者的从新划分,有时候你可能但愿在再均衡前执行一些操做:好比提交已经处理可是还没有提交的偏移量,关闭数据库链接等。此时能够在订阅主题时候,调用 subscribe
的重载方法传入自定义的分区再均衡监听器。
/*订阅指定集合内的全部主题*/ subscribe(Collection<String> topics, ConsumerRebalanceListener listener) /*使用正则匹配须要订阅的主题*/ subscribe(Pattern pattern, ConsumerRebalanceListener listener)
代码示例以下:
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() { /*该方法会在消费者中止读取消息以后,再均衡开始以前就调用*/ @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("再均衡即将触发"); // 提交已经处理的偏移量 consumer.commitSync(offsets); } /*该方法会在从新分配分区以后,消费者开始读取消息以前被调用*/ @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } }); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData"); /*TopicPartition 重写过 hashCode 和 equals 方法,因此可以保证同一主题和分区的实例不会被重复添加*/ offsets.put(topicPartition, offsetAndMetadata); } consumer.commitAsync(offsets, null); } } finally { consumer.close(); }
Kafka 提供了 consumer.wakeup()
方法用于退出轮询,它经过抛出 WakeupException
异常来跳出循环。须要注意的是,在退出线程时最好显示的调用 consumer.close()
, 此时消费者会提交任何尚未提交的东西,并向群组协调器发送消息,告知本身要离开群组,接下来就会触发再均衡 ,而不须要等待会话超时。
下面的示例代码为监听控制台输出,当输入 exit
时结束轮询,关闭消费者并退出程序:
/*调用 wakeup 优雅的退出*/ final Thread mainThread = Thread.currentThread(); new Thread(() -> { Scanner sc = new Scanner(System.in); while (sc.hasNext()) { if ("exit".equals(sc.next())) { consumer.wakeup(); try { /*等待主线程完成提交偏移量、关闭消费者等操做*/ mainThread.join(); break; } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> rd : records) { System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n", rd.topic(), rd.partition(), rd.key(), rd.value(), rd.offset()); } } } catch (WakeupException e) { //对于 wakeup() 调用引发的 WakeupException 异常能够没必要处理 } finally { consumer.close(); System.out.println("consumer 关闭"); }
由于 Kafka 的设计目标是高吞吐和低延迟,因此在 Kafka 中,消费者一般都是从属于某个群组的,这是由于单个消费者的处理能力是有限的。可是某些时候你的需求可能很简单,好比可能只须要一个消费者从一个主题的全部分区或者某个特定的分区读取数据,这个时候就不须要消费者群组和再均衡了, 只须要把主题或者分区分配给消费者,而后开始读取消息井提交偏移量便可。
在这种状况下,就不须要订阅主题, 取而代之的是消费者为本身分配分区。 一个消费者能够订阅主题(井加入消费者群组),或者为本身分配分区,但不能同时作这两件事情。 分配分区的示例代码以下:
List<TopicPartition> partitions = new ArrayList<>(); List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic); /*能够指定读取哪些分区 如这里假设只读取主题的 0 分区*/ for (PartitionInfo partition : partitionInfos) { if (partition.partition()==0){ partitions.add(new TopicPartition(partition.topic(), partition.partition())); } } // 为消费者指定分区 consumer.assign(partitions); while (true) { ConsumerRecords<Integer, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<Integer, String> record : records) { System.out.printf("partition = %s, key = %d, value = %s\n", record.partition(), record.key(), record.value()); } consumer.commitSync(); }
消费者从服务器获取记录的最小字节数。若是可用的数据量小于设置值,broker 会等待有足够的可用数据时才会把它返回给消费者。
broker 返回给消费者数据的等待时间,默认是 500ms。
该属性指定了服务器从每一个分区返回给消费者的最大字节数,默认为 1MB。
消费者在被认为死亡以前能够与服务器断开链接的时间,默认是 3s。
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的状况下该做何处理:
是否自动提交偏移量,默认值是 true。为了不出现重复消费和数据丢失,能够把它设置为 false。
客户端 id,服务器用来识别消息的来源。
单次调用 poll()
方法可以返回的记录数量。
这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 表明使用操做系统的默认值。
更多大数据系列文章能够参见 GitHub 开源项目: 大数据入门指南