从建立一个 ProducerRecord 对象开始, Producer Record 对象须要包含目标主题和要发送的内容。咱们还能够指定键或分区。在发送 ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才可以在网络上传输。html
接下来,数据被传给分区器。若是以前在 Producer Record 对象里指定了分区,那么分区器就不会再作任何事情,直接把指定的分区返回。若是没有 指定分区,那么分区器会根据 Producer Record 对象的键来选择一个分区。选好分区之后,生产者就知道该往哪一个主题和分区发送这条记录了。紧接着, 这条记录被添加到一个记录批次里(双端队列,尾部写入),这个批次里的全部消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。算法
服务器在收到这些消息时会返回一个响应。若是消息成功写入 Kafka ,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分 区里的偏移量。若是写入失败, 则会返回一个错误。生产者在收到错误以后会尝试从新发送消息,几回以后若是仍是失败,就返回错误信息。spring
生产者发送消息通常会发生两类错误:apache
一类是可重试错误,好比链接错误(可经过再次创建链接解决)、无主 no leader(可经过分区从新选举首领解决)。数组
另外一类是没法经过重试解决,好比“消息太大”异常,具体见 message.max.bytes,这类消息不会进行任何重试,直接抛出异常缓存
三种发送方式安全
咱们经过生成者的 send 方法进行发送。send 方法会返回一个包含 RecordMetadata 的 Future 对象。RecordMetadata 里包含了目标主题,分区信息和 消息的偏移量。服务器
发送并忘记网络
忽略 send 方法的返回值,不作任何处理。大多数状况下,消息会正常到达,并且生产者会自动重试,但有时会丢失消息。多线程
同步发送
得到 send 方法返回的 Future 对象,在合适的时候调用 Future 的 get 方法。参见代码,模块 kafka-no-spring
异步发送
实现接口 org.apache.kafka.clients.producer.Callback,而后将实现类的实例做为参数传递给 send 方法。参见代码,模块 kafka-no-spring 下包 sendtype 中。
多线程下的生产者
KafkaProducer 的实现是线程安全的,因此咱们能够在多线程的环境下,安全的使用 KafkaProducer 的实例,如何节约资源的使用呢?参见代码,模块 kafka-no-spring 下包 concurrent
更多发送配置
生产者有不少属性能够设置,大部分都有合理的默认值,无需调整。有些参数可能对内存使用,性能和可靠性方面有较大影响。能够参考 org.apache.kafka.clients.producer 包下的 ProducerConfig 类。代码见模块 kafka-no-spring 下包 ProducerConfig 中 ConfigKafkaProducer 类
acks:
Kafk 内部的复制机制是比较复杂的,这里不谈论内部机制(后续章节进行细讲),咱们只讨论生产者发送消息时与副本的关系。
指定了必需要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。
acks=0:生产者在写入消息以前不会等待任 何来自服务器的响应,容易丢消息,可是吞吐量高。
acks=1:只要集群的首领节点收到消息,生产者会收到来自服务器的成功响应。若是消息没法到达首领节点(好比首领节点崩溃,新首领没有选举出 来),生产者会收到一个错误响应,为了不数据丢失,生产者会重发消息。不过,若是一个没有收到消息的节点成为新首领,消息仍是会丢失。默认 使用这个配置。
acks=all:只有当全部参与复制的节点都收到消息,生产者才会收到一个来自服务器的成功响应。延迟高。
金融业务,主备外加异地灾备。因此不少高可用场景通常不是设置 2 个副本,有可能达到 5 个副本,不一样机架上部署不一样的副本,异地上也部署一 套副本。
buffer.memory
设置生产者内存缓冲区的大小(结合生产者发送消息的基本流程),生产者用它缓冲要发送到服务器的消息。若是数据产生速度大于向 broker 发送 的速度,致使生产者空间不足,producer 会阻塞或者抛出异常。缺省 33554432 (32M)
max.block.ms
指定了在调用 send()方法或者使用 partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些 方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。缺省 60000ms
retries
发送失败时,指定生产者能够重发消息的次数(缺省 Integer.MAX_VALUE)。默认状况下,生产者在每次重试之间等待 100ms,能够经过参数 retry.backoff.ms 参数来改变这个时间间隔。
receive.buffer.bytes 和 send.buffer.bytes
指定 TCP socket 接受和发送数据包的缓存区大小。若是它们被设置为-1,则使用操做系统的默认值。若是生产者或消费者处在不一样的数据中心,那么 能够适当增大这些值,由于跨数据中心的网络通常都有比较高的延迟和比较低的带宽。缺省 102400
batch.size
当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可使用的内存大小,按照字节数计算。当批次内存 被填满后,批次里的全部消息会被发送出去。可是生产者不必定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送。缺省 16384(16k) ,若是一条消息超过了批次的大小,会写不进去。
linger.ms
指定了生产者在发送批次前等待更多消息加入批次的时间。它和 batch.size 以先到者为先。也就是说,一旦咱们得到消息的数量够 batch.size 的数量 了,他将会当即发送而不顾这项设置,然而若是咱们得到消息字节数比 batch.size 设置要小的多,咱们须要“linger”特定的时间以获取更多的消息。这个设 置默认为 0,即没有延迟。设定 linger.ms=5,例如,将会减小请求数目,可是同时会增长 5ms 的延迟,但也会提高消息的吞吐量。
compression.type
producer 用于压缩数据的压缩类型。默认是无压缩。正确的选项值是 none、gzip、snappy。压缩最好用于批量处理,批量处理消息越多,压缩性能越 好。snappy 占用 cpu 少,提供较好的性能和可观的压缩比,若是比较关注性能和网络带宽,用这个。若是带宽紧张,用 gzip,会占用较多的 cpu,但提供 更高的压缩比。
client.id
当向 server 发出请求时,这个字符串会发送给 server。目的是可以追踪请求源头,以此来容许 ip/port 许可列表以外的一些应用能够发送信息。这项 应用能够设置任意字符串,由于没有任何功能性的目的,除了记录和跟踪。
max.in.flight.requests.per.connection
指定了生产者在接收到服务器响应以前能够发送多个消息,值越高,占用的内存越大,固然也能够提高吞吐量。发生错误时,可能会形成数据的发 送顺序改变,默认是 5 (修改)。 若是须要保证消息在一个分区上的严格顺序,这个值应该设为 1。不过这样会严重影响生产者的吞吐量。
request.timeout.ms
客户端将等待请求的响应的最大时间,若是在这个时间内没有收到响应,客户端将重发请求;超太重试次数将抛异常,默认 30 秒。
metadata.fetch.timeout.ms
是指咱们所获取的一些元数据的第一个时间数据。元数据包含:topic,host,partitions。此项配置是指当等待元数据 fetch 成功完成所须要的时间, 不然会跑出异常给客户端
max.request.size
控制生产者发送请求最大大小。默认这个值为 1M,若是一个请求里只有一个消息,那这个消息不能大于 1M,若是一次请求是一个批次,该批次包 含了 1000 条消息,那么每一个消息不能大于 1KB。注意:broker 具备本身对消息记录尺寸的覆盖,若是这个尺寸小于生产者的这个设置,会致使消息被拒 绝。这个参数和 Kafka 主机的 message.max.bytes 参数有关系。若是生产者发送的消息超过 message.max.bytes 设置的大小,就会被 Kafka 服务器拒绝。
以上参数不用去,通常来讲,就记住 acks、batch.size、linger.ms、max.request.size 就好了,由于这 4 个参数重要些,其余参数通常没有太大必要调整。
顺序保证
Kafka 能够保证同一个分区里的消息是有序的。也就是说,发送消息时,主题只有且只有一个分区,同时生产者按照必定的顺序发送消息, broker 就 会按照这个顺序把它们写入分区,消费者也会按照一样的顺序读取它们。在某些状况下, 顺序是很是重要的。例如,往一个帐户存入 100 元再取出来, 这个与先取钱再存钱是大相径庭的!不过,有些场景对顺序不是很敏感。
若是把 retires 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1 大的数,那么,若是第一个批次消息写入失败,而第二个批次写 入成功, broker 会重试写入第一个批次。若是此时第一个批次也写入成功,那么两个批次的顺序就反过来了。
通常来讲,若是某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,因此不建议把 retires 设为 0(不重试的话消息可能会由于链接关 闭等缘由会丢) 。因此仍是须要重试,同时把 max.in.flight.request.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其余的消息发 送给 broker 。不过这样会严重影响生产者的吞吐量,因此只有在对消息的顺序有严格要求的状况下才能这么作
序列化
建立生产者对象必须指定序列化器,默认的序列化器并不能知足咱们全部的场景。咱们彻底能够自定义序列化器。只要实现 org.apache.kafka.common.serialization.Serializer 接口便可。
自定义序列化须要考虑的问题
自定义序列化容易致使程序的脆弱性。举例,在咱们上面的实现里,咱们有多种类型的消费者,每一个消费者对实体字段都有各自的需求,好比,有 的将字段变动为 long 型,有的会增长字段,这样会出现新旧消息的兼容性问题。特别是在系统升级的时候,常常会出现一部分系统升级,其他系统被迫 跟着升级的状况。
解决这个问题,能够考虑使用自带格式描述以及语言无关的序列化框架。好比 Protobuf,或者 Kafka 官方推荐的 Apache Avro。
Avro 会使用一个 JSON 文件做为 schema 来描述数据,Avro 在读写时会用到这个 schema,能够把这个 schema 内嵌在数据文件中。这样,无论数据格 式如何变更,消费者都知道如何处理数据。
可是内嵌的消息,自带格式,会致使消息的大小没必要要的增大,消耗了资源。咱们可使用 schema 注册表机制,将全部写入的数据用到的 schema 保存在注册表中,而后在消息中引用 schema 的标识符,而读取的数据的消费者程序使用这个标识符从注册表中拉取 schema 来反序列化记录。
注意:Kafka 自己并不提供 schema 注册表,须要借助第三方,如今已经有不少的开源实现,好比 Confluent Schema Registry,能够从 GitHub 上获取。
如何使用参考以下网址:
https://cloud.tencent.com/developer/article/1336568
不过通常除非你使用 Kafka 须要关联的团队比较大,敏捷开发团队才会使用,通常的团队用不上。对于通常的状况使用 JSON 足够了。
分区
咱们在新增 ProducerRecord 对象中能够看到,ProducerRecord 包含了目标主题,键和值,Kafka 的消息都是一个个的键值对。键能够设置为默认的 null。键的主要用途有两个:一,用来决定消息被写往主题的哪一个分区,拥有相同键的消息将被写往同一个分区,二,还能够做为消息的附加消息。
若是键值为 null,而且使用默认的分区器,分区器使用轮询算法将消息均衡地分布到各个分区上。
若是键不为空,而且使用默认的分区器,Kafka 对键进行散列(Kafka 自定义的散列算法,具体算法原理不知),而后根据散列值把消息映射到特定 的分区上。很明显,同一个键老是被映射到同一个分区。可是只有不改变主题分区数量的状况下,键和分区之间的映射才能保持不变,一旦增长了新的 分区,就没法保证了,因此若是要使用键来映射分区,那就要在建立主题的时候把分区规划好,并且永远不要增长新分区
自定义分区器
某些状况下,数据特性决定了须要进行特殊分区,好比电商业务,北京的业务量明显比较大,占据了总业务量的 20%,咱们须要对北京的订单进行 单独分区处理,默认的散列分区算法不合适了, 咱们就能够自定义分区算法,对北京的订单单独处理,其余地区沿用散列分区算法。或者某些状况下, 咱们用 value 来进行分区。 具体实现,先建立一个 4 分区的主题,而后观察模块 kafka-no-spring 下包 SelfPartitionProducer 中代码。