使用多线程增长kafka消费能力

前提:本例适合那些没有顺序要求的消息主题。java

kafka经过一系列优化,写入和读取速度可以达到数万条/秒。经过增长分区数量,可以经过部署多个消费者增长并行消费能力。但仍是有不少状况下,某些业务的执行速度实在是太慢,这个时候咱们就要用到多线程去消费,提升应用机器的利用率,而不是一味的给kafka增长压力。 git

使用Spring建立一个kafka消费者是很是简单的。咱们选择的方式是继承kafka的 ShutdownableThread,而后实现它的 doWork方法便可。


参考:github.com/apache/kafk…程序员

多线程消费某个分区的数据

即然是使用多线程,咱们就须要新建一个线程池。 github

咱们建立了一个最大容量为20的线程池,其中有两个参数须要注意一下。(参考 《JAVA多线程使用场景和注意事项简版》)。

咱们使用了了零容量的SynchronousQueue,一进一出,避免队列里缓冲数据,这样在系统异常关闭时,就能排除由于阻塞队列丢消息的可能。 而后使用了CallerRunsPolicy饱和策略,使得多线程处理不过来的时候,可以阻塞在kafka的消费线程上。redis

而后,咱们将真正处理业务的逻辑放在任务中多线程执行,每次执行完毕,咱们都手工的commit一次ack,代表这条消息我已经处理了。因为是线程池认领了这些任务,顺序性是没法保证的,可能有些任务没有执行完毕,后面的任务就已经把它的offset给提交了。o.Oapache

不过这暂时不重要,首先让它并行化运行就好。 api

惋惜的是,当咱们运行程序,直接抛出了异常,没法进行下去。
程序直接说了:

KafkaConsumer is not safe for multi-threaded access
复制代码

显然,kafka的消费端不是线程安全的,它拒绝你这么调用它的api。kafka的初衷是好的,想要避免一些并发环境的问题,但我确实须要使用多线程处理。安全

kafka消费者经过比较调用者的线程id来判断是不是由外部线程发起请求。bash

private void acquire() {
复制代码
long threadId = Thread.currentThread().getId();
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    refcount.incrementAndGet();
复制代码

}session

复制代码

得,只能将commitSync函数放在线程外面了,先提交ack、再执行任务。

加入管道

咱们获取的消息,可能在真正被执行以前,会进行一些过滤,好比一些空值或者特定条件的判断。虽然能够直接放在消费者线程里运行,但显的特别的乱,能够加入一个生产者消费者模型(你能够认为这是多此一举)。这里采用的是阻塞队列依然是SynchronousQueue,它充当了管道的功能。

咱们把任务放入管道后,立马commit。若是线程池已经满了,将一直阻塞在消费者线程里,直到有空缺。而后,咱们单独启动了一个线程,用来接收这些数据,而后提交到这部分的代码看起来大概这样。

应用可以启动了,消费速度贼快。

参数配置

kafka的参数很是的多,咱们比较关心的有如下几个参数。

max.poll.records

调用一次poll,返回的最大条数。这个值设置的大,那么处理的就慢,很容易超出max.poll.interval.ms的值(默认5分钟),形成消费者的离线。在耗时很是大的消费中,是须要特别注意的。

enable.auto.commit

是否开启自动提交(offset)若是开启,consumer已经消费的offset信息将会间歇性的提交到kafka中(持久保存)

当开启offset自动提交时,提交请求的时间频率由参数auto.commit.interval.ms控制。

fetch.max.wait.ms

若是broker端反馈的数据量不足时(fetch.min.bytes),fetch请求等待的最长时间。若是数据量知足须要,则当即返回。

session.timeout.ms

consumer会话超时时长,若是在此时间内,server还没有接收到consumer任何请求(包括心跳检测),那么server将会断定此consumer离线。此值越大,server等待consumer失效、rebalance时间就越长。

heartbeat.interval.ms

consumer协调器与kafka集群之间,心跳检测的时间间隔。kafka集群经过心跳判断consumer会话的活性,以判断consumer是否在线,若是离线则会把此consumer注册的partition分配(assign)给相同group的其余consumer。此值必须小于“session.timeout.ms”,即会话过时时间应该比心跳检测间隔要大,一般为session.timeout.ms的三分之一,不然心跳检测就失去意义。


在本例中,咱们的参数简单的设置以下,主要调整了每次获取的条数和检测时间。其余的都是默认。

消息保证

仔细的同窗可能会看到,咱们的代码依然不是彻底安全的。这是因为咱们提早提交了ack致使的。程序正常运行下,这无伤大雅。但在应用异常关闭的时候,那些正在执行中的消息,极可能会丢失,对于一致性要求很是高的应用,咱们要从两个手段上进行保证。

使用关闭钩子

第一种就是考虑kill -15的状况。这种方式比较简单,只要覆盖ShutdownableThread的shutdown方法便可,应用将有机会执行线程池中的任务,确保消费完毕再关闭应用。

@Override
    public void shutdown() {
        super.shutdown();
        executor.shutdown();
}
复制代码

使用日志处理

应用oom,或者直接kill -9了,事情就变得麻烦起来。

维护一个单独的日志文件(或者本地db),在commit以前写入一条日志,而后在真正执行完毕以后写入一条对应的日志。当系统启动时,读取这些日志文件,获取没有执行成功的任务,从新执行。

想要效率,还想要可靠,是得下点苦力气的。

借助redis处理

这种方式与日志方式相似,但因为redis的效率很高(可达数万),并且方便,是优于日志方式的。

可使用Hash结构,提交任务的同时写入Redis,任务执行完毕删掉这个值,那么剩下的就是出现问题的消息。

在系统启动时,首先检测一下redis中是否有异常数据。若是有,首先处理这些数据,而后正常消费。

End

多线程是为了增长效率,redis等是为了增长可靠性。业务代码是很是好编写的,搞懂了逻辑就搞定了大部分;业务代码有时候又是困难的,你要编写大量辅助功能增长它的效率、照顾它的边界。

以程序员的角度来讲,最有竞争力的代码都是为了照顾小几率发生的边界异常。

kafka在吞吐量和可靠性方面,有各类的权衡,不少都是鱼和熊掌的关系。没必要纠结于它自己,咱们能够借助外部的工具,获取更大的收益。在这种状况下,redis当机与应用同时当机的几率仍是比较小的。5个9的消息保证是能够作到的,剩下的那点不完美问题消息,你为何不从日志里找呢?


扩展阅读:

一、JAVA多线程使用场景和注意事项简版

二、Kafka基础知识索引

三、360度测试:KAFKA会丢数据么?其高可用是否知足需求?

相关文章
相关标签/搜索