虽然Flink已经支持了DataSet和DataStream API,可是有没有一种更好的方式去编程,而不用关心具体的API实现?不须要去了解Java和Scala的具体实现。java
Flink provides three layered APIs. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases.sql
Flink提供了三层API,每一层API提供了一个在简洁性和表达力之间的权衡 。express
最低层是一个有状态的事件驱动。在这一层进行开发是很是麻烦的。apache
虽然不少功能基于DataSet和DataStreamAPI是能够完成的,须要熟悉这两套API,并且必需要熟悉Java和Scala,这是有必定的难度的。一个框架若是在使用的过程当中无法使用SQL来处理,那么这个框架就有很大的限制。虽然对于开发人员无所谓,可是对于用户来讲却不显示。所以SQL是很是面向大众语言。编程
比如MapReduce使用Hive SQL,Spark使用Spark SQL,Flink使用Flink SQL。api
虽然Flink支持批处理/流处理,那么如何作到API层面的统一?框架
这样Table和SQL应运而生。ide
这其实就是一个关系型API,操做起来如同操做Mysql同样简单。ui
Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way. spa
Apache Flink经过使用Table API和SQL 两大特性,来统一批处理和流处理。 Table API是一个查询API,集成了Scala和Java语言,而且容许使用select filter join等操做。
使用Table SQL API须要额外依赖
java:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>${flink.version}</version> </dependency>
scala:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>${flink.version}</version> </dependency>
首先导入上面的依赖,而后读取sales.csv文件,文件内容以下:
transactionId,customerId,itemId,amountPaid 111,1,1,100.0 112,2,2,505.0 113,1,3,510.0 114,2,4,600.0 115,3,2,500.0 116,4,2,500.0 117,1,2,500.0 118,1,2,500.0 119,1,3,500.0 120,1,2,500.0 121,2,4,500.0 122,1,2,500.0 123,1,4,500.0 124,1,2,500.0
object TableSQLAPI { def main(args: Array[String]): Unit = { val bEnv = ExecutionEnvironment.getExecutionEnvironment val bTableEnv = BatchTableEnvironment.create(bEnv) val filePath="E:/test/sales.csv" // 已经拿到DataSet val csv = bEnv.readCsvFile[SalesLog](filePath,ignoreFirstLine = true) // DataSet => Table } case class SalesLog(transactionId:String,customerId:String,itemId:String,amountPaid:Double ) }
首先拿到DataSet,接下来将DataSet转为Table,而后就能够执行SQL了
// DataSet => Table val salesTable = bTableEnv.fromDataSet(csv) // 注册成Table Table => table bTableEnv.registerTable("sales", salesTable) // sql val resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId") bTableEnv.toDataSet[Row](resultTable).print()
输出结果以下:
4,500.0 3,500.0 1,4110.0 2,1605.0
这种方式只须要使用SQL就能够实现以前写mapreduce的功能。大大方便了开发过程。
package com.vincent.course06; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.types.Row; public class JavaTableSQLAPI { public static void main(String[] args) throws Exception { ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv); DataSource<Sales> salesDataSource = bEnv.readCsvFile("E:/test/sales.csv").ignoreFirstLine(). pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid"); Table sales = bTableEnv.fromDataSet(salesDataSource); bTableEnv.registerTable("sales", sales); Table resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId"); DataSet<Row> rowDataSet = bTableEnv.toDataSet(resultTable, Row.class); rowDataSet.print(); } public static class Sales { public String transactionId; public String customerId; public String itemId; public Double amountPaid; @Override public String toString() { return "Sales{" + "transactionId='" + transactionId + '\'' + ", customerId='" + customerId + '\'' + ", itemId='" + itemId + '\'' + ", amountPaid=" + amountPaid + '}'; } } }