Flink SQL Table 咱们一块儿去看2018中超联赛-Flink牛刀小试

版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,若有任何问题,可随时联系。java

写在前面的话

Flink是一个新型的流式处理引擎,做者自身只是对Spark底层较为熟悉,有兴趣能够查阅个人Spark core ,Spark Streaming 以及 Spark SQL 源码解读系列。在这里咱们只是品味一下号称第四代大数据处理引擎的Flink,做者也并无深刻到Flink底层源码级别。请见谅若是您已是FLink大牛了!看一下2018中超联赛积分榜:sql

1 SQL Table(牛刀小试)

  • Table API 是以 表 为中心的声明式DSL,其中表可能会动态变化(在表达流数据时)。Table API遵循(扩展的)关系模型:表有二维数据结构(schema)(相似于关系数据库中的表),同时API提供可比较的操做,例如select、project、join、group-by、aggregate等。Table API程序声明式地定义了 什么逻辑操做应该执行 而不是准确地肯定 这些操做代码的看上去如何 。 尽管Table API能够经过多种类型的用户自定义函数(UDF)进行扩展,其仍不如 核心API 更具表达能力,可是使用起来却更加简洁(代码量更少)。除此以外,Table API程序在执行以前会通过内置优化器进行优化。数据库

  • 你能够在表与 DataStream/DataSet 之间无缝切换,以容许程序将 Table API 与 DataStream 以及 DataSet 混合使用。express

  • Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 相似,可是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询能够直接在Table API定义的表上执行。apache

  • Apache Flink对SQL的支持能够追溯到一年前发布的0.9.0-milestone1版本。此版本经过引入Table API来提供相似于SQL查询的功能,此功能能够操做分布式的数据集,而且能够自由地和Flink其余API进行组合。Tables在发布之初就支持静态的以及流式数据(也就是提供了DataSet和DataStream相关APIs)。咱们能够将DataSet或DataStream转成Table;同时也能够将Table转换成DataSet或DataStream。api

  • The highest level abstraction offered by Flink is SQL. This abstraction is similar to the Table API both in semantics and expressiveness, but represents programs as SQL query expressions. The SQL abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the Table API.数据结构

  • 用户能够经过 TableEnvironment 类中的 sqlQuery() 方法执行SQL查询,查询结果会以 Table 形式返回。用户可将 Table 用于后续的 SQL 及 Table 查询,或将其转换成 DataSet 或 DataStream,亦可将它写入到某个 TableSink 中。不管是经过 SQL 仍是 Table API 提交的查询均可以进行无缝衔接,系统内部会对它们进行总体优化,并最终转换成一个 Flink 程序执行。分布式

  • 为了在 SQL 查询中使用某个 Table,用户必须先在 TableEnvironment 中对其进行注册。Table 的注册来源能够是某个 TableSource,某个现有的 Table,或某个DataStream 或 DataSet。此外,用户还能够经过在 TableEnvironment 中注册外部 Catalog 的方式来指定数据源位置。ide

  • 为方便使用,在执行 Table.toString() 方法时,系统会自动以一个惟一名称在当前 TableEnvironment 中注册该 Table 并返回该惟一名称。所以,在如下示例中,Table 对象均可以直接之内联(字符串拼接)方式出如今 SQL 语句中。函数

  • 注意: 现阶段Flink对于SQL的支持还并不完善。若是在查询中使用了系统尚不支持的功能,会引起 TableException 。

2 上代码分析(球队粒度进行进球聚合排序)

  • 1 进行pojo对象的数据封装。
  • 2 BatchTableEnvironment tableEnv环境生成: BatchTableEnvironment.getTableEnvironment(env);
  • 3 Table表生成:Table topScore = tableEnv.fromDataSet(topInput)
  • 4 Table表注册:tableEnv.registerTable("topScore",topScore);
  • 5 Table表查询:tableEnv.sqlQuery
  • 6 Table表转换回DataSet: tableEnv.toDataSet

2.1 详情请参考代码

import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    
    public class TableSQL {
    
        public static void main(String[] args) throws Exception{
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
            
            DataSet<String> input = env.readTextFile("C:\\CoreForBigData\\FLINK\\TopCore.csv");
            input.print();
            DataSet<TopScorers> topInput = input.map(new MapFunction<String, TopScorers>() {
                @Override
                public TopScorers map(String s) throws Exception {
                    String[] splits = s.split("\t");
                    return new TopScorers(Integer.valueOf(splits[0]),splits[1],splits[2],Integer.valueOf(splits[3]),Integer.valueOf(splits[4]),Integer.valueOf(splits[5]),Integer.valueOf(splits[6]),Integer.valueOf(splits[7]),Integer.valueOf(splits[8]),Integer.valueOf(splits[9]),Integer.valueOf(splits[10]));
                }
            });

            //将DataSet转换为Table
            Table topScore = tableEnv.fromDataSet(topInput);
            //将topScore注册为一个表
            tableEnv.registerTable("topScore",topScore);
    
            Table tapiResult = tableEnv.scan("topScore").select("club");
            tapiResult.printSchema();
    
            Table groupedByCountry = tableEnv.sqlQuery("select club, sum(jinqiu) as sum_score from topScore group by club order by sum_score desc");
            
            //转换回dataset
            DataSet<Result> result = tableEnv.toDataSet(groupedByCountry, Result.class);
    
            //将dataset map成tuple输出
            result.map(new MapFunction<Result, Tuple2<String,Integer>>() {
                @Override
                public Tuple2<String, Integer> map(Result result) throws Exception {
                    String country = result.club;
                    int sum_total_score = result.sum_score;
                    return Tuple2.of(country,sum_total_score);
                }
            }).print();
    
        }
    
        /**
         * 源数据的映射类
         */
        public static class TopScorers {
            /**
             * 排名,球员,球队,出场,进球,射正,任意球,犯规,黄牌,红牌
             */
            public Integer rank;
            public String player;
            public String club;
            public Integer chuchang;
            public Integer jinqiu;
            public Integer zhugong;
            public Integer shezheng;
            public Integer renyiqiu;
            public Integer fangui;
            public Integer huangpai;
            public Integer hongpai;
    
            public TopScorers() {
                super();
            }
    
            public TopScorers(Integer rank, String player, String club, Integer chuchang, Integer jinqiu, Integer zhugong, Integer shezheng, Integer renyiqiu, Integer fangui, Integer huangpai, Integer hongpai) {
                this.rank = rank;
                this.player = player;
                this.club = club;
                this.chuchang = chuchang;
                this.jinqiu = jinqiu;
                this.zhugong = zhugong;
                this.shezheng = shezheng;
                this.renyiqiu = renyiqiu;
                this.fangui = fangui;
                this.huangpai = huangpai;
                this.hongpai = hongpai;
            }
        }
    
        /**
         * 统计结果对应的类
         */
        public static class Result {
            public String club;
            public Integer sum_score;
    
            public Result() {}
        }
    }
复制代码

2.2 结果展现(2018恒大队很厉害,进球55个)

3 理论升华一下

3.1 Create a TableEnvironment

// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for streaming queries
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for batch queries
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
复制代码

3.2 DSL风格用法

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query
复制代码

3.3 Register a DataStream or DataSet as Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
复制代码

3.4 Convert a DataStream or DataSet into a Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
复制代码

4 收工

经过2018中超联赛,咱们管中窥豹,学会了Flink SQL Table 的核心思想,固然本文并不完善,但愿本文可以给你们带来一些收获。辛苦成文,彼此珍惜,谢谢!

版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。版权声明:禁止转载,欢迎学习。QQ邮箱地址:1120746959@qq.com,若有任何问题,可随时联系。

秦凯新 于深圳 201811262252

相关文章
相关标签/搜索