Kafka produce flush 引发的性能分析

调用kafka producer发送数据时,发现延迟级别在10-200ms不等,与正常的kafka写入速度不匹配,因而开始找问题~html

 

一.场景:

一批数据,须要遍历每一个数据并发送数据细节的信息到kafka,下面是我原始代码,每一个人发送后执行一次flush操做。java

val results = Array[DataObject](...)
results.foreach(data => {
    val info = new ProducerRecord[String, String](topic, message)
    producer.send(info)
})
kafka.flush()

服务器执行延迟在10-200ms不等apache

 

二.可能缘由分析:

1.send 函数形成阻塞

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, (Callback)null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }

查看源码的send逻辑,一种有回调函数,一种没有回调函数,因此这里send是异步执行,不会形成堵塞,排除缓存

2.flush 函数形成阻塞

public void flush() {
        log.trace("Flushing accumulated records in producer.");
        this.accumulator.beginFlush();
        this.sender.wakeup();

        try {
            this.accumulator.awaitFlushCompletion();
        } catch (InterruptedException var2) {
            throw new InterruptException("Flush interrupted.", var2);
        }
    }

flush 这里accumulator会调用await相关方法,查看官方API的解释是:bash

flush()
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.

调用此方法可以使全部缓冲记录当即可用于发送(即便linger.ms大于0)并在与这些记录关联的请求完成时发生阻塞。 ok,找到问题服务器

 

三.Flush 原理

基于flush引发的延迟,首先看一下kafka生产的过程并发

Step1:异步调用send发送日志,根据Properties的配置对kv进行序列化异步

Step2::根据k hash 获得分区信息,追加到对应topic下的partition,这里先会写入到本地缓存区函数

Step3: 本地缓存写入后,有独立的线程传送向producer发送ACKthis

1.分析:

flush 是将第二步写到缓存区的数据强制推送发送,正常状况下清空缓存区操做经过参数配置实现:

batch.size 离线缓存达到该size时执行一次flush

linger.ms 达到该时间间隔时,执行一次flush

调用flush时,会清空缓存区内存,调用 awaitFlushCompletion 时须要等待缓存区清空,这里会形成线程的堵塞

public void awaitFlushCompletion() throws InterruptedException {
        try {
            Iterator i$ = this.incomplete.all().iterator();

            while(i$.hasNext()) {
                RecordBatch batch = (RecordBatch)i$.next();
                batch.produceFuture.await();
            }
        } finally {
            this.flushesInProgress.decrementAndGet();
        }

    }

awaitFlushCompletion 将当前缓存区数据构造迭代器循环发送,并在finally阶段调整offset。

这里我设置发送延迟时间为1000ms

个人实际发送时间在1000ms之内,因此每次发送调用 flush 都会形成延迟,至关于手动调用频繁的刷新缓存区,增长的IO等待的时间,违背了批处理减小IO的规则,因此形成kafka写入时长增长,这里取消flush,经过参数控制 producer 生产解决问题。

第一次时间长是由于初始化kafka服务端,和最一开始添加 flush 相比,时间消耗基本能够忽略。

相关文章
相关标签/搜索