根据 KafkaProducer 类上的注释上来看 KafkaProducer 具备以下特征:java
KafkaProducer 是线程安全的,能够被多个线程交叉使用。算法
KafkaProducer 内部包含一个缓存池,存放待发送消息,即 ProducerRecord 队列,与此同时会开启一个IO线程将 ProducerRecord 对象发送到 Kafka 集群。apache
KafkaProducer 的消息发送 API send 方法是异步,只负责将待发送消息 ProducerRecord 发送到缓存区中,当即返回,并返回一个结果凭证 Future。bootstrap
acks
KafkaProducer 提供了一个核心参数 acks 用来定义消息“已提交”的条件(标准),就是 Broker 端向客户端承偌已提交的条件,可选值以下:api
0
表示生产者不关系该条消息在 broker 端的处理结果,只要调用 KafkaProducer 的 send 方法返回后即认为成功,显然这种方式是最不安全的,由于 Broker 端可能压根都没有收到该条消息或存储失败。缓存
all 或 -1
表示消息不只须要 Leader 节点已存储该消息,而且要求其副本(准确的来讲是 ISR 中的节点)所有存储才认为已提交,才向客户端返回提交成功。这是最严格的持久化保障,固然性能也最低。安全
1
表示消息只须要写入 Leader 节点后就能够向客户端返回提交成功。app
retries
kafka 在生产端提供的另一个核心属性,用来控制消息在发送失败后的重试次数,设置为 0 表示不重试,重试就有可能形成消息在发送端的重复。负载均衡
batch.size
kafka 消息发送者为每个分区维护一个未发送消息积压缓存区,其内存大小由batch.size指定,默认为 16K。
但若是缓存区中不足100条,但发送线程此时空闲,是须要等到缓存区中积满100条才能发送仍是能够当即发送呢?默认是当即发送,即 batch.size 的做用实际上是客户端一次发送到broker的最大消息大小。异步
linger.ms
为了提升 kafka 消息发送的高吞吐量,即控制在缓存区中未积满 batch.size 时来控制 消息发送线程的行为,是当即发送仍是等待必定时间,若是linger.ms 设置为 0表示当即发送,若是设置为大于0,则消息发送线程会等待这个值后才会向broker发送。该参数会增长响应时间,但有利于增长吞吐量。有点相似于 TCP 领域的 Nagle 算法。
buffer.memory
用于控制消息发送者缓存的总内存大小,若是超过该值,往缓存区中添加消息会被阻塞,具体会在下文的消息发送流程中详细介绍,阻塞的最大时间可经过参数 max.block.ms 设置,阻塞超过该值会抛出超时异常。
key.serializer
指定 key 的序列化处理器。
value.serializer
指定 消息体的序列化处理器。
enable.idempotence
从 kafka0.11版本开始,支持消息传递幂等,能够作到消息只会被传递一次,经过 enable.idempotence 为 true 来开启。若是该值设置为 true,其 retries 将设置为 Integer.MAX_VALUE,acks 将被设置为 all。为了确保消息发送幂等性,必须避免应用程序端的任何重试,而且若是消息发送API若是返回错误,应用端应该记录最后成功发送的消息,避免消息的重复发送。
从Kafka 0.11开始,kafka 也支持事务消息。
在 Kafka 中,生产者经过接口 Producer 定义,经过该接口的方法,咱们基本能够得知 KafkaProducer 将具有以下基本能力:
void initTransactions()
初始化事务,若是须要使用事务方法,该方法必须首先被调用。
void beginTransaction()
开启事务。
void sendOffsetsToTransaction(Map< TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId)
向消费组提交当前事务中的消息偏移量,将在介绍 Kafka 事务相关文章中详细介绍。
void commitTransaction()
提交事务。
void abortTransaction()
回滚事务。
Future< RecordMetadata> send(ProducerRecord
record)Future< RecordMetadata> send(ProducerRecord
record, Callback callback)void flush()
忽略 linger.ms 的值,直接唤醒发送线程,将缓冲区中的消息所有发送到 broker。
List< PartitionInfo> partitionsFor(String topic)
获取 topic 的路由信息(分区信息)。
Map< MetricName, ? extends Metric> metrics()
获取由生产者收集的统计信息。
void close()
关闭发送者。
void close(Duration timeout)
定时关闭消息发送者。
上面的方法咱们会根据须要在后续文章中进行详细的介绍。接下来咱们看一下 KafkaProducer 的核心属性的含义。
String clientId
客户端ID。在建立 KafkaProducer 时可经过 client.id 定义 clientId,若是未指定,则默认 producer- seq,seq 在进程内递增,强烈建议客户端显示指定 clientId。
Metrics metrics
度量的相关存储容器,例如消息体大小、发送耗时等与监控相关的指标。
Partitioner partitioner
分区负载均衡算法,经过参数 partitioner.class 指定。
int maxRequestSize
调用 send 方法发送的最大请求大小,包括 key、消息体序列化后的消息总大小不能超过该值。经过参数 max.request.size 来设置。
long totalMemorySize
生产者缓存所占内存的总大小,经过参数 buffer.memory 设置。
Metadata metadata
元数据信息,例如 topic 的路由信息,由 KafkaProducer 自动更新。
RecordAccumulator accumulator
消息记录累积器,将在消息发送部分详细介绍。
Sender sender
用于封装消息发送的逻辑,即向 broker 发送消息的处理逻辑。
Thread ioThread
用于消息发送的后台线程,一个独立的线程,内部使用 Sender 来向 broker 发送消息。
CompressionType compressionType
压缩类型,默认不启用压缩,可经过参数 compression.type 配置。可选值:none、gzip、snappy、lz四、zstd。
Sensor errors
错误信息收集器,当成一个 metrics,用来作监控的。
Time time
用于获取系统时间或线程睡眠等。
Serializer< K> keySerializer
用于对消息的 key 进行序列化。
Serializer< V> valueSerializer
对消息体进行序列化。
ProducerConfig producerConfig
生产者的配置信息。
long maxBlockTimeMs
最大阻塞时间,当生产者使用的缓存已经达到规定值后,此时消息发送会阻塞,经过参数 max.block.ms 来设置最多等待多久。
ProducerInterceptors
interceptorsApiVersions apiVersions
维护 api 版本的相关元信息,该类只能在 kafka 内部使用。
TransactionManager transactionManager
kafka 消息事务管理器。
TransactionalRequestResult initTransactionsResult
kafka 生产者事务上下文环境初始结果。
通过上面的梳理,详细读者朋友对 KafkaProducer 消息生产者有了一个大概的认识,下一篇会重点介绍消息发送流程。接下来咱们以一个简单的示例结束本文的学习。
package persistent.prestige.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerTest {
public static void main(String[] args){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072,");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
try {
for (int i = 0; i < 100; i++) {
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i)));
RecordMetadata recordMetadata = future.get();
System.out.printf("offset:" + recordMetadata.offset());
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
本文就介绍到这里,其主要的目的是了解Kafka 的 Producer,引出后续须要学习的内容,下一篇将重点讲述 Kafka 消息的发送流程,敬请关注。
若是本文对你们有所帮助的话,麻烦帮忙点个【在看】,谢谢。
https://mp.weixin.qq.com/s/rUJSctU8qdGL-7ri_9Kgkw