Apache Calcite是一个动态数据管理框架,它包含了许多典型数据库管理系统的部分,但省略了一些关键功能:数据存储、数据处理算法和元数据存储。java
基于Apache Calcite,咱们能够为任何第三方存储引擎开发SQL查询引擎。git
https://calcite.apache.org/github
https://github.com/apache/calcite算法
要想了解Calcite,其实官方文档确实不妨一看。尽管官方文档会说起很是多你以前可能没有接触过的概念,但好在它文档内容很少,这样让你对SQL执行中可能涉及的一些关键术语留下一个印象,那对之后深刻学习和使用Calcite仍是有帮助的,毕竟若是真的想用好Calcite,或者说只是使用Calcite,这些关键术语都是须要掌握和理解的。sql
不过,仅仅看官方文档,那仍是远远不够的。回过头再来看Calcite的文档时,你会发现,它彻底是写给“高端玩家”的,它是对Calcite高度抽象总结的文档,并非写给初学者来进行学习的,以致于你想经过官方文档来跑个QuickStart,那也是至关困难,我的以为没有必定的折腾能力或者对SQL执行没有理解经验的话,确实不太容易达成。所以,不能仅仅只看官方文档,你还须要经过其它途径获取更多关于它的信息,关于初学者如何快速掌握Apache Calcite,如下是我我的的一些心得体会:数据库
1.先简单用起来apache
Calcite做为一个数据管理框架,首先,你得把它用起来才能慢慢理解它究竟是干吗的。理论上,经过Calcite这个组件,你能够以SQL的方式来访问任何你想要访问的数据源,好比你的文件(无论你是什么格式)、Java对象、第三方存储引擎(Redis、Kafka、Elasticsearch)等,因此我是用了“任何”来讲明它的能力,这是它实实在在存在的能力。编程
本文档会手把手教你,怎么样经过Calcite以SQL的方式来访问CSV文件、Java内部对象、Elasticsearch数据源。json
2.生产使用与思考浏览器
因此一旦你知道Calcite能够经过SQL的方式来访问任何的数据源以后,我知道有想法的同窗已经会考虑到:
使用者不须要感知数据存储在哪里,在他们看来,这就是一个只提供SQL查询入口的查询系统,它屏蔽了它所接入的各类存储系统的差别;
答案是确定的,Calcite是一个组件,本质上也是一个框架,它提供了各类扩展点来让你实现这样的功能。
固然若是你想借用Calcite针对某个存储系统开发一个好的SQL引擎,仍是须要至关大的努力的,好比VolcanoPlanner就须要好好理解下,比较惋惜的是,直到如今我也没有精力去研究它,以致于我想为Elasticsearch开发一个SQL引擎的想法都迟迟未能实现。
所谓的“借用Calcite针对某个存储系统开发一个好的SQL引擎”,其实在Calcite里有一个专业的术语,叫作“数据源适配器”。
Calcite自己也提供了多个存储引擎的适配器,好比Druid、Elasticsearch、SparkSQL等等,固然开源的就并不必定得,前面之因此一直说起要从新写一个Elasticsearch的适配器,是由于我以为Calcite自己提供的ES适配器能力比较弱,相信用过的同窗都会有所体会。
3.深度使用与思考
实际上若是只是想知道Calcite怎么使用的,有哪些功能可使用的,咱们不妨站在巨人的肩膀上,看看业界的开源项目是怎么使用它的。
一个不错的参考是Apache Druid,其SQL引擎正是基于Apache Calcite来开发构建的,所以对于Calcite更多高级功能的使用,咱们不妨去研究一下Apache Druid-SQL模块的源码,相信会有很是大的收获。
4.VolcanPlanner
有时间和精力研究一下其在Calcite的实现,我的以为会很是不错。
本文档会手把手教你,怎么样经过Calcite以SQL的方式来访问CSV文件、Java内部对象、Elasticsearch数据源。
对于Calcite更多的实现细节,仍是本身想办法根据实际应用场景,去思考一下它的各个模块功能,好比想了解某一个功能原理,就去看其源码结构和细节,我相信这自己对我的能力的提高都是极其有帮助的。
先构建一个maven项目,而后引入Calcite的依赖:
<dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> <version>1.20.0</version> </dependency> <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-example-csv</artifactId> <version>1.20.0</version> </dependency> <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-elasticsearch</artifactId> <version>1.20.0</version> </dependency>
先准备一个CSV文件:
EMPNO:long,NAME:string,DEPTNO:int,GENDER:string,CITY:string,EMPID:int,AGE:int,SLACKER:boolean,MANAGER:boolean,JOINEDAT:date 100,"Fred",10,,,30,25,true,false,"1996-08-03" 110,"Eric",20,"M","San Francisco",3,80,,false,"2001-01-01" 110,"John",40,"M","Vancouver",2,,false,true,"2002-05-03" 120,"Wilma",20,"F",,1,5,,true,"2005-09-07" 130,"Alice",40,"F","Vancouver",2,,false,true,"2007-01-01"
Calcite会把每一个csv文件映射成一个SQL表。csv文件表头指定该列数据类型,根据必定规则映射到对应的SQL类型。如没有指定,则统一映射成VARCHAR。
文件命名为depts.csv,Caclite会构建表名为文件名的table,即depts.
而后编写下面的代码经过Calcite以SQL方式访问数据:
// Author: xpleaf public class CsvDemo { public static void main(String[] args) throws Exception { // 0.获取csv文件的路径,注意获取到文件所在上层路径就能够了 String path = Objects.requireNonNull(CsvDemo.class.getClassLoader().getResource("csv").getPath()); // 1.构建CsvSchema对象,在Calcite中,不一样数据源对应不一样Schema,好比CsvSchema、DruidSchema、ElasticsearchSchema等 CsvSchema csvSchema = new CsvSchema(new File(path), CsvTable.Flavor.SCANNABLE); // 2.构建Connection // 2.1 设置链接参数 Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 2.2 获取标准的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 获取Calcite封装的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.构建RootSchema,在Calcite中,RootSchema是全部数据源schema的parent,多个不一样数据源schema能够挂在同一个RootSchema下 // 以实现查询不一样数据源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.将不一样数据源schema挂载到RootSchema,这里添加CsvSchema rootSchema.add("csv", csvSchema); // 5.执行SQL查询,经过SQL方式访问csv文件 String sql = "select * from csv.depts"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍历打印查询结果集 System.out.println(ResultSetUtil.resultString(resultSet)); } }
执行代码,其输出结果以下:
100, Fred, 10, , , 30, 25, true, false, 1996-08-03 110, Eric, 20, M, San Francisco, 3, 80, null, false, 2001-01-01 110, John, 40, M, Vancouver, 2, null, false, true, 2002-05-03 120, Wilma, 20, F, , 1, 5, null, true, 2005-09-07 130, Alice, 40, F, Vancouver, 2, null, false, true, 2007-01-01
思考:
csv是官方文档有说起的一个例子,在总体上若是须要对Calcite源码的使用有一个认识(尤为是如何开发适配器),能够基于这个demo,对照文档说起的各个概念、类,经过分析源码来进行理解,好比:
- 1.Schema是怎么构建的,在Calcite的位置和具体做用是什么;
- 2.Table是怎么构建的,在Calcite的位置和具体做用是什么;
- 3.在执行查询时是如何作SQL Parse、Validate、Optimize和执行的;
你均可以经过这个demo来一探究竟,固然,虽然我这里短短几句话带过,实际上若是你想研究这个过程,可能须要花费你较多时间,我建议不急着步子一下跨得太大,慢慢来,不急的。
另外,其实经过官方文档的介绍,对于怎么去开发一个Caclite的数据源适配器,应该也是有必定的体会的,其实若是只是实现一个简单的适配器(不考虑太多的SQL优化规则),那这个难度仍是不大的。
我经过这个例子,包括后面的几个例子,其实都是想告诉你,如何快速使用Calcite(也就是至关给你写了一个QuickStart),从而对Calcite总体使用有一个认识,若是你想更深度使用Calcite,建议:
有用过SparkSQL的同窗会知道,在SparkSQL中,可使用编程的方式来将对象实例转换为DataFrame,进而注册Table,以经过SQL来访问这些对象实例:
public class _01SparkRDD2DataFrame { public static void main(String[] args) { Logger.getLogger("org.apache.spark").setLevel(Level.OFF); SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName(_01SparkRDD2DataFrame.class.getSimpleName()) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(new Class[]{Person.class}); JavaSparkContext jsc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(jsc); List<Person> persons = Arrays.asList( new Person(1, "name1", 25, 179), new Person(2, "name2", 22, 176), new Person(3, "name3", 27, 178), new Person(1, "name4", 24, 175) ); DataFrame df = sqlContext.createDataFrame(persons, Person.class); // 构造方法有多个,使用personsRDD的方法也是能够的 // where age > 23 and height > 176 df.select(new Column("id"), new Column("name"), new Column("age"), new Column("height")) .where(new Column("age").gt(23).and(new Column("height").lt(179))) .show(); df.registerTempTable("person"); sqlContext.sql("select * from person where age > 23 and height < 179").show(); jsc.close(); } }
以上代码例子来自xpleaf的文章《Spark SQL笔记整理(二):DataFrame编程模型与操做案例》
注意这里给出的案例仍是Spark 1.x的用法,Spark 2.x以及以后的版本则可能不推荐这种用法了,具体请参考Spark的官方文档。
那么对应到Calcite,它也提供了相似的方式来经过SQL访问对象实例数据。
为了进行演示,咱们先构建Object对象类:
public class HrSchema { public final Employee[] emps = { new Employee(100, 10, "Bill", 10000, 1000), new Employee(200, 20, "Eric", 8000, 500), new Employee(150, 10, "Sebastian", 7000, null), new Employee(110, 10, "Theodore", 11500, 250), }; @Override public String toString() { return "HrSchema"; } public static class Employee { public int empid; public int deptno; public String name; public float salary; public Integer commission; public Employee(int empid, int deptno, String name, float salary, Integer commission) { this.empid = empid; this.deptno = deptno; this.name = name; this.salary = salary; this.commission = commission; } @Override public String toString() { return "Employee [empid: " + empid + ", deptno: " + deptno + ", name: " + name + "]"; } @Override public boolean equals(Object obj) { return obj == this || obj instanceof Employee && empid == ((Employee) obj).empid; } } }
Calcite会将HrSchema的emps映射为一张表。
编写Calcite代码以下:
public class ObjectDemo { public static void main(String[] args) throws Exception { // 1.构建CsvSchema对象,在Calcite中,不一样数据源对应不一样Schema,好比CsvSchema、DruidSchema、ElasticsearchSchema等 ReflectiveSchema reflectiveSchema = new ReflectiveSchema(new HrSchema()); // 2.构建Connection // 2.1 设置链接参数 Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 2.2 获取标准的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 获取Calcite封装的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.构建RootSchema,在Calcite中,RootSchema是全部数据源schema的parent,多个不一样数据源schema能够挂在同一个RootSchema下 // 以实现查询不一样数据源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.将不一样数据源schema挂载到RootSchema,这里添加ReflectiveSchema rootSchema.add("hr", reflectiveSchema); // 5.执行SQL查询,经过SQL方式访问object对象实例 String sql = "select * from hr.emps"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍历打印查询结果集 System.out.println(ResultSetUtil.resultString(resultSet)); } }
执行代码,其输出结果以下:
100, 10, Bill, 10000.0, 1000 200, 20, Eric, 8000.0, 500 150, 10, Sebastian, 7000.0, null 110, 10, Theodore, 11500.0, 250
通常在使用Calcite构建统一查询系统时,Object对象表会被用于构建数据表的元数据信息表(即表有哪些字段、字段的类型、用于构建数据表的元数据信息)等,详情能够参考Apache Druid-SQL源码。
不用有压力,若是你以前彻底没有接触过Elasticsearch,也不用担忧学习成本的问题,你就彻底能够把它简单理解为一个数据库就行了,不用想那么复杂,而且,它开箱即用,没有任何部署成本。
下载:
https://www.elastic.co/cn/downloads/elasticsearch
根据对应的操做系统下载相应的版本就能够。
下载完成后,解压,进入bin目录,执行elasticsearch.bat
或elasticsearch
(取决于你的操做系统)就能够启动Elasticsearch,在浏览器上面访问localhost:9200
,返回以下信息:
{ "name": "yeyonghaodeMacBook-Pro.local", "cluster_name": "elasticsearch", "cluster_uuid": "6sMhfd0fSgSnqk7M_CTmug", "version": { "number": "7.11.1", "build_flavor": "default", "build_type": "tar", "build_hash": "ff17057114c2199c9c1bbecc727003a907c0db7a", "build_date": "2021-02-15T13:44:09.394032Z", "build_snapshot": false, "lucene_version": "8.7.0", "minimum_wire_compatibility_version": "6.8.0", "minimum_index_compatibility_version": "6.0.0-beta1" }, "tagline": "You Know, for Search" }
则说明服务已经部署成功。
接下来咱们经过postman来建立index(表)和写入数据到ES:
PUT http://localhost:9200/teachers/_doc/1 { "name":"xpleaf", "age":26, "rate":0.86, "percent":0.95, "join_time":1551058601000 }
数据写入成功后,经过postman来查询数据:
GET http://localhost:9200/teachers/_search { "took": 115, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 1.0, "hits": [ { "_index": "teachers", "_type": "_doc", "_id": "1", "_score": 1.0, "_source": { "name": "xpleaf", "age": 26, "rate": 0.86, "percent": 0.95, "join_time": 1551058601000 } } ] } }
固然你可能会说,ES自己也提供了SQL的能力,但实际上它是属于x-pack组件的一部分,是商用的,所以使用需谨慎,而且我我的以为,它提供的SQL能力比较弱。
固然Calcite的Elasticsearch适配器其实也写得通常。
有了前面的准备以后,咱们编写以下Calcite代码:
public class ElasticsearchDemo { public static void main(String[] args) throws Exception { // 1.构建ElasticsearchSchema对象,在Calcite中,不一样数据源对应不一样Schema,好比CsvSchema、DruidSchema、ElasticsearchSchema等 RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build(); ElasticsearchSchema elasticsearchSchema = new ElasticsearchSchema(restClient, new ObjectMapper(), "teachers"); // 2.构建Connection // 2.1 设置链接参数 Properties info = new Properties(); // 不区分sql大小写 info.setProperty("caseSensitive", "false"); // 2.2 获取标准的JDBC Connection Connection connection = DriverManager.getConnection("jdbc:calcite:", info); // 2.3 获取Calcite封装的Connection CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); // 3.构建RootSchema,在Calcite中,RootSchema是全部数据源schema的parent,多个不一样数据源schema能够挂在同一个RootSchema下 // 以实现查询不一样数据源的目的 SchemaPlus rootSchema = calciteConnection.getRootSchema(); // 4.将不一样数据源schema挂载到RootSchema,这里添加ElasticsearchSchema rootSchema.add("es", elasticsearchSchema); // 5.执行SQL查询,经过SQL方式访问object对象实例 String sql = "select * from es.teachers"; Statement statement = calciteConnection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); // 6.遍历打印查询结果集 System.out.println(ResultSetUtil.resultString(resultSet)); } }
执行代码,其输出结果以下:
{name=xpleaf, age=26, rate=0.86, percent=0.95, join_time=1551058601000}
经过前面的基本介绍和QuickStart,相信你对Apache Calcite已经有了最基本的了解,固然若是想要在生产环境真正使用Calcite,使用它来定制化构建咱们的统一查询系统,仅仅了解这些确定是远远不够的,确实是路漫漫其修远兮,不过不急,不要紧的,后面有机会我将会介绍更多Calcite的高级用法。
其实不少高级用法都是经过研读Apache Druid-SQL的源码得知的,因此我会一直强调,若是较多时间和精力,不妨阅读它的源码。
public class ResultSetUtil { public static String resultString(ResultSet resultSet) throws SQLException { return resultString(resultSet, false); } public static String resultString(ResultSet resultSet, boolean printHeader) throws SQLException { List<List<Object>> resultList = resultList(resultSet, printHeader); return resultString(resultList); } public static List<List<Object>> resultList(ResultSet resultSet) throws SQLException { return resultList(resultSet, false); } public static String resultString(List<List<Object>> resultList) throws SQLException { StringBuilder builder = new StringBuilder(); resultList.forEach(row -> { String rowStr = row.stream() .map(columnValue -> columnValue + ", ") .collect(Collectors.joining()); rowStr = rowStr.substring(0, rowStr.lastIndexOf(", ")) + "\n"; builder.append(rowStr); }); return builder.toString(); } public static List<List<Object>> resultList(ResultSet resultSet, boolean printHeader) throws SQLException { ArrayList<List<Object>> results = new ArrayList<>(); final ResultSetMetaData metaData = resultSet.getMetaData(); final int columnCount = metaData.getColumnCount(); if (printHeader) { ArrayList<Object> header = new ArrayList<>(); for (int i = 1; i <= columnCount; i++) { header.add(metaData.getColumnName(i)); } results.add(header); } while (resultSet.next()) { ArrayList<Object> row = new ArrayList<>(); for (int i = 1; i <= columnCount; i++) { row.add(resultSet.getObject(i)); } results.add(row); } return results; } }