In this article we will look at how to use the Kafka producer and how it works under the hood; the kafka-clients version used throughout is 2.6.0. Let's start with an example of how to send a message with the producer API.
public class Producer {
    public static void main(String[] args) {
        // 1. Configure the parameters
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the KafkaProducer instance (the producer) from the parameters
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Create the ProducerRecord instance (the message)
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-demo", "hello kafka");
        // 4. Send the message
        producer.send(record);
        // 5. Close the producer instance
        producer.close();
    }
}
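Note that KafkaProducer implements Closeable, so in real code try-with-resources is a simple way to make sure the producer is closed even when send throws; a minimal sketch, reusing the properties from the example above:

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    // close() is invoked automatically at the end of the block; it waits for
    // buffered records to finish sending before releasing sockets and memory
    producer.send(new ProducerRecord<>("topic-demo", "hello kafka"));
}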
First, a Properties instance is created with the three mandatory parameters: bootstrap.servers, key.serializer and value.serializer.
Because the broker expects byte arrays, the key and value of a message need to be serialized into byte arrays. Once the parameters are set, a KafkaProducer instance, i.e. the producer used to send messages, is created from them; then a ProducerRecord instance, the message to be sent, is created; the message is sent with KafkaProducer's send method; and finally the producer is closed.
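Both serializers in the example are StringSerializer. If you need to send your own object types, you can implement org.apache.kafka.common.serialization.Serializer yourself. The sketch below is only an illustration, using a hypothetical User class that carries a single name field:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical value type, only for this example
class User {
    private final String name;
    User(String name) { this.name = name; }
    String getName() { return name; }
}

public class UserSerializer implements Serializer<User> {
    @Override
    public byte[] serialize(String topic, User data) {
        // The broker only deals in byte arrays, so the serializer's whole job is object -> bytes
        return data == null ? null : data.getName().getBytes(StandardCharsets.UTF_8);
    }
}

It would then be registered by setting value.serializer to the fully qualified class name, just like StringSerializer above.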
About KafkaProducer, for now keep two points in mind:
As for configuration, we will only cover these three mandatory parameters for now. Next, let's look at the send method; there are three ways to send a message:
Fire-and-forget
The message is sent to Kafka without caring whether it actually arrives; we only make the send call, so messages may be lost. The example given above uses this mode.
Synchronous send (sync)

Here we call get() on the Future returned by send, which blocks until the broker responds or the send fails:

Future<RecordMetadata> recordMetadataFuture = producer.send(record);
RecordMetadata recordMetadata = recordMetadataFuture.get();
The RecordMetadata object carries metadata about the message, such as its topic, partition number, offset within the partition, and timestamp.
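Because get() surfaces send failures as exceptions, a synchronous send is usually wrapped in a try/catch; a minimal sketch, assuming the producer and record from the earlier example:

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    // get() wraps the underlying send failure in an ExecutionException; log, retry or rethrow here
    e.printStackTrace();
}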
Asynchronous send (async)

An asynchronous send passes a Callback along with the message; it is invoked once the broker acknowledges the message or the send fails:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + ":" + recordMetadata.offset());
        }
    }
});
onCompletion has two parameters, of types RecordMetadata and Exception. When the message is sent successfully, recordMetadata is non-null and e is null; when the send fails, it is the other way around.
Next, let's get to know the message object, ProducerRecord, which wraps the message to be sent. It is defined as follows:
public class ProducerRecord<K, V> {
    private final String topic;        // topic
    private final Integer partition;   // partition number
    private final Headers headers;     // message headers
    private final K key;               // key
    private final V value;             // value
    private final Long timestamp;      // timestamp
    // ...constructors and other members
}
Of these, the topic and value are required and the rest are optional. For example, if a partition number is given, the target partition is fixed; if no partition number is given but a key is, the key is used to compute the partition number. Message headers and timestamps are not covered here.
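For illustration, the most commonly used ProducerRecord constructors look like this (the topic name and key are just examples):

// Topic and value only: the partitioner decides the partition (there is no key to hash)
ProducerRecord<String, String> r1 = new ProducerRecord<>("topic-demo", "hello kafka");
// Topic, key and value: the key is hashed to choose the partition
ProducerRecord<String, String> r2 = new ProducerRecord<>("topic-demo", "key-1", "hello kafka");
// Topic, partition, key and value: the partition is fixed explicitly
ProducerRecord<String, String> r3 = new ProducerRecord<>("topic-demo", 0, "key-1", "hello kafka");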
Now that we know the producer object KafkaProducer and the message object ProducerRecord, let's look at the components involved when the producer sends a message: the producer interceptors, the serializers, and the partitioner. The (partial) architecture is as follows:
Let's walk through the code to see this processing in action and reinforce the picture.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // Pass the record through the interceptors first
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
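The interceptors field above holds the configured ProducerInterceptor instances (set via the interceptor.classes config). For reference, below is a minimal sketch of a custom interceptor; the PrefixInterceptor name and its value-prefixing logic are made up purely for illustration:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Called inside send(), before serialization and partitioning
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), "prefix-" + record.value(), record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record, or when the send finally fails
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}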
Back to the send method: KafkaProducer first hands the message to the interceptors' onSend method and then calls doSend. The doSend method is fairly long, but its content is not complicated; the main steps are annotated in the comments below.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // 1. Make sure metadata for the target topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // 2. Serializers: serialize the message key and value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        // 3. Partitioner: get or compute the partition number
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }
        // 4. Record accumulator: buffer the message
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // 5. If the batch is full, or a new batch had to be created because the message
        // did not fit, wake up the sender thread to send the buffered messages
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
The doSend method breaks down into five main steps:
1. Make sure the metadata for the target topic is available, waiting up to max.block.ms if it is not;
2. Serialize the message key and value with the configured serializers;
3. Get or compute the partition number with the partitioner;
4. Append the message to the record accumulator, which buffers it for batch sending;
5. If the batch is full or a new batch has been created, wake up the Sender thread to send the buffered messages.
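Step 3 delegates to the configured partitioner. If the default behaviour does not fit, you can implement org.apache.kafka.clients.producer.Partitioner and register it via the partitioner.class config. Below is a minimal sketch; the SimplePartitioner name and its fallback to partition 0 for keyless records are assumptions for illustration (the built-in DefaultPartitioner uses sticky partitioning in that case):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class SimplePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: always pick partition 0 (illustrative choice only)
            return 0;
        }
        // Same idea as the built-in partitioner: murmur2-hash the key bytes, modulo the partition count
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be registered with properties.put("partitioner.class", SimplePartitioner.class.getName()).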
This article will not dig into the metadata handling, and the serializers and partitioner were introduced above. Next, let's focus on the record accumulator.
The record accumulator's job is to buffer messages so that they can be sent in batches. Inside RecordAccumulator, messages are kept in a map field named batches of type ConcurrentMap<TopicPartition, Deque<ProducerBatch>>. The TopicPartition key wraps the topic and partition number, and the corresponding value is a double-ended queue of ProducerBatch objects, so messages destined for the same partition are buffered together in ProducerBatches. When a message is sent, the record is appended at the tail of the queue, that is, added to the ProducerBatch at the tail; if that ProducerBatch does not have enough space, or the queue is empty, a new ProducerBatch is created and the record is appended to it. When a ProducerBatch fills up or a new one is created, the Sender thread is woken up to take ProducerBatches from the head of the queue and send them.
RecordAccumulator
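How aggressively the accumulator batches is mainly controlled by a few producer configs; the values below are only examples (batch.size defaults to 16384 bytes and linger.ms to 0):

// batch.size: the upper bound, in bytes, of a single ProducerBatch
properties.put("batch.size", 16384);
// linger.ms: how long the sender waits for more records before sending a not-yet-full batch
properties.put("linger.ms", 10);
// buffer.memory: total memory available to the accumulator; when it is exhausted,
// send() blocks for up to max.block.ms before throwing
properties.put("buffer.memory", 33554432);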
In the Sender thread, the ProducerBatches to be sent are converted into a Map<Integer, List<ProducerBatch>>, grouped by the ID of the Kafka node they are destined for, so that all ProducerBatches for the same node can be sent in a single request.
That's it for the Kafka producer for now. Below, a mind map gives a quick recap of this article: