首先推荐一个关于Kafka的中文网站:http://orchome.com/kafka/indexhtml
部分翻译直接参考此网站内容,可是网站目前的API版本为0.10.0.1,因此在学习过程当中,自行翻译了一下0.11.0的API文档。在翻译过程当中有些地方也不是太理解,感受翻译的不太准确,有问题的地方望读者指出。 java
public class KafkaProducer<K,V> extends Object implements Producer<K,V>
Kafka客户端发布消息至kafka集群。apache
生产者是线程安全的,在线程之间共享单个生产者实例一般比持有多个实例更快。bootstrap
下面是一个简单的例子,它使用生产者将包含有序数字的字符串消息做为键/值对发送。api
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close();
生产者的缓冲空间池保留还没有发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。若是使用后不关闭生产者,则会泄露这些资源。
send()方法是异步的,添加消息到缓冲区等待发送,并当即返回。这容许生产者将单个的消息批量在一块儿发送来提升效率。缓存
ack是判别请求是否为完整的条件。咱们指定了“all”将会阻塞消息,这种设置使性能最低,可是是最可靠的。安全
retries,若是请求失败,生产者会自动重试,咱们指定是0次即不启动重试,若是启用重试,则会有重复消息的可能性。服务器
batch.size ,producer为每一个分区未发送的消息保持一个缓冲区。缓存的大小是经过 batch.size 配置指定的。值较大的话将会产生更大的批处理。并须要更多的内存(由于一般咱们会为每一个“活跃”的分区都设置1个缓冲区)。oracle
linger.ms默认状况,即使缓冲空间尚未满,缓冲也可当即发送,可是,若是想减小请求的数量,能够设置linger.ms大于0。这将指示生产者发送请求以前等待一段时间,但愿更多的消息填补到未满的批中。这相似于TCP的算法,例如上面的代码段,可能100条消息在一个请求发送,由于咱们设置了linger(逗留)时间为1毫秒,而后,若是咱们没有填满缓冲区,这个设置将增长1毫秒的延迟请求以等待更多的消息。须要注意的是,在高负载下,即便是 linger.ms=0,相近的时间通常也会组成批。在不处于高负载的状况下,若是设置比0大,将以少许的延迟代价换取更少的,更有效的请求。
buffer.memory 控制生产者可用的缓存总量,若是消息发送速度比它们可以传输到服务器的速度快,将会耗尽这个缓存空间。当缓存空间耗尽,其余发送调用将被阻塞,阻塞时间的阈值经过max.block.ms设定,以后它将抛出一个TimeoutException。
key.serializer和value.serializer示例,将用户提供的key和value对象ProducerRecord转换成字节,你可使用附带的ByteArraySerializaer或StringSerializer处理简单的string或byte类型。
从kafka 0.11开始,KafkaProducer 支持两种额外的模式:idempotent producer和transactional producer。
idempotent producer(幂等性生产者)加强Kafka的投递语义,从至少一次投递变为彻底的一次投递,特别是消息重试将再也不重复提出。
transactional producer(事务性生产者)容许应用程序将消息发送给多个原子分区(和topics)。
使用idempotent,enable.idempotence配置项必须设置为true,若是设置为true,重试配置项(retries )将默认设置为 Integer.MAX_VALUE,max.inflight.requests.per.connection 将默认设置为 1,asks config(确认配置)将默认设置为all 。idempotent producer的API并无改变,因此现有的应用程序应用此特性时不须要作修改。
为了充分利用idempotent producer,必须避免应用级别的重试发送,由于这样不能de-duplicated(去耦合/去重复:此处不是太理解,不知道如何翻译)。所以,若是应用程序启用了idempotence,建议取消retries (重试)配置,由于它将被默认为Integer.MAX_VALUE。此外,若是send(ProducerRecord)即便有无限重试仍是返回了一个错误(例如,若是消息在发送以前在缓冲区中过时),则建议关闭producer 并检查最后生成的消息的内容,以确保它不是重复的。最后,producer 只能保证在单个会话中发送消息的idempotent 特性。
使用transactional producer和与它相关的API,则必须设置transactional.id配置属性,若是transactional.id被设置,idempotence 会随着其所依赖的producer的配置被自动启用,此外,transactions 中包含的topics应该配置为持久性的。特别是,replication.factor(复制因子)至少应该为3,这些topics的 min.insync.replicas应该设为2。最后,为了保证transactional 端到端的实现,consumers必须配置为只读取提交的信息。
transactional.id的目的是在单个生产者实例的多个会话中启用事务恢复(transaction recovery )。它一般由分区、状态和应用程序中的shard标识符派生而来。所以,对于在分区应用程序中运行的每一个生产者实例来讲,它应该是唯一的。
全部新的transactional api都是阻塞的,而且在故障时抛出异常。下面的示例演示了如何使用新的api。它与上面的示例相似,只是全部100个消息都是单个事务的一部分。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("transactional.id", "my-transactional-id"); Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 100; i++) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // We can't recover from these exceptions, so our only option is to close the producer and exit. producer.close(); } catch (KafkaException e) { // For all other exceptions, just abort the transaction and try again. producer.abortTransaction(); } producer.close();
正如在示例中所暗示的那样,每一个生产者只能有一个打开的事务。beginTransaction()和commitTransaction()之间发送的全部消息都将是单个事务的一部分。当transactional.id被指定,producer发送的全部消息必须是事务的一部分。
transactional producer使用异常来传达错误状态。具体地说,不须要为producer . send()或调用. get()指定回调函数:若是任何一个producer.send()或事务性调用在事务中遇到不可恢复的错误,则将抛出KafkaException。查看send(ProducerRecord)文档,了解从事务发送中探知错误的更多细节。
在接收一个KafkaException时,经过调用producer.abortTransaction()咱们能够确保任何成功的写操做标记为失败(停止),所以保持事务保证。
这个客户端能够与0.10.0版本或更新的brokers进行通讯。旧的或新的brokers可能不支持某些客户端特性。例如,事务api须要broker 0.11.0版本或更高。当调用的API在运行broker版本中不可用,你将收到一个UnsupportedVersionException。
public KafkaProducer(Map<String,Object> configs)
producer经过提供一组键值对做为配置来实例化。有效配置字符串都记录在这里。值能够是字符串或适当类型的对象(例如,数字配置能够接受字符串“42”或整数42)。
public KafkaProducer(Map<String,Object> configs,Serializer<K> keySerializer,Serializer<V> valueSerializer)
producer经过提供一组键值对做为配置、一个键和一个值序列化器来实例化。有效配置字符串都记录在这里。值能够是字符串或适当类型的对象(例如,数字配置能够接受字符串“42”或整数42)。
public KafkaProducer(Properties properties) public KafkaProducer(Properties properties,Serializer<K> keySerializer,Serializer<V> valueSerializer)
同上。
public void initTransactions()
当在配置中设置了transactional.id,该方法须要在任何其余方法以前被调用,该方法执行如下步骤:一、确保由producer先前实例发起的任何事物都已经完成。若是前一个实例的事务在进程中失败,它将被终止。若是最后一个事务已经开始完成,但尚未完成,该方法将等待它完成。二、获取producer的内部ID和epoch(纪元?此处不清楚准确的翻译),用于 producer.发布的全部将来的事务消息。
Specified by:
initTransactions in interface Producer<K,V>
Throws:
IllegalStateException - 若是配置中没有设置producer的transactional.id则抛出此异常。
public void beginTransaction()
须要在每一个新事务开始以前调用。
Specified by:
beginTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 若是另外一个活跃的producer有相同的transactional.id则抛出此异常。
public void sendOffsetsToTransaction(Map<TopicPartition,OffsetAndMetadata> offsets,String consumerGroupId)
向consumer组协调器发送被消耗的偏移量列表,并将这些偏移量标记为当前事务的一部分。只有当事务成功提交时,这些偏移量才会被视为消费掉的。当您须要将消费和生成的消息一块儿批量处理时,应该使用此方法,一般在consume-transform-produce 模式中使用。
Specified by:
sendOffsetsToTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 若是另外一个活跃的producer有相同的transactional.id则抛出此异常。
public void commitTransaction()
提交正在进行中的事务。此方法将在实际提交事务以前flush任何未发送的消息。此外,如何事务包含部分的任何send(ProducerRecord) 调用触发不可恢复的错误,那么该方法将当即抛出最后一个接收到的异常,而事务将不会被提交。所以,在事务中对send(ProducerRecord)的调用必须成功,以便此方法成功。
Specified by:
commitTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 若是另外一个活跃的producer有相同的transactional.id则抛出此异常。
public void abortTransaction()
停止正在进行中的事务。当此调用完成时,任何未flush的生成消息将停止。若是任何先前的send(ProducerRecord)调用有ProducerFencedException或ProducerFencedException 致使的调用失败,此调用将当即抛出异常。
Specified by:
abortTransaction in interface Producer<K,V>
Throws:
ProducerFencedException - 若是另外一个活跃的producer有相同的transactional.id则抛出此异常。
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
异步发送消息到topic,并在发送的消息被确认的时候调用所提供的回调。
send是异步的,而且一旦消息被保存在等待发送的消息缓冲区中,此方法就当即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。
发送的结果是一个RecordMetadata,它指定了消息发送的分区,分配的offset和消息的时间戳。若是topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间(若是用户没有指定消息的时间戳时使用发送的时间)若是topic使用的是LogAppendTime,则追加消息时,时间戳是broker的本地时间。
由于send 调用是异步的,它将为分配给消息的RecordMetadata返回一个Future。若是future调用get(),则将阻塞,直到相关请求完成并返回该消息的metadata,或抛出发送异常。
若是你想模拟一个简单的阻塞调用,您能够当即调用get()方法:
byte[] key = "key".getBytes(); byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value); producer.send(record).get();
彻底非阻塞的使用能够利用回调参数提供一个回调,当请求完成时将被调用。
producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) e.printStackTrace(); System.out.println("The offset of the record we just sent is: " + metadata.offset()); } });
发送到同一个分区的消息回调保证按必定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 以前:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1); producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
当send做为事务的一部分使用时,不须要定义回调或者检查future的结果来检查send的错误。若是任何send由于一个不可恢复的错误而调用失败,则最终的commitTransaction() 调用将失败,而且在最后的发送失败时抛出异常。当发生这种状况的时候,应用程序应该调用abortTransaction()来重置状态并继续发送数据。
有些事务发送错误没法经过调用abortTransaction()来解决。特别是,若是一个事务发送完成时伴随着一个ProducerFencedException,OutOfOrderSequenceException,
UnsupportedVersionException,或一个AuthorizationException等异常,那么惟一的选择就是调用close()。重大的错误致使生产者进入无效状态,在这种状态下,future的API调用将一样的底层错误包装在新的KafkaException中抛出。
当启用idempotence(幂等性),但没有配置transactional.id 的时候,是一个相似的场景。在这种状况下,UnsupportedVersionException和AuthorizationException被视为重大错误。可是,ProducerFencedException不须要被处理。此外,它有可能在收到一个OutOfOrderSequenceException以后继续发送信息,可是这样作可能致使等待中的消息的无序投递,为了确保正确的顺序,你应该关闭生产者并建立一个新的实例。
若是目标topic的消息格式不升级到0.11.0.0,idempotent(幂等性)和transactional(事务性)的生产请求将失败并伴随一个UnsupportedForMessageFormatException的错误。若是在事务中遇到这种状况,则能够停止并继续执行。可是须要注意的是,以后发送到同一topic将会继续收到相同的异常,直到topic被更新。
注意:callback通常在生产者的I/O线程中执行,因此是至关的快的,不然将延迟其余的线程的消息发送。若是你想要执行阻塞或计算代价高昂的回调,建议在callback主体中使用本身的Executor来并行处理。
Specified by:
send in interface Producer<K,V>
Parameters:
record - 发送的消息
callback - 当消息被服务器确认的时候执行用户提供的回调 (null 表示没有回调)
Throws:
IllegalStateException -若是配置了transactional.id 但没有事务启动。
InterruptException - 若是线程在阻塞时被中断
SerializationException - 若是序列化配置了无效的键值对象
TimeoutException - 若是获取metadata或为消息分配内存所消耗的时间超过了 max.block.ms设定的值。
KafkaException -若是出现了不属于公共API异常的Kafka相关的错误。
public void flush()
调用此方法可使全部的缓冲消息当即能够发送(即便linger.ms配置的值大于0)而且阻塞与这些信息相关联的请求的完成。flush()后置条件是任何先前发送的记录已经完成(举例来讲就是,Future.isDone() == true)。一个请求根据你指定的确认配置被成功确认以后则被认为是完成的,不然会致使错误。
当一个线程被阻塞等待一个flush()调用完成时,其它线程能够继续发送消息,可是不能保证关于flush调用开始以后发送的消息的完成。
这个方法能够用于从一些输入系统消费消息并生产至kafka中。flush()调用提供了一种方便的方法来确保全部之前发送的消息实际上已经完成。
这个示例展现了如何从一个Kafka topic中消费,并生成至另外一个Kafka topic:
for(ConsumerRecord<String, String> record: consumer.poll(100)) producer.send(new ProducerRecord("my-topic", record.key(), record.value()); producer.flush(); consumer.commit();
须要注意的是,上述示例可能在生产(produce)请求失败的时候删除消息。若是要确保这种状况不会发生,须要在配置中设置retries=<large_number>。
应用程序不须要为事务性生产者调用此方法,由于commitTransaction()将在执行提交以前flush全部缓冲消息。这将确保在提交以前先前的beginTransaction()以后的全部send(ProducerRecord)调用都已经完成。
Specified by:
flush in interface Producer<K,V>
Throws:
InterruptException - 若是线程在阻塞时被中断
public List<PartitionInfo> partitionsFor(String topic)
获取给定topic的分区metadata ,这能够用于自定义分区。
Specified by:
partitionsFor in interface Producer<K,V>
Throws:
InterruptException - 若是线程在阻塞时被中断
public Map<MetricName,? extends Metric> metrics()
得到生产者维护的完整的内部度量集。
Specified by:
metrics in interface Producer<K,V>
public void close()
关闭这个生产者,此方法阻塞直到全部之前的发送请求完成。该方法等效于close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)。
若是从回调中调用close(),日志将记录一条警告消息并以close(0, TimeUnit.MILLISECONDS)调用替代。咱们这样作是由于发送方线程将尝试链接本身并永远阻塞。
Specified by:
close in interface Closeable
Specified by:
close in interface AutoCloseable
Specified by:
close in interface Producer<K,V>
Throws:
InterruptException - 若是线程在阻塞时被中断
public void close(long timeout,TimeUnit timeUnit)
此方法等待生产者完成全部未完成请求的发送直到超时。若是超时以前生产者不能完成全部请求,此方法将马上丢弃任何的未发送和未确认的消息。
若是从一个Callback中调用此方法,此方法将不会阻塞,等同于close(0, TimeUnit.MILLISECONDS).这样作是由于在阻塞生产者的I/O线程时不会发生进一步的发送。
Specified by:
close in interface Producer<K,V>
Parameters:
timeout - 等待生产者完成任何正要发生的请求的最大时间。这个值应该是非负的。指定超时为0意味着不等待正要发生的请求的完成。
timeUnit - 超时时间的单位
Throws:
InterruptException - 若是线程在阻塞时被中断
IllegalArgumentException - 若是超时时间的值是负的