上周六在深圳分享了《Flink SQL 1.9.0 技术内幕和最佳实践》,会后许多小伙伴对最后演示环节的 Demo 代码很是感兴趣,火烧眉毛地想尝试下,因此写了这篇文章分享下这份代码。但愿对于 Flink SQL 的初学者能有所帮助。完整分享能够观看 Meetup 视频回顾 :https://developer.aliyun.com/...html
演示代码已经开源到了 GitHub 上:https://github.com/wuchong/fl...java
这份代码主要由两部分组成:1) 能用来提交 SQL 文件的 SqlSubmit 实现。2) 用于演示的 SQL 示例、Kafka 启动中止脚本、 一份测试数据集、Kafka 数据源生成器。mysql
经过本实战,你将学到:git
笔者一开始是想用 SQL Client 来贯穿整个演示环节,但惋惜 1.9 版本 SQL CLI 还不支持处理 CREATE TABLE 语句。因此笔者就只好本身写了个简单的提交脚本。后来想一想,也挺好的,可让听众同时了解如何经过 SQL 的方式,和编程的方式使用 Flink SQL。github
SqlSubmit 的主要任务是执行和提交一个 SQL 文件,实现很是简单,就是经过正则表达式匹配每一个语句块。若是是 CREATE TABLE 或 INSERT INTO 开头,则会调用 tEnv.sqlUpdate(...)。若是是 SET 开头,则会将配置设置到 TableConfig 上。其核心代码主要以下所示:正则表达式
EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); // 建立一个使用 Blink Planner 的 TableEnvironment, 并工做在流模式 TableEnvironment tEnv = TableEnvironment.create(settings); // 读取 SQL 文件 List<String> sql = Files.readAllLines(path); // 经过正则表达式匹配前缀,来区分不一样的 SQL 语句 List<SqlCommandCall> calls = SqlCommandParser.parse(sql); // 根据不一样的 SQL 语句,调用 TableEnvironment 执行 for (SqlCommandCall call : calls) { switch (call.command) { case SET: String key = call.operands[0]; String value = call.operands[1]; // 设置参数 tEnv.getConfig().getConfiguration().setString(key, value); break; case CREATE_TABLE: String ddl = call.operands[0]; tEnv.sqlUpdate(ddl); break; case INSERT_INTO: String dml = call.operands[0]; tEnv.sqlUpdate(dml); break; default: throw new RuntimeException("Unsupported command: " + call.command); } } // 提交做业 tEnv.execute("SQL Job");
在 flink-sql-submit 项目中,咱们准备了一份测试数据集(来自阿里云天池公开数据集,特别鸣谢),位于 src/main/resources/user_behavior.log。数据以 JSON 格式编码,大概长这个样子:sql
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
为了模拟真实的 Kafka 数据源,笔者还特意写了一个 source-generator.sh 脚本(感兴趣的能够看下源码),会自动读取 user_behavior.log 的数据并以默认每毫秒1条的速率灌到 Kafka 的 user_behavior topic 中。docker
有了数据源后,咱们就能够用 DDL 去建立并链接这个 Kafka 中的 topic(详见 src/main/resources/q1.sql)。数据库
CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取 'connector.properties.0.key' = 'zookeeper.connect', -- 链接信息 'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append', 'format.type' = 'json', -- 数据源格式为 json 'format.derive-schema' = 'true' -- 从 DDL schema 肯定 json 解析规则 )
注:可能有用户会以为其中的 connector.properties.0.key 等参数比较奇怪,社区计划将在下一个版本中改进并简化 connector 的参数配置。apache
链接 MySQL 可使用 Flink 提供的 JDBC connector。例如
CREATE TABLE pvuv_sink ( dt VARCHAR, pv BIGINT, uv BIGINT ) WITH ( 'connector.type' = 'jdbc', -- 使用 jdbc connector 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url 'connector.table' = 'pvuv_sink', -- 表名 'connector.username' = 'root', -- 用户名 'connector.password' = '123456', -- 密码 'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改成1条 )
假设咱们的需求是计算每小时全网的用户访问量,和独立用户数。不少用户可能会想到使用滚动窗口来计算。但这里咱们介绍另外一种方式。即 Group Aggregation 的方式。
INSERT INTO pvuv_sink SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv FROM user_log GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')
它使用 DATE_FORMAT 这个内置函数,将日志时间归一化成“年月日小时”的字符串格式,并根据这个字符串进行分组,即根据每小时分组,而后经过 COUNT(*) 计算用户访问量(PV),经过 COUNT(DISTINCT user_id) 计算独立用户数(UV)。这种方式的执行模式是每收到一条数据,便会进行基于以前计算的值作增量计算(如+1),而后将最新结果输出。因此实时性很高,但输出量也大。
咱们将这个查询的结果,经过 INSERT INTO 语句,写到了以前定义的 pvuv_sink MySQL 表中。
注:在深圳 Meetup 中,咱们有对这种查询的性能调优作了深度的介绍。
本实战演示环节须要安装一些必须的服务,包括:
1.下载 Flink 1.9.0 安装包并解压:https://www.apache.org/dist/f...
2.下载如下依赖 jar 包,并拷贝到 flink-1.9.0/lib/ 目录下。由于咱们运行时须要依赖各个 connector 实现。
http://central.maven.org/mave...
http://central.maven.org/mave...
http://central.maven.org/mave...
https://dev.mysql.com/downloa...
3.将 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改为 10,由于咱们的演示任务可能会消耗多于1个的 slot。
4.在 flink-1.9.0 目录下执行 ./bin/start-cluster.sh,启动集群。
运行成功的话,能够在 http://localhost:8081 访问到 Flink Web UI。
另外,还须要将 Flink 的安装路径填到 flink-sql-submit 项目的 env.sh 中,用于后面提交 SQL 任务,如个人路径是
FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0
下载 Kafka 2.2.0 安装包并解压:https://www.apache.org/dist/k...
将安装路径填到 flink-sql-submit 项目的 env.sh 中,如个人路径是
KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0
在 flink-sql-submit 目录下运行 ./start-kafka.sh 启动 Kafka 集群。
在命令行执行 jps,若是看到 Kafka 进程和 QuorumPeerMain 进程即代表启动成功。
能够在官方页面下载 MySQL 并安装:
https://dev.mysql.com/downloa...
若是有 Docker 环境的话,也能够直接经过 Docker 安装
https://hub.docker.com/_/mysql
$ docker pull mysql $ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql
而后在 MySQL 中建立一个 flink-test
的数据库,并按照上文的 schema 建立 pvuv_sink
表。
1.在 flink-sql-submit
目录下运行 ./source-generator.sh
,会自动建立 user_behavior topic
,并实时往里灌入数据。
2.在 flink-sql-submit
目录下运行 ./run.sh q1
, 提交成功后,能够在 Web UI 中看到拓扑。
在 MySQL 客户端,咱们也能够实时地看到每一个小时的 pv uv 值在不断地变化
本文带你们搭建基础集群环境,并使用 SqlSubmit 提交纯 SQL 任务来学习了解如何链接外部系统。flink-sql-submit/src/main/resources/q1.sql
中还有一些注释掉的调优参数,感兴趣的同窗能够将参数打开,观察对做业的影响。关于这些调优参数的原理,能够看下我在 深圳 Meetup 上的分享《Flink SQL 1.9.0 技术内幕和最佳实践》。
▼ Apache Flink 社区推荐 ▼
Apache Flink 及大数据领域顶级盛会 Flink Forward Asia 2019 重磅开启,目前正在征集议题,限量早鸟票优惠ing。了解 Flink Forward Asia 2019 的更多信息,请查看:
https://developer.aliyun.com/...
首届 Apache Flink 极客挑战赛重磅开启,聚焦机器学习与性能优化两大热门领域,40万奖金等你拿,加入挑战请点击: