在上一篇文章【大数据实践】Kafka生产者编程(1)——KafkaProducer详解中,主要对KafkaProducer
类中的函数进行了详细的解释,但仅针对其中的一些方法,对于producer背后的原理、机制,没有作深刻讲解。所以,在本文章中,尝试介绍kafka producer整个发送流程。在写做此文章时,本身也处于对Kafka的学习阶段,可能有些细节掌握的并不精确,但愿你们指正。java
备注:图片来源:https://blog.csdn.net/zhanglh...算法
在上一篇文章中,详细介绍了KafkaProducer的构造函数,其主要是对producer的一些选项进行配置。配置项可在类ProducerConfig
中找到:apache
package org.apache.kafka.clients.producer; public class ProducerConfig extends AbstractConfig { ... }
其中,除了能够配置一些简单的数值,还能够配置一些kafka自带或者咱们自定义的类,如:编程
key.serializer
:key的序列化类,kafka在包package org.apache.kafka.common.serialization;
中实现了一系列经常使用的序列化和反序列化的类。若要自定义序列化类,则须要实现接口org.apache.kafka.common.serialization.Serializer
,如Integer的序列化类:segmentfault
package org.apache.kafka.common.serialization; import java.util.Map; public class IntegerSerializer implements Serializer<Integer> { public IntegerSerializer() { } public void configure(Map<String, ?> configs, boolean isKey) { } public byte[] serialize(String topic, Integer data) { return data == null ? null : new byte[]{(byte)(data.intValue() >>> 24), (byte)(data.intValue() >>> 16), (byte)(data.intValue() >>> 8), data.byteValue()}; } public void close() { } }
value.serializer
:value的序列化类。partitioner.class
:partition分配的类,使消息均匀发送到topic的各个分区partition中,Kafka默认partition为org.apache.kafka.clients.producer.internals.DefaultPartitioner
。若要自定义负载均衡算法,须要实现org.apache.kafka.clients.producer.Partitioner
接口。拦截链Interceptors
:为拦截器List,可让用户在消息记录发送以前,或者producer回调方法执行以前,对消息或者回调信息作一些逻辑处理。能够经过实现org.apache.kafka.clients.producer.ProducerInterceptor
接口来定义本身的拦截器。ProducerRecord即消息记录,记录了要发送给kafka集群的消息、分区等信息:缓存
public class ProducerRecord<K, V> { private final String topic; private final Integer partition; private final Headers headers; private final K key; private final V value; private final Long timestamp;
topic
:必须字段,表示该消息记录record发送到那个topic。value
:必须字段,表示消息内容。partition
:可选字段,要发送到哪一个分区partition。key
:可选字段,消息记录的key,可用于计算选定partition。timestamp
:可选字段,时间戳;表示该条消息记录的建立时间createtime,若是不指定,则默认使用producer的当前时间。headers
:可选字段,(做用暂时不明,待再查证补充)。异步发送时,直接将消息记录扔进发送缓冲区,当即返回,有另外的线程负责将缓冲区中的消息发送出去。异步发送时,须要设置callback
方法,当收到broker的ack确认时,将调用callback方法。下面直接贴kafka官方例子中,展现的异步和同步发送方法:负载均衡
package kafka.examples; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; public class Producer extends Thread { private final KafkaProducer<Integer, String> producer; private final String topic; private final Boolean isAsync; public Producer(String topic, Boolean isAsync) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<>(props); this.topic = topic; this.isAsync = isAsync; } public void run() { int messageNo = 1; while (true) { String messageStr = "Message_" + messageNo; long startTime = System.currentTimeMillis(); if (isAsync) { // Send asynchronously producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); } else { // Send synchronously try { producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get(); System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } ++messageNo; } } } class DemoCallBack implements Callback { private final long startTime; private final int key; private final String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } /** * A callback method the user can implement to provide asynchronous handling of request completion. This method will * be called when the record sent to the server has been acknowledged. Exactly one of the arguments will be * non-null. * * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error * occurred. * @param exception The exception thrown during processing of this record. Null if no error occurred. */ public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.println( "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), " + "offset(" + metadata.offset() + ") in " + elapsedTime + " ms"); } else { exception.printStackTrace(); } } }
调用send方法时,首先拦截器Interceptor将拦截ProducerRecord,调用Interceptor的onSend方法,对消息记录进行一些处理,返回处理后的ProducerRecord。异步
调用配置的key 和 value的序列化类,对ProducerRecord的key和value进行序列化,并设置到ProducerRecord中。async
经过DefaultPartitioner类或者配置项中指定的自定义Partitioner类中的partiton方法,计算出消息要发送到topic中某个分区partition。设置到ProducerRecord中。ide
根据配置项max.request.size
和buffer.memory
进行检查,超出任何一项就会抛出异常。
若是ProducerRecord构建时已经指定时间戳,则用构建时指定的,不然用当前时间。
ProducerRecord放入缓存区(RecordAccumulator维护)时,发往相同topic的相同partition的消息记录将会被捆绑batch压缩,压缩到ProducerBatch中。也就是说,ProducerBatch中可能包含多个ProducerRecord。这样作的目的是为了一次请求发送多个record,提升性能。
RecordAccumulator为每个TopicPartition维护了一个双端队列:
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
相同topic的相同partition的ProducerBatch将被放在对应的队列中。
压缩策略有:
·NONE:就是不压缩。
·GZIP:压缩率为50%
·SNAPPY:压缩率为50%
·LZ4:压缩率为50%
当一个ProducerBatch已满或者有新的ProducerBatch到达时,会唤醒真正发送消息记录的发送线程Sender,将ProducerBatch发送到kafka集群。
Sender的发送逻辑以下:
本文章对producer消息大致发送流程进行一次梳理,其中有一些本身还不是特别懂,也就没有写得特别详细,后续若是有进一步的了解,将修改本文进行补充。后面的文章将对发送过程当中构建Producer时,自定义Inteceptor和自定义Partitioner进行介绍。