【大数据实践】Kafka生产者编程(1)——KafkaProducer详解

前言

在文章【大数据实践】游戏事件处理系统系列文章中中,咱们已经作到使用filebeat收集日志事件、logstash处理日志事件、发送日志事件到kafka集群,并在消费者中消费的过程。其中,为kafka集群生产消息的,是logstash服务,而非咱们自定义的生成者。在本文中,将主要介绍KafkaProducer类相关的一些接口和理论知识(基于kafka 1.1版本)。html

KafkaProducer类

Package org.apache.kafka.clients.producer

public class KafkaProducer<K, V> extends java.lang.Object implements Producer<K, V>
  • KafkaProducer类用于向kafka集群发布消息记录,其中<K, V>为泛型,指明发送的消息记录key/value对的类型。
  • kafka producer(生产者)是线程安全的,多个线程共享一个producer实例,相比于多个producer实例,这样作的效率更高、更快。

构造函数

构造函数 描述
KafkaProducer​(java.util.Map<java.lang.String,java.lang.Object> configs) 配置信息为Map形式,构造Producer
KafkaProducer​(java.util.Map<java.lang.String,java.lang.Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) 配置信息为Map形式,能够指定自定义的用于序列化key和value的类。
KafkaProducer​(java.util.Properties properties) 配置信息放在Properties对象中,构造Producer。如,能够从配置文件***.properties中读取,或者新建Properties对象,再设置配置信息。
KafkaProducer​(java.util.Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) 配置信息放在Properties对象中,可指定自定义的key和value的序列化类。
  • 从上述构造函数能够看出,能够经过Map和Properties两种形式传递配置信息,用于构造Producer对象,配置信息均为key/value对。
  • 可配置的信息可参见官方配置表producercofigs,其中Value能够是String类型,也能够是其余合适的类型。
  • kafka的包org.apache.kafka.common.serialization中,提供了许多已经实现好的序列化反序列化的类,能够直接使用。你也能够实现本身的序列化和反序列化类(实现Serializer接口),选择合适的构造函数构造你的Producer类。
  • 想用kafka自带的序列化类,可在配置信息中配置,如:java

    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  • producer被建立以后,使用完以后,必定要记得将其close,不然会形成资源泄露的问题。

方法

修饰&返回 方法 描述
void abortTransaction​() 停止进行中的事务
void beginTransaction​() 开始事务,在开始任何新事务以前,都英应该调用此方法
void close​() 关闭该producer,释放资源
void close​(long timeout, java.util.concurrent.TimeUnit timeUnit) 等待timeout指定的时长后关闭该producer,以便producer能够将还未完成发送的消息发送完。timeUnit为时间单位。若是超时,则强制关闭。
void commitTransaction​() 提交进行中的事务
void flush() 调用次方法,使kafka生成者发送缓冲区中的消息记录(record)能够被当即发送(即便linger.ms大于0)。而且一直阻塞,直到这些消息记录都发送完成
void initTransactions​() 当构造producer时,若是配置了transactional.id,那么在调用其transaction相关函数以前,都必须先调用该函数
java.util.Map<MetricName,? extends Metric> metrics​() 列出producer中维护的全部内部监控(metrics)设置
java.util.List<PartitionInfo> partitionsFor​(java.lang.String topic) 从指定的主题(topic)中,获取分区(partition)的元数据
java.util.concurrent.Future<RecordMetadata> send​(ProducerRecord<K,V> record) 异步发送消息记录(record)到指定的主题
java.util.concurrent.Future<RecordMetadata> send​(ProducerRecord<K,V> record, Callback callback) 异步发送一个消息记录到指定topic,当发送被确认完成以后,调用回调函数(callback)
void sendOffsetsToTransaction​(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.lang.String consumerGroupId) 发送指定的偏移量列表(offsets)到消费者组协调者(consumer group coordinator),而且将这些偏移量(offsets)标记为当前事务的一部分
  • 若是close()方法是在回调方法callback中被调用,那么kafka将会输出一条警告日志,而且将其替换为close(0, TimeUnit.MILLISECONDS),这样作的目的是为了不发送线程(sender thread)永远阻塞。
  • flush()函数的后置条件(方法顺利执行完毕以后必须为真的条件)是:待发送缓冲区中全部待发送的记录record都发送完成。发送完成指的是成功收到了在构建producer时设置的acks 配置的确认acks
  • 当一个线程阻塞于flush调用(等待其完成)时,其余线程能够继续发送消息记录record,但不保证这些在flush()调用开始以后发送的消息记录可以真正完成。能够经过设置重试配置retries=<large_number>来下降消息记录不被送达的状况。
  • 对于事务性的producer,不须要调用flush()函数,由于commitTransaction() 函数在提交事务以前,会将缓冲中的记录进行flush,这样能够确保那些在 beginTransaction() 以前被send(ProducerRecord)的消息记录将在事务提交以前完成。
  • 在第一次调用beginTransaction()方法以前,必须调用一次initTransactions()方法。
  • initTransactions()方法的主要做用是:apache

    • 确保被其余producer实例初始化的、具备相同transactional.id的事务被完成。若是这些事务在进行过程当中失败,则事务被停止;若是事务已经开始,但还没完成,那么initTransactions()函数会等待它完成。
    • 获取内部producer的id和epoch,用于后续该producer产生的事务性消息。
  • send​(ProducerRecord<K,V> record, Callback callback)方法将在record被送入到待发送的buffer以后,当即返回。所以,可容许并行发送record,而不用阻塞等待每一个record发送完成。该方法的返回值RecordMetadata为该record被发送到的分区partition元数据,如偏移量offset,建立时间CreateTime等。要想阻塞等待发送完成,能够调用Future的get()方法,如:segmentfault

    producer.send(record).get();

小结

上面基本上对KafkaProducer类的主要接口作了解释,主要参考了官方文档,从上面的一些方法中,能够看到kafka的一些特性,如:发送缓冲区机制,事务性producer等,这些复杂概念将在后续文章中再作深刻探索。安全

相关文章
相关标签/搜索