前提:本例适合那些没有顺序要求的消息主题。java
kafka经过一系列优化,写入和读取速度可以达到数万条/秒。经过增长分区数量,可以经过部署多个消费者增长并行消费能力。但仍是有不少状况下,某些业务的执行速度实在是太慢,这个时候咱们就要用到多线程去消费,提升应用机器的利用率,而不是一味的给kafka增长压力。 git
ShutdownableThread
,而后实现它的
doWork
方法便可。
参考:github.com/apache/kafk…程序员
即然是使用多线程,咱们就须要新建一个线程池。 github
咱们使用了了零容量的SynchronousQueue
,一进一出,避免队列里缓冲数据,这样在系统异常关闭时,就能排除由于阻塞队列丢消息的可能。 而后使用了CallerRunsPolicy
饱和策略,使得多线程处理不过来的时候,可以阻塞在kafka的消费线程上。redis
而后,咱们将真正处理业务的逻辑放在任务中多线程执行,每次执行完毕,咱们都手工的commit一次ack
,代表这条消息我已经处理了。因为是线程池认领了这些任务,顺序性是没法保证的,可能有些任务没有执行完毕,后面的任务就已经把它的offset给提交了。o.O
apache
不过这暂时不重要,首先让它并行化运行就好。 api
KafkaConsumer is not safe for multi-threaded access
复制代码
显然,kafka的消费端不是线程安全的,它拒绝你这么调用它的api。kafka的初衷是好的,想要避免一些并发环境的问题,但我确实须要使用多线程处理。安全
kafka消费者经过比较调用者的线程id来判断是不是由外部线程发起请求。bash
private void acquire() { 复制代码
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
复制代码
}session
复制代码
得,只能将commitSync
函数放在线程外面了,先提交ack、再执行任务。
咱们获取的消息,可能在真正被执行以前,会进行一些过滤,好比一些空值或者特定条件的判断。虽然能够直接放在消费者线程里运行,但显的特别的乱,能够加入一个生产者消费者模型(你能够认为这是多此一举)。这里采用的是阻塞队列依然是SynchronousQueue
,它充当了管道的功能。
咱们把任务放入管道后,立马commit。若是线程池已经满了,将一直阻塞在消费者线程里,直到有空缺。而后,咱们单独启动了一个线程,用来接收这些数据,而后提交到这部分的代码看起来大概这样。
应用可以启动了,消费速度贼快。
kafka的参数很是的多,咱们比较关心的有如下几个参数。
调用一次poll,返回的最大条数。这个值设置的大,那么处理的就慢,很容易超出max.poll.interval.ms
的值(默认5分钟),形成消费者的离线。在耗时很是大的消费中,是须要特别注意的。
是否开启自动提交(offset)若是开启,consumer已经消费的offset信息将会间歇性的提交到kafka中(持久保存)
当开启offset自动提交时,提交请求的时间频率由参数auto.commit.interval.ms
控制。
若是broker端反馈的数据量不足时(fetch.min.bytes),fetch请求等待的最长时间。若是数据量知足须要,则当即返回。
consumer会话超时时长,若是在此时间内,server还没有接收到consumer任何请求(包括心跳检测),那么server将会断定此consumer离线。此值越大,server等待consumer失效、rebalance时间就越长。
consumer协调器与kafka集群之间,心跳检测的时间间隔。kafka集群经过心跳判断consumer会话的活性,以判断consumer是否在线,若是离线则会把此consumer注册的partition分配(assign)给相同group的其余consumer。此值必须小于“session.timeout.ms”,即会话过时时间应该比心跳检测间隔要大,一般为session.timeout.ms的三分之一,不然心跳检测就失去意义。
在本例中,咱们的参数简单的设置以下,主要调整了每次获取的条数和检测时间。其余的都是默认。
仔细的同窗可能会看到,咱们的代码依然不是彻底安全的。这是因为咱们提早提交了ack致使的。程序正常运行下,这无伤大雅。但在应用异常关闭的时候,那些正在执行中的消息,极可能会丢失,对于一致性要求很是高的应用,咱们要从两个手段上进行保证。
第一种就是考虑kill -15的状况。这种方式比较简单,只要覆盖ShutdownableThread的shutdown方法便可,应用将有机会执行线程池中的任务,确保消费完毕再关闭应用。
@Override
public void shutdown() {
super.shutdown();
executor.shutdown();
}
复制代码
应用oom,或者直接kill -9了,事情就变得麻烦起来。
维护一个单独的日志文件(或者本地db),在commit以前写入一条日志,而后在真正执行完毕以后写入一条对应的日志。当系统启动时,读取这些日志文件,获取没有执行成功的任务,从新执行。
想要效率,还想要可靠,是得下点苦力气的。
这种方式与日志方式相似,但因为redis的效率很高(可达数万),并且方便,是优于日志方式的。
可使用Hash结构,提交任务的同时写入Redis,任务执行完毕删掉这个值,那么剩下的就是出现问题的消息。
多线程是为了增长效率,redis等是为了增长可靠性。业务代码是很是好编写的,搞懂了逻辑就搞定了大部分;业务代码有时候又是困难的,你要编写大量辅助功能增长它的效率、照顾它的边界。
以程序员的角度来讲,最有竞争力的代码都是为了照顾小几率发生的边界异常。
kafka在吞吐量和可靠性方面,有各类的权衡,不少都是鱼和熊掌的关系。没必要纠结于它自己,咱们能够借助外部的工具,获取更大的收益。在这种状况下,redis当机与应用同时当机的几率仍是比较小的。5个9的消息保证是能够作到的,剩下的那点不完美问题消息,你为何不从日志里找呢?
扩展阅读:
三、360度测试:KAFKA会丢数据么?其高可用是否知足需求?