Kafka - SQL 引擎分享

1.概述

  大多数状况下,咱们使用 Kafka 只是做为消息处理。在有些状况下,咱们须要屡次读取 Kafka 集群中的数据。固然,咱们能够经过调用 Kafka 的 API 来完成,可是针对不一样的业务需求,咱们须要去编写不一样的接口,在通过编译,打包,发布等一系列流程。最后才能看到咱们预想的结果。那么,咱们能不能有一种简便的方式去实现这一部分功能,经过编写 SQL 的方式,来可视化咱们的结果。今天,笔者给你们分享一些心得,经过使用 SQL 的形式来完成这些需求。前端

2.内容

  实现这些功能,其架构和思路并不复杂。这里笔者将整个实现流程,经过一个原理图来呈现。以下图所示:json

  这里笔者给你们详述一下上图的含义,消息数据源存放与 Kafka 集群当中,开启低阶和高阶两个消费线程,将消费的结果以 RPC 的方式共享出去(即:请求者)。数据共享出去后,回流经到 SQL 引擎处,将内存中的数据翻译成 SQL Tree,这里使用到了 Apache 的 Calcite 项目来承担这一部分工做。而后,咱们经过 Thrift 协议来响应 Web Console 的 SQL 请求,最后将结果返回给前端,让其以图表的实行可视化。架构

3.插件配置

  这里,咱们须要遵循 Calcite 的 JSON Models,好比,针对 Kafka 集群,咱们须要配置一下内容:学习

{
    version: '1.0',
    defaultSchema: 'kafka',  
    schemas: [  
        {
            name: 'kafka',  
            type: 'custom',
            factory: 'cn.smartloli.kafka.visual.engine.KafkaMemorySchemaFactory',  
            operand: {
                database: 'kafka_db'
            }  
        } 
    ]
}

  另外,这里最好对表也作一个表述,配置内容以下所示:fetch

[
    {
        "table":"Kafka",
        "schemas":{
            "_plat":"varchar",
            "_uid":"varchar",
            "_tm":"varchar",
            "ip":"varchar",
            "country":"varchar",
            "city":"varchar",
            "location":"jsonarray"
        }
    }
]

4.操做

  下面,笔者给你们演示经过 SQL 来操做相关内容。相关截图以下所示:ui

  在查询处,填写相关 SQL 查询语句。点击 Table 按钮,获得以下所示结果:spa

  咱们,能够将获取的结果以报表的形式进行导出。插件

  固然,咱们能够在 Profile 模块下,浏览查询历史记录和当前正在运行的查询任务。至于其余模块,都属于辅助功能(展现集群信息,Topic 的 Partition 信息等)这里就很少赘述了。线程

5.总结

  分析下来,总体架构和实现的思路都不算太复杂,也不存在太大的难点,须要注意一些实现上的细节,好比消费 API 针对集群消息参数的调整,特别是低阶消费 API,尤其须要注意,其 fetch_size 的大小,以及 offset 是须要咱们本身维护的。在使用 Calcite 做为 SQL 树时,咱们要遵循其 JSON Model 和标准的 SQL 语法来操做数据源。翻译

6.结束语

这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

相关文章
相关标签/搜索