【Kafka】《Kafka权威指南》——写数据

无论是把 Kafka 做为消息队列、消息、总线仍是数据存储平台来使用 ,老是须要有一个能够往 Kafka 写入数据的生产者和一个能够从 Kafka读取数据的消费者,或者一个兼具两种角 色的应用程序。java

例如,在一个信用卡事务处理系统里,有一个客户端应用程序,它多是一个在线商店, 每当有支付行为发生时,它负责把事务发送到 Kafka上。另外一个应用程序根据规则引擎检 查这个事务,决定是批准仍是拒绝。 批准或拒绝的响应消息被写回 Kafka,而后发送给发起事务的在线商店。第三个应用程序从 Kafka上读取事务和审核状态,把它们保存到数据 库, 随后分析师能够对这些结果进行分析,或许还能借此改进规则引擎 。算法

开发者们可使用 Kafka 内置的客户端 API开发 Kafka应用程序。数据库

在这一章,咱们将从 Kafra生产者的设计和组件讲起,学习如何使用 Kafka生产者。咱们将展现如何建立 KafkaProducer和 ProducerRecords对象、如何将记录发送给 Kafka,以及如何处理从 Kafka 返回的错误,而后介绍用干控制生产者行为的重要配置选项,最后深刻 探讨如何使用不一样的分区方法和序列化器,以及如何自定义序列化器和分区器 。apache

在下一章,咱们将会介绍 Kafra的悄费者客户端,以及如何从 Kafka读取消息。bootstrap

生产者概览

一个应用程序在不少状况下须要往 Kafka 写入消息 : 记录用户的活动(用于审计和分析 )、 记录度量指标、保存日志、消息、记录智能家电的信息、与其余应用程序进行异步通讯、 缓冲即将写入到数据库的数据,等等。数组

多样的使用场景意味着多样的需求:是否每一个消息都很重要?是否容许丢失 一 小部分消息?偶尔出现重复消息是否能够接受?是否有严格的延迟和吞吐量要求?安全

在以前提到的信用卡事务处理系统里,消息丢失或消息重复是不容许的,能够接受的延迟最大为 500ms,对吞吐量要求较高,咱们但愿每秒钟能够处理一百万个消息。服务器

保存网站的点击信息是另 一种使用场景。在这个场景里,容许丢失少许的消息或出现少许 的消息重复,延迟能够高一些,只要不影响用户体验就行。换句话说,只要用户点击连接 后能够立刻加载页面,那么咱们并不介意消息要在几秒钟以后才能到达 Kafka 服务器。 吞 吐量则取决于网站用户使用网站的频度。网络

不一样的使用场景对生产者 API 的使用和配置会有直接的影响。多线程

尽管生产者 API 使用起来很简单 ,但消息的发送过程仍是有点复杂的。下图展现 了向Kafka 发送消息的主要步骤。

Kafka 生产者组件图

咱们从建立 一个 ProducerRecord 对象开始, ProducerRecord 对象须要包含目标主题和要发送的内容。咱们还能够指定键或分区。在发送 ProducerRecord对象时,生产者要先把键和 值对象序列化成字节数组,这样它们才可以在网络上传输 。

接下来,数据被传给分区器。若是以前在 ProducerRecord对象里指定了分区,那么分区器就不会再作任何事情,直接把指定的分区返回。若是没有指定分区 ,那么分区器会根据 ProducerRecord对象的键来选择一个分区 。选好分区之后 ,生产者就知道该往哪一个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的全部消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

服务器在收到这些消息时会返回一个响应。若是消息成功写入 Kafka,就返回 一 个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。若是写入 失败, 就会返回 一个错误 。生产者在收到错误以后会尝试从新发送消息,几回以后若是仍是失败,就返回错误信息。

建立Kafka生产者

要往 Kafka写入消息,首先要建立一个生产者对象,井设置一些属性。

下面的代码片断展现了如何建立一个新的生产者,这里只指定了必要的属性,其余使用默认设置。

private Properties kafkaProps = new Properties(); 

kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
 
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
 
kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");
 
producer = new KafkaProducer<String, String>(kafkaProps);
复制代码

Kafka生产者有 3个必选的属性

bootstrap.servers

该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不须要包含全部的broker地址,生产者会从给定的 broker里查找到其余 broker的信息。不过建议至少要提供两个 broker的信息, 一旦其中一个宕机,生产者仍然可以链接到集群上。

key.serializer

broker但愿接收到的消息的键和值都是字节数组。生产者接口容许使用参数化类型,所以能够把 Java对象做为键和值发送给 broker。这样的代码具备良好的可读性,不过生产者须要知道如何把这些 Java对象转换成字节数组。 key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。 Kafka 客户端默认提供了ByteArraySerializer(这个只作不多的事情)、 StringSerializer和 IntegerSerializer,所以,若是你只使用常见的几种 Java对象类型,那么就不必实现本身的序列化器 。要注意, key.serializer是必须设置的,就算你打算只发送值内容。

value.serializer

与 key.serializer同样, value.serializer指定的类会将值序列化。若是键和值都是字符串,可使用与 key.serializer 同样的序列化器。若是键是整数类型而值是字符扇 , 那么须要使用不一样的序列化器。

发送消息主要有3种方式:

一、发送并忘记( fire-and-forget):咱们把消息发送给服务器,但井不关心它是否正常到达。大多数状况下,消息会正常到达,由于 Kafka是高可用的,并且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。

二、同步发送:咱们使用send()方怯发送消息, 它会返回一个Future对象,调用get()方法进行等待, 就能够知道悄息是否发送成功。

三、异步发送:咱们调用 send() 方怯,并指定一个回调函数, 服务器在返回响应时调用该函数。

在下面的几个例子中 , 咱们会介绍如何使用上述几种方式来发送消息,以及如何处理可能 发生的异常状况。

本章的全部例子都使用单线程,但其实生产者是可使用多线程来发送消息的。刚开始的 时候可使用单个消费者和单个线程。若是须要更高的吞吐量,能够在生产者数量不变的 前提下增长线程数量。若是这样作还不够 , 能够增长生产者数量。

发送消息到Kafka

最简单的同步发送消息方式以下所示 :

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
  producer.send(record);
} catch(Exception e) {
  e.printStack();
}
复制代码

生产者的 send() 方住将 ProducerRecord对象做为参数,它须要目标主题的名字和要发送的键和值对象,它们都是字符串。键和值对象的类型必须与序列化器和生产者对象相匹配。

咱们使用生产者的 send() 方愈加送 ProducerRecord对象。从生产者的架构图里能够看到,消息先是被放进缓冲区,而后使用单独的线程发送到服务器端。 send() 方法会返回一个包含 RecordMetadata 的 Future对象,不过由于咱们会忽略返回值,因此没法知道消息是否发送成功。若是不关心发送结果,那么可使用这种发送方式。好比,记录 Twitter 消息日志,或记录不过重要的应用程序日志。

咱们能够忽略发送消息时可能发生的错误或在服务器端可能发生的错误,但在发送消息以前,生产者仍是有可能发生其余的异常。这些异常有多是 SerializationException (说明序列化消息失败)、 BufferExhaustedException 或 TimeoutException (说明缓冲区已满),又或者是 InterruptException (说明发送线程被中断)。

同步发送消息

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
    producer.send(record).get();
} catch(Exception e) {
    e.printStack();
}
复制代码

在这里, producer.send() 方住先返回一个 Future对象,而后调用 Future对象的 get() 方法等待 Kafka 响应。若是服务器返回错误, get()方怯会抛出异常。若是没有发生错误,咱们会获得一个 RecordMetadata对象,能够用它获取消息的偏移量。若是在发送数据以前或者在发送过程当中发生了任何错误 ,好比 broker返回 了一个不容许重发消息的异常或者已经超过了重发的次数 ,那么就会抛出异常。咱们只是简单地把异常信息打印出来。

如何处理从Kafka生产者返回的错误

KafkaProducer通常会发生两类错误。其中一类是可重试错误 ,这类错误能够经过重发消息来解决。好比对于链接错误,能够经过再次创建链接来解决,“无主(noleader)” 错误则可 以经过从新为分区选举首领来解决。 KafkaProducer能够被配置成自动重试,若是在屡次重试后仍无能解决问题,应用程序会收到一个重试异常。另外一类错误无出经过重试解决 ,好比“消息太大”异常。对于这类错误, KafkaProducer不会进行任何重试,直接抛出异常。

异步发送消息

假设消息在应用程序和 Kafka集群之间一个来回须要 10ms。若是在发送完每一个消息后都等待回应,那么发送 100个消息须要 1秒。但若是只发送消息而不等待响应,那么发送100个消息所须要的时间会少不少。大多数时候,咱们并不须要等待响应——尽管 Kafka 会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来讲不是必需的。不过在遇到消息发送失败时,咱们须要抛出异常、记录错误日志,或者把消息写入 “错误消息”文件以便往后分析。

为了在异步发送消息的同时可以对异常状况进行处理,生产者提供了回调支持 。下面是使用异步发送消息、回调的一个例子。

生产者的配置

到目前为止 , 咱们只介绍了生产者的几个必要配置参数——bootstrap.servers API 以及序列化器。

生产者还有不少可配置的参数,在 Kafka文档里都有说明,它们大部分都有合理的默认值 , 因此没有必要去修改它们 。不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,接下来咱们会一一说明。

1. acks

acks 参数指定了必需要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。

这个参数对消息丢失的可能性有重要影响。 该参数有以下选项。 • 若是 acks=0, 生产者在成功写入悄息以前不会等待任何来自服务器的响应。也就是说, 若是当中出现了问题 , 致使服务器没有收到消息,那么生产者就无从得知,消息也就丢 失了。不过,由于生产者不须要等待服务器的响应,因此它能够以网络可以支持的最大 速度发送消息,从而达到很高的吞吐量。

• 若是 acks=1,只要集群的首领节点收到消息,生产者就会收到 一个来自服务器的成功 响应。若是消息无撞到达首领节点(好比首领节点崩愤,新的首领尚未被选举出来), 生产者会收到一个错误响应,为了不数据丢失,生产者会重发消息。不过,若是一个 没有收到消息的节点成为新首领,消息仍是会丢失。这个时候的吞吐量取决于使用的是 同步发送仍是异步发送。若是让发送客户端等待服务器的响应(经过调用 Future对象 的 get()方法),显然会增长延迟(在网络上传输一个来回的延迟)。若是客户端使用异步回调,延迟问题就能够获得缓解,不过吞吐量仍是会受发送中消息数量的限制(好比,生 产者在收到服务器响应以前能够发送多少个消息)。

• 若是 acks=all,只有当全部参与复制的节点所有收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它能够保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然能够运行(第 5 章将讨论更多的细节)。不过,它的延迟比 acks=1时更高,由于咱们要等待不仅一个服务器节点接收消息。

2. buffer.memory

该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。若是 应用程序发送消息的速度超过发送到服务器的速度,会致使生产者空间不足。这个时候, send()方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffe.full 参数 (在0.9.0.0版本里被替换成了max.block.ms,表示在抛出异常以前能够阻塞一段时间)。

3. compression.type

默认状况下,消息发送时不会被压缩。该参数能够设置为 snappy、 gzip 或 lz4,它指定了消息被发送给 broker以前使用哪种压缩算法进行压缩。 snappy 压缩算怯由 Google巳发明, 它占用较少 的 CPU,却能提供较好的性能和至关可观的压缩比,若是比较关注性能和网络带宽,可使用这种算法。 gzip压缩算法通常会占用较多的 CPU,但会提供更高的压缩比,因此若是网络带宽比较有限,可使用这种算法。使用压缩能够下降网络传输开销和存储开销,而这每每是向 Kafka发送消息的瓶颈所在。

4. retries

生产者从服务器收到的错误有多是临时性的错误(好比分区找不到首领)。在这种状况下, retries参数的值决定了生产者能够重发消息的次数,若是达到这个次数,生产者会放弃重试并返回错误。默认状况下,生产者会在每次重试之间等待 1OOms,不过能够经过 retries.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔以前, 先测试一下恢复一个崩溃节点须要多少时间(好比全部分区选举出首领须要多长时间), 让总的重试时间比 Kafka集群从崩溃中恢复的时间长,不然生产者会过早地放弃重试。不过有些错误不是临时性错误,没办怯经过重试来解决(好比“悄息太大”错误)。通常情 况下,由于生产者会自动进行重试,因此就不必在代码逻辑里处理那些可重试的错误。 你只须要处理那些不可重试的错误或重试次数超出上限的状况。

5. batch.size

当有多个消息须要被发送到同一个分区时,生产者会把它们放在放一个批次里。该参数指定了一个批次可使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的全部消息会被发送出去。不过生产者井不必定都会等到批次被填满才发送,半捕 的批次,甚至只包含一个消息的批次也有可能被发送。因此就算把批次大小设置得很大, 也不会形成延迟,只是会占用更多的内存而已。但若是设置得过小,由于生产者须要更频繁地发送消息,会增长一些额外的开销。

6. linger.ms

该参数指定了生产者在发送批次以前等待更多消息加入批次的时间。 KafkaProducer 会在批次填满或 linger.ms达到上限时把批次发送出去。默认状况下,只要有可用的线程, 生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms设置成比0大的数, 让生产者在发送批次以前等待一下子,使更多的消息加入到这个批次 。虽然这样会增长延迟,但也会提高吞吐量(由于一次性发送更多的消息,每一个消息的开销就变小了)。

7. client.id

该参数能够是任意的字符串,服务器会用它来识别消息的来源,还能够用在日志和配额指标里。

8. max.in.flight.requests.per.connection

该参数指定了生产者在收到服务器晌应以前能够发送多少个消息。它的值越高,就会占用越多的内存,不过也会提高吞吐量。 把它设为 1 能够保证消息是按照发送的顺序写入服务器的,即便发生了重试。

9. timeout.ms、 request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms指定了生产者在获取元数据(好比目标分区的首领是谁)时等待服务器返回响应的时间。若是等待响应超时,那么生产者要么重试发送数据,要么返回 一个错误 (抛出异常或执行回调)。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配一一若是在指定时间内没有收到同步副本的确认,那么 broker就会返回 一个错误 。

10. max.block.ms

该参数指定了在调用 send() 方法或使用 parttitionFor() 方能获取元数据时生产者的阻塞 时间。当生产者的发送缓冲区已捕,或者没有可用的元数据时,这些方屈就会阻塞。在阻塞时间达到 max.block.ms时,生产者会抛出超时异常。

11 . max.request.size

该参数用于控制生产者发送的请求大小。它能够指能发送的单个消息的最大值,也能够指单个请求里全部消息总的大小。例如,假设这个值为 1MB,那么能够发送的单个最大消息为 1MB,或者生产者能够在单个请求里发送一个批次,该批次包含了 1000个消息,每一个消息大小为 1KB 。另外, broker对可接收的消息最大值也有本身的限制( message.max.bytes),因此两边的配置最好能够匹配,避免生产者发送的消息被 broker拒绝 。

12. receive.buffer.bytes 和 send.buffer.bytes

这两个参数分别指定了 TCP socket接收和发送数据包的缓冲区大小 。 若是它们被设为 -1 , 就使用操做系统的默认值。若是生产者或消费者与 broker处于不一样的数据中心,那么能够适当增大这些值,由于跨数据中心的网络通常都有比较高的延迟和比较低的带宽。

顺序保证

Kafka能够保证同一个分区里的消息是有序的。也就是说,若是生产者按照必定的顺序发送消息, broker就会按照这个顺序把它们写入分区,消费者也会按照一样的顺序读取它们。在某些状况下 , 顺序是很是重要的。若是把retries 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1大的数,那么,若是第一个批次消息写入失败,而第二个批次写入成功, broker会重试写入第一个批次。若是此时第一个批次也写入成功,那 么两个批次的顺序就反过来了。

通常来讲,若是某些场景要求消息是有序的,那么消息是否写入成功也是 很关键的,因此不建议把顺序是很是重要的。若是把retries 设为 0。能够把 max.in.flight.requests.per.connection设为 1,这样在生产者尝试发送第一批消息时,就不会有其余的消息发送给 broker。不过这样会严重影响生产者的吞吐量 ,因此只有在 对消息的顺序有严格要求的状况下才能这么作。

相关文章
相关标签/搜索