Kafka分布式查询引擎

1.概述

Kafka是一个分布式消息中间件系统,里面存储着实际场景中的数据。Kafka原生是不支持点查询的,若是咱们想对存储在Topic中的数据进行查询,可能须要对Topic中的数据进行消费落地,而后构建索引(或者数据落地到自带因此的存储系统中,例如HBase、Hive等)。今天,笔者就为你们来介绍如何实现Kafka分布式查询引擎。html

2.内容

对于点查询,咱们能够总结为两个要点。其一,有数据供咱们查询;其二,对待查询的数据构建索引。在Kafka中,Topic存储数据,知足了第一点,虽然Kafka有索引的概念,可是它的索引是基于Offset的稀疏索引,并非对每条Message都会构建一个索引。而且,这个Offset索引对于实际状况查询场景来讲,也帮助不大。好比,你查询Topic01下的Partition_0,可是,也仅仅只是查下到某个Topic中分区下的Offset对应的一条记录,可是这条记录是啥,你并不知道。真实查询的状况,多是你须要查询某个ID,或者模糊查询某个Name是否存在。服务器

2.1 索引

其实有一种方式,是可行的。就是对Kafka源代码进行改造,在Broker落地每条数据的时候,构建一条索引(其实,这种方式与在原始的Kafka外面加一层Proxy相似,由Proxy充当与Client交互的角色,接收Client的数据存储并构建索引)。这样的实现方式以下图:多线程

 

若是对Kafka源代码熟悉,有能力改造其源代码,能够在Kafka中添加对每条数据构建索引的逻辑。若是,以为怕对Kafka的性能有影响,或者改造有难度。上述流程图的方式,也能够实现这种点查询。分布式

改造Kafka源代码添加索引,或者是Proxy的方式存储数据并构建索引,这种两种方式来讲,数据上都会要冗余一倍左右的的存储容量。oop

2.2 单节点查询

基于上述的问题,咱们对这种方案进行升级改造一下。由于不少状况下,生产环境的数据已是运行了很长时间了,加Proxy或者改造Kafka源代码的方式适合构建一个Kafka的新集群的时候使用。对于已有的Kafka集群,若是咱们要查询Topic中的数据,如何实现呢。性能

Kafka-Eagle中,我对Topic数据查询实现了基于SQL查询的实现方案。逻辑是这样的,编写SQL查询语句,对SQL进行解析,映射出一个Topic的Schema以及过滤条件,而后根据过滤条件消费Topic对应的数据,最后拿到数据集,经过SQL呈现出最后的结果。流程图以下:学习

 

可是,这样是由局限性的。因为,单节点的计算能力有限,因此对每一个Partition默认查询5000条数据,这个记录是能够增长或者减小的。若是在配置文件中对这个属性增大,好比设置为了50000条,那么对应的ke.sh脚本中的内存也须要增长,由于每次查询须要的内存增长了。否则,频繁若干用户同时查询,容易形成OOM的状况。大数据

可是,一般一个Topic中存储的数据通常达到上亿条数据以上,这种方式要从上亿条或者更多的数据中查询咱们想要的数据,可能就知足不了了。优化

2.3 分布式查询

基于这种状况,咱们能够对这中单节点查询的方式进行升级改造,将它变为分布式查询。其实,仔细来看,单节点查询的方式,就是一个分布式查询的缩版。那咱们须要实现这样一个分布式查询的Kafka SQL引擎呢?spa

首先,咱们能够借助Hadoop的MapReduce思想,“化繁为简,分而治之”。咱们将一个Topic当作一个比较大的数据集,每次咱们须要对这个数据集进行查询,能够将待查询的数据进行拆分若干份Segment,而后,充分利用服务器的CPU,进行多线程消费(这样就能够打破Kafka中一个线程只能消费一个分区的局限性)。实现流程图以下:

 

上图可知,由客户端发起请求,提交请求到Master节点,而后Master节点解析客户端的请求,并生成待执行策略。好比上述有三个工做节点,按照客户端的状况,Master会将生成的执行策略下发给三个工做节点,让其进行计算。

这里以其中一个工做节点为例子,好比WorkNode1接收到了Master下发的计算任务,接收到执行指令后,结合工做节点自身的资源状况(好比CPU和内存,这里CPU较为重要),将任务进行拆解为若干个子任务(子任务的个数取决于每一个批次的BatchSize,能够在属性中进行配置),而后让生成好的若干个子任务并行计算,获得若干个子结果,而后将若干个子结果汇总为一个最终结果做为当前工做节点的最终计算结果,最后将不一样的工做节点的结果进行最后的Merge做为本次查询的结果返回给Master节点(这里须要注意的是,多个工做节点汇总在同一个JobID下)。而后,Master节点收到工做节点返回的结果后,返回给客户端。

3.结果预览

查询10条Topic中的数据,工做节点执行以下:

select * from ke1115 where `partition` in (0) limit 10

 

 上图显示了,同一WorkNode节点下,同一JobID中,不一样线程子任务的计算进度日志。

 

KSqlStrategy显示了Master节点下发的待执行策略,msg表示各个工做节点返回的最终结果。

4.待优化

目前Kafka分布式查询引擎基础功能已实现能够用,任务托管、子任务查询内存优化等还有优化的空间,计划正在考虑集成到KafkaEagle系统中。 

5.结束语

这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同窗, 能够在公告栏那里点击购买连接购买博主的书进行学习,在此感谢你们的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。

相关文章
相关标签/搜索