在文章【大数据实践】游戏事件处理系统系列文章中中,咱们已经作到使用filebeat收集日志事件、logstash处理日志事件、发送日志事件到kafka集群,并在消费者中消费的过程。其中,为kafka集群生产消息的,是logstash服务,而非咱们自定义的生成者。在本文中,将主要介绍KafkaProducer
类相关的一些接口和理论知识(基于kafka 1.1版本)。html
Package org.apache.kafka.clients.producer public class KafkaProducer<K, V> extends java.lang.Object implements Producer<K, V>
KafkaProducer
类用于向kafka集群发布消息记录,其中<K, V>
为泛型,指明发送的消息记录key/value对的类型。构造函数 | 描述 |
---|---|
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs) | 配置信息为Map形式,构造Producer |
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) | 配置信息为Map形式,能够指定自定义的用于序列化key和value的类。 |
KafkaProducer(java.util.Properties properties) | 配置信息放在Properties对象中,构造Producer。如,能够从配置文件***.properties 中读取,或者新建Properties对象,再设置配置信息。 |
KafkaProducer(java.util.Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) | 配置信息放在Properties对象中,可指定自定义的key和value的序列化类。 |
Producer
对象,配置信息均为key/value
对。org.apache.kafka.common.serialization
中,提供了许多已经实现好的序列化和反序列化的类,能够直接使用。你也能够实现本身的序列化和反序列化类(实现Serializer
接口),选择合适的构造函数构造你的Producer
类。想用kafka自带的序列化类,可在配置信息中配置,如:java
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
close
,不然会形成资源泄露的问题。修饰&返回 | 方法 | 描述 |
---|---|---|
void | abortTransaction() | 停止进行中的事务 |
void | beginTransaction() | 开始事务,在开始任何新事务以前,都英应该调用此方法 |
void | close() | 关闭该producer,释放资源 |
void | close(long timeout, java.util.concurrent.TimeUnit timeUnit) | 等待timeout指定的时长后关闭该producer,以便producer能够将还未完成发送的消息发送完。timeUnit为时间单位。若是超时,则强制关闭。 |
void | commitTransaction() | 提交进行中的事务 |
void | flush() | 调用次方法,使kafka生成者发送缓冲区中的消息记录(record)能够被当即发送(即便linger.ms 大于0)。而且一直阻塞,直到这些消息记录都发送完成 |
void | initTransactions() | 当构造producer时,若是配置了transactional.id ,那么在调用其transaction相关函数以前,都必须先调用该函数 |
java.util.Map<MetricName,? extends Metric> | metrics() | 列出producer中维护的全部内部监控(metrics)设置 |
java.util.List<PartitionInfo> | partitionsFor(java.lang.String topic) | 从指定的主题(topic)中,获取分区(partition)的元数据 |
java.util.concurrent.Future<RecordMetadata> | send(ProducerRecord<K,V> record) | 异步发送消息记录(record)到指定的主题 |
java.util.concurrent.Future<RecordMetadata> | send(ProducerRecord<K,V> record, Callback callback) | 异步发送一个消息记录到指定topic,当发送被确认完成以后,调用回调函数(callback) |
void | sendOffsetsToTransaction(java.util.Map<TopicPartition,OffsetAndMetadata> offsets, java.lang.String consumerGroupId) | 发送指定的偏移量列表(offsets)到消费者组协调者(consumer group coordinator),而且将这些偏移量(offsets)标记为当前事务的一部分 |
close()
方法是在回调方法callback
中被调用,那么kafka将会输出一条警告日志,而且将其替换为close(0, TimeUnit.MILLISECONDS)
,这样作的目的是为了不发送线程(sender thread)永远阻塞。flush()
函数的后置条件(方法顺利执行完毕以后必须为真的条件)是:待发送缓冲区中全部待发送的记录record都发送完成。发送完成指的是成功收到了在构建producer时设置的acks 配置的确认acks。flush()
调用开始以后发送的消息记录可以真正完成。能够经过设置重试配置retries=<large_number>
来下降消息记录不被送达的状况。事务性的producer
,不须要调用flush()
函数,由于commitTransaction()
函数在提交事务以前,会将缓冲中的记录进行flush,这样能够确保那些在 beginTransaction()
以前被send(ProducerRecord)
的消息记录将在事务提交以前完成。beginTransaction()
方法以前,必须调用一次initTransactions()
方法。initTransactions()
方法的主要做用是:apache
initTransactions()
函数会等待它完成。send(ProducerRecord<K,V> record, Callback callback)
方法将在record被送入到待发送的buffer以后,当即返回。所以,可容许并行发送record,而不用阻塞等待每一个record发送完成。该方法的返回值RecordMetadata
为该record被发送到的分区partition的元数据
,如偏移量offset,建立时间CreateTime等。要想阻塞等待发送完成,能够调用Future的get()
方法,如:segmentfault
producer.send(record).get();
上面基本上对KafkaProducer类的主要接口作了解释,主要参考了官方文档,从上面的一些方法中,能够看到kafka的一些特性,如:发送缓冲区机制,事务性producer等,这些复杂概念将在后续文章中再作深刻探索。安全