Kafka消费者APi

Kafka客户端从集群中消费消息,并透明地处理kafka集群中出现故障服务器,透明地调节适应集群中变化的数据分区。也和服务器交互,平衡均衡消费者。html

public class KafkaConsumer<K,V> extends Object implements Consumer<K,V> 

消费者TCP长链接到broker来拉取消息。故障致使的消费者关闭失败,将会泄露这些链接,消费者不是线程安全的,能够查看更多关于Multi-threaded(多线程)处理的细节。java

跨版本兼容性

该客户端能够与0.10.0或更新版本的broker集群进行通讯。较早的版本可能不支持某些功能。例如,0.10.0broker不支持offsetsForTimes,由于此功能是在版本0.10.1中添加的。 若是你调用broker版本不可用的API时,将报 UnsupportedVersionException 异常。正则表达式

偏移量和消费者的位置

kafka为分区中的每条消息保存一个偏移量(offset),这个偏移量是该分区中一条消息的惟一标示符。也表示消费者在分区的位置。例如,一个位置是5的消费者(说明已经消费了0到4的消息),下一个接收消息的偏移量为5的消息。实际上有两个与消费者相关的“位置”概念:数据库

消费者的位置给出了下一条记录的偏移量。它比消费者在该分区中看到的最大偏移量要大一个。 它在每次消费者在调用poll(long)中接收消息时自动增加。apache

“已提交”的位置是已安全保存的最后偏移量,若是进程失败或从新启动时,消费者将恢复到这个偏移量。消费者能够选择按期自动提交偏移量,也能够选择经过调用commit API来手动的控制(如:commitSync 和 commitAsync)。bootstrap

这个区别是消费者来控制一条消息何时才被认为是已被消费的,控制权在消费者,下面咱们进一步更详细地讨论。缓存

消费者组和主题订阅

Kafka的消费者组概念,经过进程池瓜分消息并处理消息。这些进程能够在同一台机器运行,也可分布到多台机器上,以增长可扩展性和容错性,相同group.id的消费者将视为同一个消费者组安全

分组中的每一个消费者都经过subscribe API动态的订阅一个topic列表。kafka将已订阅topic的消息发送到每一个消费者组中。并经过平衡分区在消费者分组中全部成员之间来达到平均。所以每一个分区刚好地分配1个消费者(一个消费者组中)。全部若是一个topic有4个分区,而且一个消费者分组有只有2个消费者。那么每一个消费者将消费2个分区。服务器

消费者组的成员是动态维护的:若是一个消费者故障。分配给它的分区将从新分配给同一个分组中其余的消费者。一样的,若是一个新的消费者加入到分组,将从现有消费者中移一个给它。这被称为从新平衡分组,并在下面更详细地讨论。当新分区添加到订阅的topic时,或者当建立与订阅的正则表达式匹配的新topic时,也将从新平衡。将经过定时刷新自动发现新的分区,并将其分配给分组的成员。网络

从概念上讲,你能够将消费者分组看做是由多个进程组成的单一逻辑订阅者。做为一个多订阅系统,Kafka支持对于给定topic任何数量的消费者组,而不重复。

这是在消息系统中常见的功能的略微归纳。全部进程都将是单个消费者分组的一部分(相似传统消息传递系统中的队列的语义),所以消息传递就像队列同样,在组中平衡。与传统的消息系统不一样的是,虽然,你能够有多个这样的组。但每一个进程都有本身的消费者组(相似于传统消息系统中pub-sub的语义),所以每一个进程都会订阅到该主题的全部消息。

此外,当分组从新分配自动发生时,能够经过ConsumerRebalanceListener通知消费者,这容许他们完成必要的应用程序级逻辑,例如状态清除,手动偏移提交等。有关更多详细信息,请参阅Kafka存储的偏移

它也容许消费者经过使用assign(Collection)手动分配指定分区,若是使用手动指定分配分区,那么动态分区分配和协调消费者组将失效。

发现消费者故障

订阅一组topic后,当调用poll(long)时,消费者将自动加入到组中。只要持续的调用poll,消费者将一直保持可用,并继续从分配的分区中接收消息。此外,消费者向服务器定时发送心跳。 若是消费者崩溃或没法在session.timeout.ms配置的时间内发送心跳,则消费者将被视为死亡,而且其分区将被从新分配。

还有一种可能,消费可能遇到“活锁”的状况,它持续的发送心跳,可是没有处理。为了预防消费者在这种状况下一直持有分区,咱们使用max.poll.interval.ms活跃检测机制。 在此基础上,若是你调用的poll的频率大于最大间隔,则客户端将主动地离开组,以便其余消费者接管该分区。 发生这种状况时,你会看到offset提交失败(调用commitSync()引起的CommitFailedException)。这是一种安全机制,保障只有活动成员可以提交offset。因此要留在组中,你必须持续调用poll。

消费者提供两个配置设置来控制poll循环:

  1. max.poll.interval.ms:增大poll的间隔,能够为消费者提供更多的时间去处理返回的消息(调用poll(long)返回的消息,一般返回的消息都是一批)。缺点是此值越大将会延迟组从新平衡。

  2. max.poll.records:此设置限制每次调用poll返回的消息数,这样能够更容易的预测每次poll间隔要处理的最大值。经过调整此值,能够减小poll间隔,减小从新平衡分组的

对于消息处理时间不可预测地的状况,这些选项是不够的。 处理这种状况的推荐方法是将消息处理移到另外一个线程中,让消费者继续调用poll。 可是必须注意确保已提交的offset不超过实际位置。另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)。 还要注意,你须要pause暂停分区,不会从poll接收到新消息,让线程处理完以前返回的消息(若是你的处理能力比拉取消息的慢,那建立新线程将致使你机器内存溢出)。

示例

这个消费者API提供了灵活性,以涵盖各类消费场景,下面是一些例子来演示如何使用它们。

自动提交偏移量

这是个【自动提交偏移量】的简单的kafka消费者API。

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } 

设置enable.auto.commit,偏移量由auto.commit.interval.ms控制自动提交的频率。

集群是经过配置bootstrap.servers指定一个或多个broker。不用指定所有的broker,它将自动发现集群中的其他的borker(最好指定多个,万一有服务器故障)。

在这个例子中,客户端订阅了主题foobar。消费者组叫test

broker经过心跳机器自动检测test组中失败的进程,消费者会自动ping集群,告诉进群它还活着。只要消费者可以作到这一点,它就被认为是活着的,并保留分配给它分区的权利,若是它中止心跳的时间超过session.timeout.ms,那么就会认为是故障的,它的分区将被分配到别的进程。

这个deserializer设置如何把byte转成object类型,例子中,经过指定string解析器,咱们告诉获取到的消息的key和value只是简单个string类型。

手动控制偏移量

不须要定时的提交offset,能够本身控制offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。这个颇有用的,当消费的消息结合了一些处理逻辑,这个消息就不该该认为是已经消费的,直到它完成了整个处理。

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } } 

在这个例子中,咱们将消费一批消息并将它们存储在内存中。当咱们积累足够多的消息后,咱们再将它们批量插入到数据库中。若是咱们设置offset自动提交(以前说的例子),消费将被认为是已消费的。这样会出现问题,咱们的进程可能在批处理记录以后,但在它们被插入到数据库以前失败了。

为了不这种状况,咱们将在相应的记录插入数据库以后再手动提交偏移量。这样咱们能够准确控制消息是成功消费的。提出一个相反的可能性:在插入数据库以后,可是在提交以前,这个过程可能会失败(即便这可能只是几毫秒,这是一种可能性)。在这种状况下,进程将获取到已提交的偏移量,并会重复插入的最后一批数据。这种方式就是所谓的“至少一次”保证,在故障状况下,能够重复。

若是您没法执行这些操做,可能会使已提交的偏移超过消耗的位置,从而致使缺乏记录。 使用手动偏移控制的优势是,您能够直接控制记录什么时候被视为“已消耗”。

注意:使用自动提交也能够“至少一次”。可是要求你必须下次调用poll(long)以前或关闭消费者以前,处理完全部返回的数据。若是操做失败,这将会致使已提交的offset超过消费的位置,从而致使丢失消息。使用手动控制offset的有点是,你能够直接控制消息什么时候提交。、

上面的例子使用commitSync表示全部收到的消息为”已提交",在某些状况下,你能够但愿更精细的控制,经过指定一个明确消息的偏移量为“已提交”。在下面,咱们的例子中,咱们处理完每一个分区中的消息后,提交偏移量。

try { while(running) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); } 

注意:已提交的offset应始终是你的程序将读取的下一条消息的offset。所以,调用commitSync(offsets)时,你应该加1个到最后处理的消息的offset。

订阅指定的分区

在前面的例子中,咱们订阅咱们感兴趣的topic,让kafka提供给咱们平分后的topic分区。可是,在有些状况下,你可能须要本身来控制分配指定分区,例如:

  • 若是这个消费者进程与该分区保存了某种本地状态(如本地磁盘的键值存储),则它应该只能获取这个分区的消息。

  • 若是消费者进程自己具备高可用性,而且若是它失败,会自动从新启动(可能使用集群管理框架如YARN,Mesos,或者AWS设施,或做为一个流处理框架的一部分)。 在这种状况下,不须要Kafka检测故障,从新分配分区,由于消费者进程将在另外一台机器上从新启动。

要使用此模式,,你只需调用assign(Collection)消费指定的分区便可:

String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1)); 

一旦手动分配分区,你能够在循环中调用poll(跟前面的例子同样)。消费者分组仍须要提交offset,只是如今分区的设置只能经过调用assign修改,由于手动分配不会进行分组协调,所以消费者故障不会引起分区从新平衡。每个消费者是独立工做的(即便和其余的消费者共享GroupId)。为了不offset提交冲突,一般你须要确认每个consumer实例的gorupId都是惟一的。

注意,手动分配分区(即,assgin)和动态分区分配的订阅topic模式(即,subcribe)不能混合使用。

offset存储在其余地方

消费者能够不使用kafka内置的offset仓库。能够选择本身来存储offset。要注意的是,将消费的offset和结果存储在同一个的系统中,用原子的方式存储结果和offset,但这不能保证原子,要想消费是彻底原子的,并提供的“正好一次”的消费保证比kafka默认的“至少一次”的语义要更高。你须要使用kafka的offset提交功能。

这有结合的例子。

  • 若是消费的结果存储在关系数据库中,存储在数据库的offset,让提交结果和offset在单个事务中。这样,事物成功,则offset存储和更新。若是offset没有存储,那么偏移量也不会被更新。

  • 若是offset和消费结果存储在本地仓库。例如,能够经过订阅一个指定的分区并将offset和索引数据一块儿存储来构建一个搜索索引。若是这是以原子的方式作的,常见的多是,即便崩溃引发未同步的数据丢失。索引程序从它确保没有更新丢失的地方恢复,而仅仅丢失最近更新的消息。

每一个消息都有本身的offset,因此要管理本身的偏移,你只须要作到如下几点:

  • 配置 enable.auto.commit=false

  • 使用提供的 ConsumerRecord 来保存你的位置。

  • 在重启时用 seek(TopicPartition, long) 恢复消费者的位置。

当分区分配也是手动完成的(像上文搜索索引的状况),这种类型的使用是最简单的。 若是分区分配是自动完成的,须要特别当心处理分区分配变动的状况。能够经过调用subscribe(Collection,ConsumerRebalanceListener)subscribe(Pattern,ConsumerRebalanceListener)中提供的ConsumerRebalanceListener实例来完成的。例如,当分区向消费者获取时,消费者将经过实现ConsumerRebalanceListener.onPartitionsRevoked(Collection)来给这些分区提交它们offset。当分区分配给消费者时,消费者经过ConsumerRebalanceListener.onPartitionsAssigned(Collection)为新的分区正确地将消费者初始化到该位置。

ConsumerRebalanceListener的另外一个常见用法是清除应用已移动到其余位置的分区的缓存。

控制消费的位置

大多数状况下,消费者只是简单的从头至尾的消费消息,周期性的提交位置(自动或手动)。kafka也支持消费者去手动的控制消费的位置,能够消费以前的消息也能够跳过最近的消息。

有几种状况,手动控制消费者的位置多是有用的。

一种场景是对于时间敏感的消费者处理程序,对足够落后的消费者,直接跳过,从最近的消费开始消费。

另外一个使用场景是本地状态存储系统(上一节说的)。在这样的系统中,消费者将要在启动时初始化它的位置(不管本地存储是否包含)。一样,若是本地状态已被破坏(假设由于磁盘丢失),则能够经过从新消费全部数据并从新建立状态(假设kafka保留了足够的历史)在新的机器上从新建立。

kafka使用seek(TopicPartition, long)指定新的消费位置。用于查找服务器保留的最先和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和 seekToEnd(Collection))。

消费者流量控制

若是消费者分配了多个分区,并同时消费全部的分区,这些分区具备相同的优先级。在一些状况下,消费者须要首先消费一些指定的分区,当指定的分区有少许或者已经没有可消费的数据时,则开始消费其余分区。

例如流处理,当处理器从2个topic获取消息并把这两个topic的消息合并,当其中一个topic长时间落后另外一个,则暂停消费,以便落后的遇上来。

kafka支持动态控制消费流量,分别在future的poll(long)中使用pause(Collection)resume(Collection) 来暂停消费指定分配的分区,从新开始消费指定暂停的分区。

多线程处理

Kafka消费者不是线程安全的。全部网络I/O都发生在进行调用应用程序的线程中。用户的责任是确保多线程访问正确同步的。非同步访问将致使ConcurrentModificationException。

此规则惟一的例外是wakeup(),它能够安全地从外部线程来中断活动操做。在这种状况下,将从操做的线程阻塞并抛出一个WakeupException。这可用于从其余线程来关闭消费者。 如下代码段显示了典型模式:

public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // Handle new records } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } } 

在单独的线程中,能够经过设置关闭标志和唤醒消费者来关闭消费者。

closed.set(true); consumer.wakeup(); 

咱们没有多线程模型的例子。但留下几个操做可用来实现多线程处理消息。

  1. 每一个线程一个消费者

    每一个线程本身的消费者实例。这里是这种方法的优势和缺点:

    • PRO: 这是最容易实现的
    • PRO: 由于它不须要在线程之间协调,因此一般它是最快的。
    • PRO: 它按顺序处理每一个分区(每一个线程只处理它接受的消息)。
    • CON: 更多的消费者意味着更多的TCP链接到集群(每一个线程一个)。通常kafka处理链接很是的快,因此这是一个小成本。
    • CON: 更多的消费者意味着更多的请求被发送到服务器,但稍微较少的数据批次可能致使I/O吞吐量的一些降低。
    • CON: 全部进程中的线程总数受到分区总数的限制。
  2. 解耦消费和处理

    另外一个替代方式是一个或多个消费者线程,它来消费全部数据,其消费全部数据并将ConsumerRecords实例切换到由实际处理记录处理的处理器线程池来消费的阻塞队列。这个选项一样有利弊:

    • PRO: 可扩展消费者和处理进程的数量。这样单个消费者的数据可分给多个处理器线程来执行,避免对分区的任何限制。
    • CON: 跨多个处理器的顺序保证须要特别注意,由于线程是独立的执行,后来的消息可能比遭到的消息先处理,这仅仅是由于线程执行的运气。若是对排序没有问题,这就不是个问题。
    • CON: 手动提交变得更困难,由于它须要协调全部的线程以确保处理对该分区的处理完成。

这种方法有多种玩法,例如,每一个处理线程能够有本身的队列,消费者线程可使用TopicPartitionhash到这些队列中,以确保按顺序消费,而且提交也将简化。

做者:半兽人 连接:http://orchome.com/451 来源:OrcHome 著做权归做者全部。商业转载请联系做者得到受权,非商业转载请注明出处。
相关文章
相关标签/搜索