【Kafka】《Kafka权威指南》——从Kafka读取数据

应用程序使用 KafkaConsumer向 Kafka 订阅主题,并从订阅的主题上接收消息 。 从 Kafka 读取数据不一样于从其余悄息系统读取数据,它涉及一些独特的概念和想法。若是不先理解 这些概念,就难以理解如何使用消费者 API。因此咱们接下来先解释这些重要的概念,然 后再举几个例子,横示如何使用消费者 API 实现不一样的应用程序。java

消费者和消费者群组

假设咱们有一个应用程序须要从-个 Kafka主题读取消息井验证这些消息,而后再把它们 保存起来。应用程序须要建立一个消费者对象,订阅主题并开始接收消息,而后验证消息 井保存结果。过了 一阵子,生产者往主题写入消息的速度超过了应用程序验证数据的速 度,这个时候该怎么办?若是只使用单个消费者处理消息,应用程序会远跟不上消息生成 的速度。显然,此时颇有必要对消费者进行横向伸缩。就像多个生产者能够向相同的 主题 写入消息同样,咱们也可使用多个消费者从同一个主题读取消息,对消息进行分流。正则表达式

Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每一个消费者 接收主题一部分分区的消息。数据库

假设主题 T1 有 4 个分区,咱们建立了消费者 C1 ,它是群组 G1 里惟 一 的消费者,咱们用 它订阅主题 T1。消费者 Cl1将收到主题 T1所有 4个分区的消息,如图 4-1 所示。apache

若是在群组 G1 里新增一个消费者 C2,那么每一个消费者将分别从两个分区接收消息。我 假设消费者 C1接收分区 0 和分区 2 的消息,消费者 C2 接收分区 1 和分区 3 的消息,如图 4-2 所示。bootstrap

若是群组 G1 有 4 个消费者,那么每一个消费者能够分配到 一个分区,如图 4-3 所示。数组

若是咱们往群组里添加更多的消费者,超过主题的分区数量,那么多出的消费者就会被闲置,不会接收到任何消息。缓存

往群组里增长消费者是横向伸缩消费能力的主要方式。 Kafka 消费者常常会作一些高延迟的操做,好比把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。在这些状况下,单个消费者没法跟上数据生成的速度,因此能够增长更多的消费者,让它们分担负载,每一个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。咱们有必要为主题建立大量的分区,在负载增加时能够加入更多的消费者。不过要性意,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。安全

除了经过增长消费者来横向伸缩单个应用程序外,还常常出现多个应用程序从同一个主题读取数据的状况。实际上, Kafka 设计的主要目标之一 ,就是要让 Kafka 主题里的数据可以知足企业各类应用场景的需求。在这些场景里,每一个应用程序能够获取到全部的消息, 而不仅是其中的 一部分。只要保证每一个应用程序有本身的消费者群组,就可让它们获取到主题全部的消息。不一样于传统的消息系统,横向伸缩 Kafka消费者和消费者群组并不会对性能形成负面影响。服务器

在上面的例子里,若是新增一个只包含一个消费者的群组 G2,那么这个消费者将从主题 T1 上接收全部的消息,与群组 G1 之间互不影响。群组 G2 能够增长更多的消费者,每一个消费者能够消费若干个分区,就像群组 G1 那样,如图 4-5 所示。总的来讲,群组 G2 仍是会接收到全部消息,无论有没有其余群组存在。网络

简而言之,为每个须要获取一个或多个主题所有消息的应用程序建立一个消费者群组, 而后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每一个消费者只处理一部分消息。

消费者群组和分区再均衡

咱们已经从上一个小节了解到,群组里的消费者共同读取主题的分区。一个新的消费者加 入群组时,它读取的是本来由其余消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,本来由它读取的分区将由群组里的其余消费者来读取。在主题发生变化时 , 好比管理员添加了新的分区,会发生分区重分配。

分区的全部权从一个消费者转移到另外一个消费者,这样的行为被称为再均衡。再均衡很是重要, 它为消费者群组带来了高可用性和伸缩性(咱们能够放心地添加或移除消费者), 不过在正常状况下,咱们并不但愿发生这样的行为。在再均衡期间,消费者没法读取消息,形成整个群组一小段时间的不可用。另外,当分区被从新分配给另 一个消费者时,消费者当前的读取状态会丢失,它有可能还须要去刷新缓存 ,在它从新恢复状态以前会拖慢应用程序。咱们将在本章讨论如何进行安全的再均衡,以及如何避免没必要要的再均衡。

消费者经过向被指派为 群组协调器的 broker (不一样的群组能够有不一样的协调器)发送 心跳 来维持它们和群组的从属关系以及它们对分区的全部权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息 (为了获取消息)或提交偏移量时发送心跳。若是消费者中止发送心跳的时间足够长,会话就会过时,群组协调器认为它已经死亡,就会触发一次再均衡。

若是一个消费者发生崩溃,井中止读取消息,群组协调器(broker)会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会当即触发一次再均衡,尽可能下降处理停顿。在本章的后续部分,咱们将讨论一些用于控制发送心跳频率和会话过时时间的配置参数,以及如何根据实际须要来配置这些参数 。

分配分区是怎样的一个过程

当消费者要加入群组时,它会向群组协调器发送 一 个 JoinGroup 请求。第 一 个加入群组的消费者将成为“群主”。群主从协调器那里得到群组的成员列 表(列表中包含了全部最近发送过心跳的消费者,它们被认为是活跃的), 并负责给每个消费者分配分区。它使用 一个实现了 PartitionAssignor接口的类来决定哪些分 区应该被分配给哪一个消费者 。

Kafka 内置了两种分配策略,在后面的配置参数小节咱们将深刻讨论。分配完毕以后,群主把分配状况列表发送给群组协调器,协调器再把这些信息发送给全部消费者。每一个消费者只能看到本身的分配信息,只有群 主知道群组 里全部消费者的分配信息。这个过程会在每次再均衡时重复发生。

建立 Kafka消费者

在读取消息以前,须要先建立 一个 KafkaConsumer对象 。 建立 KafkaConsumer 对象与建立 KafkaProducer对象很是类似——把想要传给消费者的属性放在 Properties 对象里。本章 后续部分会深刻讨论全部的属性。在这里,咱们只须要使用 3个必要的属性: bootstrap.servers、 key.deserializer、 value.deserializer。

下面代码演示了如何建立一个KafkaConsumer对象:

Properties props = new Properties();
 
props.put("bootstrap.servers", "broker1:9092, broker2:9092");
 
props.put("group.id", "CountryCounter");
 
props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");
 
props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");
 
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

deserializer使用指定的类(反序列化器)把字节数组转成 Java对象。

group.id指定了KafkaConsumer 属于哪个消费者群组。 group.id不是必需的,不过咱们如今姑且认为它是必需的。它指定了 KafkaConsumer 属于哪个消费者群组。建立不属于任何一个群组的消费者也是能够的,只是这样作不太常见。

订阅主题

建立好消费者以后,下一步能够开始订阅主题了。subscribe()方法接受一个主题列表做为参数

consumer.subscribe(Collections.singletonList("customerCountries"));

在这里咱们建立了一个包含单个元素的列表,主题的名字叫做“customerCountries”,咱们也能够在调用subscribe()方法时传入一个正则表达式,正则表达式能够匹配多个主题若是有人建立了新的主题,而且主题名与正则表达式匹配,那么会当即触发一次再均衡,消费者就能够读取新添加的主题。若是应用程序须要读取多个主题,而且能够处理不一样类型的数据,那么这种订阅方式就很管用。在Kafka和其余系统之间复制数据时,使用正则表达式的方式订阅多个主题时很常见的作法。

要订阅全部test相关的主题,能够这样作:consumer.subscribe("test.*");

轮询

消息轮询是消费者 API 的核心,经过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题 ,轮询就会处理全部的细节,包括群组协调、分区再均衡、发送心跳和获取数据, 开发者只须要使用一组简单的 API 来处理从分区返回的数据。消费者代码的主要部分以下所示 :

轮询不仅是获取数据那么简单。在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator, 而后加入群组,接受分配的分区。 若是发生了再均衡,整个过程也是在轮询期间进行的。固然 ,心跳也是从轮询里发迭出去的。因此,咱们要确保在轮询期间所作的任何处理工做都应该尽快完成。

线程安全

在同一个群组中,咱们没法让一个线程运行多个消费者,也没法让多个线程安全地共享一个消费者。按照规则,一个消费者使用一个线程。若是要在同一个消费者群组里运行多个消费者,须要让每一个消费者运行在本身的线程里。最好是把消费者的逻辑封装在本身的对象里,而后使用Java的ExecutorService启动多个线程,使每一个消费者运行在本身的线程上。

消费者的配置

到目前为止,咱们学习了如何使用消费者 API,不过只介绍了几个配置属’性一一如bootstrap.servers、 key.deserializer、 value.deserializer、group.id。 Kafka的文档列出了全部与消费者相关的配置说明。大部分参数都有合理的默认值,通常不须要修改它们,不过有一些参数与消费 者的性能和可用性有很大关系。接下来介绍这些重要的属性。

1. fetch.min.bytes

该属性指定了消费者从服务器获取记录的最小字节数。 broker 在收到消费者的数据请求时, 若是可用的数据量小于 fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样能够下降消费者和 broker 的工做负载,由于它们在主题不是很活跃的时候(或者一天里的低谷时段)就不须要来来回回地处理消息。若是没有不少可用数据,但消费者的 CPU 使用率却很高,那么就须要把该属性的值设得比默认值大。若是消费者的数量比较多,把该属性的值设置得大一点能够下降 broker 的工做负载。

2. fetch.max.wait.ms

咱们经过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 fetch.max.wait.ms则用于指定 broker的等待时间,默认是 500ms。若是没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到知足,最终致使500ms的延迟。 若是要下降潜在的延迟(为了知足 SLA),能够把该参数值设置得小一些。若是 fetch.max.wait.ms被设 为 100ms,而且 fetch.min.bytes 被设为 1MB,那么 Kafka在收到消费者的请求后,要么返 回 1MB 数据,要么在 100ms 后返回全部可用的数据 , 就看哪一个条件先获得知足。

3. max.parition.fetch.bytes

该属性指定了服务器从每一个分区里返回给消费者的最大字节数。它的默认值是 1MB,也 就是说, KafkaConsumer.poll() 方法从每一个分区里返回的记录最多不超过 max.parition.fetch.bytes 指定的字节。若是一个主题有 20个分区和 5 个消费者,那么每一个消费者须要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,能够给它们多分配一些,因 为若是群组里有消费者发生崩溃,剩下的消费者须要处理更多的分区。 max.parition.fetch.bytes 的值必须比 broker可以接收的最大消息的字节数(经过 max.message.size属 性配置 )大, 不然消费者可能没法读取这些消息,致使消费者一直挂起重试。在设置该属性时,另外一个须要考虑的因素是消费者处理数据的时间。 消费者须要频繁调用 poll() 方法来避免会话过时和发生分区再均衡,若是单次调用 poll() 返回的数据太多,消费者须要更多的时间来处理,可能没法及时进行下一个轮询来避免会话过时。若是出现这种状况, 能够把 max.parition.fetch.bytes 值改小 ,或者延长会话过时时间。

4. session.timeout.ms

该属性指定了消费者在被认为死亡以前能够与服务器断开链接的时间,默认是 3s。若是消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其余消费者 。该属性与 heartbeat.interval.ms紧密相关。heartbeat.interval.ms 指定了poll()方法向协调器 发送心跳的频 率, session.timeout.ms 则指定了消费者能够多久不发送心跳。因此, 通常须要同时修改这两个属性, heartbeat.interval.ms 必须比 session.timeout.ms 小, 通常是 session.timeout.ms 的三分之一。若是 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 ls。 把 session.timeout.ms 值设 得比默认值小,能够更快地检测和恢 复崩溃的节点,不过长时间的轮询或垃圾收集可能致使非预期的再均衡。把该属性的值设置得大一些,能够减小意外的再均衡 ,不过检测节点崩溃须要更长的时间。

5. auto.offset.reset

该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的状况下(因消费者长时间失效,包含偏移量的记录已通过时井被删除)该做何处理。它的默认值是latest, 意 思是说,在偏移量无效的状况下,消费者将从最新的记录开始读取数据(在消费者 启动之 后生成的记录)。另外一个值是 earliest,意思是说,在偏移量无效的状况下,消费者将从 起始位置读取分区的记录。

6. enable.auto.commit

咱们稍后将介绍 几种 不一样的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是 true。为了尽可能避免出现重复数据和数据丢失,能够把它设为 false,由本身控制什么时候提交偏移量。若是把它设为 true,还能够经过配置 auto.commit.interval.mls 属性来控制提交的频率。

7. partition.assignment.strategy

咱们知道,分区会被分配给群组里的消费者。 PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪一个消费者。 Kafka 有两个默认的分配策略 。

- Range

该策略会把主题的若干个连续的分区分配给消费者。假设悄费者 C1 和消费者 C2 同时 订阅了主题 T1 和主题 T2,井且每一个主题有 3 个分区。那么消费者 C1 有可能分配到这 两个主题的分区 0 和 分区 1,而消费者 C2 分配到这两个主题 的分区 2。由于每一个主题 拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range策略,并且分区数量没法被消费者数量整除,就会出现这种状况。

- RoundRobin

该策略把主题的全部分区逐个分配给消费者。若是使用 RoundRobin 策略来给消费者 C1 和消费者 C2分配分区,那么消费者 C1 将分到主题 T1 的分区 0和分区 2以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 l 以及主题T2 的分区 0和分区 2。通常 来讲,若是全部消费者都订阅相同的主题(这种状况很常见), RoundRobin策略会给所 有消费者分配相同数量 的分区(或最多就差一个分区)。

能够经过设置 partition.assignment.strategy 来选择分区策略。默认使用的是 org. apache.kafka.clients.consumer.RangeAssignor, 这个类实现了 Range策略,不过也能够 把它改为 org.apache.kafka.clients.consumer.RoundRobinAssignor。咱们还可使用自定 义策略,在这种状况下 , partition.assignment.strategy 属性的值就是自定义类的名字。

8. client.id

该属性能够是任意字符串 , broker用它来标识从客户端发送过来的消息,一般被用在日志、度量指标和配额里。

9. max.poll.records

该属性用于控制单次调用 call() 方法可以返回的记录数量,能够帮你控制在轮询里须要处理的数据量。

10. receive.buffer.bytes 和 send.buffer.bytes

socket 在读写数据时用到的 TCP 缓冲区也能够设置大小。若是它们被设为-1,就使用操做系统的默认值。若是生产者或消费者与 broker处于不一样的数据中心内,能够适当增大这些值,由于跨数据中心的网络通常都有 比较高的延迟和比较低的带宽 。

做者注:欢迎关注笔者公号,按期分享IT互联网、金融等工做经验心得、人生感悟,欢迎交流,目前就任阿里-移动事业部,须要大厂内推的也可到公众号砸简历,或查看我我的资料获取。(公号ID:weknow619)。

相关文章
相关标签/搜索