kafka生产者发送流程:java
kafka是经过异步的方式进行的消息发送流程,为何是异步的?数组
主线程->构建ProducerRecord对象,这个对象声明了主题Topic、分区Partition、键 Key以及 值 Value,主题和值是必需要声明的,分区和键能够不用指定。 主线程->调用send发送进行消息发送。(由于消息要在网络上传输,因此必须进行序列化) 主线程->序列化器:将key和value序列化成字节数组。 主线程->分区器,根据参数进行分区选择,参数里有分区则指定分区,无分区有key值则经过key计算出分区,既没有分区也没有key值,则经过轮询方式进行指定分区。 主线程->,将消息放入一个RecordAccumulator(消息收集器,也能够理解为主线程与Sender线程 直接的缓冲区)中暂存. 异步线程->Sender线程负责将消息信息构成请求,并最终执行网络I/O的线程,它从 RecordAccumulator中取出消息并批量发送出去 Broker成功接收到消息,表示发送成功,返回消息的元数据(主题、分区、偏移量)。发送失败能够选择重试或跑出异常。
须要注意的是,KafkaProducer是线程安全的,多个 线程间能够共享使用同一个KafkaProducer对象安全
发送类型:网络
一、发送即忘记:经过Producer对象调用send方法,发送完成后对响应结果不作任何处理。异步
二、同步发送:经过Producer对象调用send方法返回一个Future对象,而后调用Future对象的get方法等待kafka的响应,若是kafka正常响应,返回一个RecordMetadata对象,该对象为消息的元数据对象(主题、分区、偏移量);若是kafka发生错误,没法正常响应,就会抛出异常,咱们即可以进行异常处理 (try/catch包围)。线程
三、异步发送:code
producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.out.println(metadata.partition() + ":" + metadata.offset()); } } })