大多数状况下,咱们使用 Kafka 只是做为消息处理。在有些状况下,咱们须要屡次读取 Kafka 集群中的数据。固然,咱们能够经过调用 Kafka 的 API 来完成,可是针对不一样的业务需求,咱们须要去编写不一样的接口,在通过编译,打包,发布等一系列流程。最后才能看到咱们预想的结果。那么,咱们能不能有一种简便的方式去实现这一部分功能,经过编写 SQL 的方式,来可视化咱们的结果。今天,笔者给你们分享一些心得,经过使用 SQL 的形式来完成这些需求。前端
实现这些功能,其架构和思路并不复杂。这里笔者将整个实现流程,经过一个原理图来呈现。以下图所示:json
这里笔者给你们详述一下上图的含义,消息数据源存放与 Kafka 集群当中,开启低阶和高阶两个消费线程,将消费的结果以 RPC 的方式共享出去(即:请求者)。数据共享出去后,回流经到 SQL 引擎处,将内存中的数据翻译成 SQL Tree,这里使用到了 Apache 的 Calcite 项目来承担这一部分工做。而后,咱们经过 Thrift 协议来响应 Web Console 的 SQL 请求,最后将结果返回给前端,让其以图表的实行可视化。架构
这里,咱们须要遵循 Calcite 的 JSON Models,好比,针对 Kafka 集群,咱们须要配置一下内容:学习
{ version: '1.0', defaultSchema: 'kafka', schemas: [ { name: 'kafka', type: 'custom', factory: 'cn.smartloli.kafka.visual.engine.KafkaMemorySchemaFactory', operand: { database: 'kafka_db' } } ] }
另外,这里最好对表也作一个表述,配置内容以下所示:fetch
[ { "table":"Kafka", "schemas":{ "_plat":"varchar", "_uid":"varchar", "_tm":"varchar", "ip":"varchar", "country":"varchar", "city":"varchar", "location":"jsonarray" } } ]
下面,笔者给你们演示经过 SQL 来操做相关内容。相关截图以下所示:ui
在查询处,填写相关 SQL 查询语句。点击 Table 按钮,获得以下所示结果:spa
咱们,能够将获取的结果以报表的形式进行导出。插件
固然,咱们能够在 Profile 模块下,浏览查询历史记录和当前正在运行的查询任务。至于其余模块,都属于辅助功能(展现集群信息,Topic 的 Partition 信息等)这里就很少赘述了。线程
分析下来,总体架构和实现的思路都不算太复杂,也不存在太大的难点,须要注意一些实现上的细节,好比消费 API 针对集群消息参数的调整,特别是低阶消费 API,尤其须要注意,其 fetch_size 的大小,以及 offset 是须要咱们本身维护的。在使用 Calcite 做为 SQL 树时,咱们要遵循其 JSON Model 和标准的 SQL 语法来操做数据源。翻译
这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!