Kafka基础——生产者

kafka生产者发送流程:java

kafka是经过异步的方式进行的消息发送流程,为何是异步的?数组

主线程->构建ProducerRecord对象,这个对象声明了主题Topic、分区Partition、键 Key以及 值 Value,主题和值是必需要声明的,分区和键能够不用指定。
主线程->调用send发送进行消息发送。(由于消息要在网络上传输,因此必须进行序列化)
主线程->序列化器:将key和value序列化成字节数组。
主线程->分区器,根据参数进行分区选择,参数里有分区则指定分区,无分区有key值则经过key计算出分区,既没有分区也没有key值,则经过轮询方式进行指定分区。
主线程->,将消息放入一个RecordAccumulator(消息收集器,也能够理解为主线程与Sender线程 直接的缓冲区)中暂存.
异步线程->Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从 RecordAccumulator中取出消息并批量发送出去
Broker成功接收到消息,表示发送成功,返回消息的元数据(主题、分区、偏移量)。发送失败能够选择重试或跑出异常。

须要注意的是,KafkaProducer是线程安全的,多个 线程间能够共享使用同一个KafkaProducer对象安全

发送类型:网络

一、发送即忘记:经过Producer对象调用send方法,发送完成后对响应结果不作任何处理。异步

二、同步发送:经过Producer对象调用send方法返回一个Future对象,而后调用Future对象的get方法等待kafka的响应,若是kafka正常响应,返回一个RecordMetadata对象,该对象为消息的元数据对象(主题、分区、偏移量);若是kafka发生错误,没法正常响应,就会抛出异常,咱们即可以进行异常处理 (try/catch包围)。线程

三、异步发送:code

producer.send(record, new Callback() { 
 public void  onCompletion(RecordMetadata metadata, Exception exception) {     
 	if (exception == null) { 
	System.out.println(metadata.partition() + ":" + metadata.offset());     
	}   
} })
相关文章
相关标签/搜索