>舒适提示:整个 Kafka Client 专栏基于 kafka-2.3.0 版本。java
根据 KafkaProducer 类上的注释上来看 KafkaProducer 具备以下特征:算法
KafkaProducer 是线程安全的,能够被多个线程交叉使用。apache
KafkaProducer 内部包含一个缓存池,存放待发送消息,即 ProducerRecord 队列,与此同时会开启一个IO线程将 ProducerRecord 对象发送到 Kafka 集群。bootstrap
KafkaProducer 的消息发送 API send 方法是异步,只负责将待发送消息 ProducerRecord 发送到缓存区中,当即返回,并返回一个结果凭证 Future。api
acks KafkaProducer 提供了一个核心参数 acks 用来定义消息“已提交”的条件(标准),就是 Broker 端向客户端承偌已提交的条件,可选值以下:缓存
retries kafka 在生产端提供的另一个核心属性,用来控制消息在发送失败后的重试次数,设置为 0 表示不重试,重试就有可能形成消息在发送端的重复。安全
batch.size kafka 消息发送者为每个分区维护一个未发送消息积压缓存区,其内存大小由batch.size指定,默认为 16K。 但若是缓存区中不足100条,但发送线程此时空闲,是须要等到缓存区中积满100条才能发送仍是能够当即发送呢?默认是当即发送,即 batch.size 的做用实际上是客户端一次发送到broker的最大消息数量。架构
linger.ms 为了提升 kafka 消息发送的高吞吐量,即控制在缓存区中未积满 batch.size 时来控制 消息发送线程的行为,是当即发送仍是等待必定时间,若是linger.ms 设置为 0表示当即发送,若是设置为大于0,则消息发送线程会等待这个值后才会向broker发送。该参数者会增长响应时间,但有利于增长吞吐量。有点相似于 TCP 领域的 Nagle 算法。并发
buffer.memory 用于控制消息发送者缓存的总内存大小,若是超过该值,往缓存区中添加消息会被阻塞,具体会在下文的消息发送流程中详细介绍,阻塞的最大时间可经过参数 max.block.ms 设置,阻塞超过该值会抛出超时异常。app
key.serializer 指定 key 的序列化处理器。
value.serializer 指定 消息体的序列化处理器。
enable.idempotence 从 kafka0.11版本开始,支持消息传递幂等,能够作到消息只会被传递一次,经过 enable.idempotence 为 true 来开启。若是该值设置为 true,其 retries 将设置为 Integer.MAX_VALUE,acks 将被设置为 all。为了确保消息发送幂等性,必须避免应用程序端的任何重试,而且若是消息发送API若是返回错误,应用端应该记录最后成功发送的消息,避免消息的重复发送。
从Kafka 0.11开始,kafka 也支持事务消息。
在 Kafka 中,生产者经过接口 Producer 定义,经过该接口的方法,咱们基本能够得知 KafkaProducer 将具有以下基本能力:
上面的方法咱们会根据须要在后续文章中进行详细的介绍。接下来咱们看一下 KafkaProducer 的核心属性的含义。
通过上面的梳理,详细读者朋友对 KafkaProducer 消息生产者有了一个大概的认识,下一篇会重点介绍消息发送流程。接下来咱们以一个简单的示例结束本文的学习。
package persistent.prestige.demo.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.Future; public class KafkaProducerTest { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072,"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<string, string> producer = new KafkaProducer<>(props); try { for (int i = 0; i < 100; i++) { Future<recordmetadata> future = producer.send(new ProducerRecord<string, string>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i))); RecordMetadata recordMetadata = future.get(); System.out.printf("offset:" + recordMetadata.offset()); } } catch (Throwable e) { e.printStackTrace(); } finally { producer.close(); } } }
本文就介绍到这里,其主要的目的是了解Kafka 的 Producer,引出后续须要学习的内容,下一篇将重点讲述 Kafka 消息的发送流程,敬请关注。
若是本文对你们有所帮助的话,麻烦帮忙点个赞,谢谢。
做者介绍:丁威,《RocketMQ技术内幕》做者,RocketMQ 社区布道师,公众号:中间件兴趣圈 维护者,目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。能够点击连接:中间件知识星球,一块儿探讨高并发、分布式服务架构,交流源码。
</string,></recordmetadata></string,></k,></k,></k,>