kafka的生产者能够选择使用异步方式发送数据,所谓异步方式,就是咱们调用 send()
方法,并指定一个回调函数, 服务器在返回响应时调用该函数。程序员
kafka在客户端里暴露了两个send
方法,咱们能够本身选择同步或者异步模式。咱们来看一个kafka的生产者发送示例,有个直观的感觉。这个示例是一个同步的模式。缓存
ProducerRecord<String, String> record = new ProducerRecord<>(“Kafka”, “Kafka_Products”, “测试”);//Topic Key Value try{ Future future = producer.send(record); future.get();//获取执行结果 } catch(Exception e) { e.printStackTrace(); }
咱们从源码层面来继续看下。服务器
首先kafka定义了一个接口,异步
而后KafkaProducer
实现了这两个方法,咱们看下异步方法的实现逻辑。函数
能够看到最终是调用doSend
方法,调用的时候传入一个回调。这个回调就是监听方法的执行结果的。学习
不少人会认为,既然是异步模式,无论结果是成功仍是失败,确定方法调用会立刻返回的。那我只能告诉你,很差意思,不必定是这样。我本身就曾经踩过这个坑。测试
咱们当时有个业务流程须要在执行完成后发送kakfa消息给某个业务方,为了尽可能减小影响我这个主流程的执行时间,采用了异步方式发送kafka消息。在使用中,由于配错了kafka的TOPIC信息,发现流程阻塞发送消息这里长达6秒(kafka默认的发送超时时间)。spa
究竟为啥异步方式还会阻塞呢?咱们继续看源码。线程
无论是同步模式仍是异步模式,最终都会调用到doSend
方法,注意看上图中的waitOnMetadata
方法,我上面说的阻塞的状况就是阻塞在这个方法里。那咱们继续看这个方法。3d
经过代码中的注释咱们大概能了解这个方法的功能,不过我这里仍是要解释下。(防止有人看不懂英文,哈哈)
waitOnMetadata
获取当前的集群元数据信息,若是缓存有,而且分区没有超过指定分区范围则缓存返回,不然触发更新,等待新的metadata。这个等待的操做在下面这行代码:
metadata.awaitUpdate(version, remainingWaitMs);
而后就继续跟喽,
这个方法很好理解,就是一直在等一个条件,这个条件达到了就返回,不然一直等待超时退出。而这个条件就是当前的版本号要大于上个版本号。
那么谁来更新版本号呢?就是咱们前面提到的sender
线程。当咱们的topic配置错误的时候致使metadata一直没法更新,而后一直等到超时。
破案了!
kafka的异步模式可让咱们在业务场景中发送消息时即刻返回,没必要等待发送的结果。可是当metadata取不到时,发送的过程仍是须要等待一直超时的。
程序员是一个尤为须要不断学习的工种,平时养成阅读源码的习惯,不光能避免踩一些坑,还能在遇到问题是快递定位到问题的根源。