Apache Flink 零基础入门(八): SQL 编程实践

做者:伍翀(云邪)git

本文是 Apache Flink 零基础入门系列文章第八篇,将经过五个实例讲解 Flink SQL 的编程实践。github

注: 本教程实践基于 Ververica 开源的 sql-training 项目。基于 Flink 1.7.2 。sql

经过本课你能学到什么?

本文将经过五个实例来贯穿 Flink SQL 的编程实践,主要会涵盖如下几个方面的内容。docker

  1. 如何使用 SQL CLI 客户端
  2. 如何在流上运行 SQL 查询
  3. 运行 window aggregate 与 non-window aggregate,理解其区别
  4. 如何用 SQL 消费 Kafka 数据
  5. 如何用 SQL 将结果写入 Kafka 和 ElasticSearch

本文假定您已具有基础的 SQL 知识。数据库

环境准备

本文教程是基于 Docker 进行的,所以你只须要安装了 Docker 便可。不须要依赖 Java、Scala 环境、或是IDE。编程

注意:Docker 默认配置的资源可能不太够,会致使运行 Flink Job 时卡死。所以推荐配置 Docker 资源到 3-4 GB,3-4 CPUs。bootstrap

本次教程的环境使用 Docker Compose 来安装,包含了所需的各类服务的容器,包括:性能优化

  • Flink SQL Client:用来提交query,以及可视化结果
  • Flink JobManager 和 TaskManager:用来运行 Flink SQL 任务。
  • Apache Kafka:用来生成输入流和写入结果流。
  • Apache Zookeeper:Kafka 的依赖项
  • ElasticSearch:用来写入结果

咱们已经提供好了Docker Compose 配置文件,能够直接下载 docker-compose.yml 文件。bash

而后打开命令行窗口,进入存放 docker-compose.yml 文件的目录,而后运行如下命令:微信

  • Linux & MacOS
docker-compose up -d
复制代码
  • Windows
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d
复制代码

docker-compose 命令会启动全部所需的容器。第一次运行的时候,Docker 会自动地从 Docker Hub 下载镜像,这可能会须要一段时间(将近 2.3GB)。以后运行的话,几秒钟就能启动起来了。运行成功的话,会在命令行中看到如下输出,而且也能够在 http://localhost:8081 访问到 Flink Web UI。

运行 Flink SQL CLI 客户端

运行下面命令进入 Flink SQL CLI 。

docker-compose exec sql-client ./sql-client.sh
复制代码

该命令会在容器中启动 Flink SQL CLI 客户端。而后你会看到以下的欢迎界面。

数据介绍

Docker Compose 中已经预先注册了一些表和数据,能够运行 SHOW TABLES; 来查看。本文会用到的数据是 Rides 表,这是一张出租车的行车记录数据流,包含了时间和位置信息,运行 DESCRIBE Rides; 能够查看表结构。

Flink SQL> DESCRIBE Rides;
root
 |-- rideId: Long           // 行为ID (包含两条记录,一条入一条出)
 |-- taxiId: Long           // 出租车ID 
 |-- isStart: Boolean       // 开始 or 结束
 |-- lon: Float             // 经度
 |-- lat: Float             // 纬度
 |-- rideTime: TimeIndicatorTypeInfo(rowtime)     // 时间
 |-- psgCnt: Integer        // 乘客数
复制代码

Rides 表的详细定义见 training-config.yaml

实例1:过滤

例如咱们如今只想查看发生在纽约的行车记录

注:Docker 环境中已经预约义了一些内置函数,如 isInNYC(lon, lat) 能够肯定一个经纬度是否在纽约,toAreaId(lon, lat) 能够将经纬度转换成区块。

所以,此处咱们可使用 isInNYC 来快速过滤出纽约的行车记录。在 SQL CLI 中运行以下 Query:

SELECT * FROM Rides WHERE isInNYC(lon, lat);
复制代码

SQL CLI 便会提交一个 SQL 任务到 Docker 集群中,从数据源(Rides 流存储在Kafka中)不断拉取数据,并经过 isInNYC 过滤出所需的数据。SQL CLI 也会进入可视化模式,并不断刷新展现过滤后的结果:

也能够到 http://localhost:8081 查看 Flink 做业的运行状况。

实例2:Group Aggregate

咱们的另外一个需求是计算搭载每种乘客数量的行车事件数。也就是搭载1个乘客的行车数、搭载2个乘客的行车... 固然,咱们仍然只关心纽约的行车事件。

所以,咱们能够按照乘客数psgCnt作分组,使用 COUNT(*) 计算出每一个分组的事件数,注意在分组前须要先过滤出isInNYC的数据。在 SQL CLI 中运行以下 Query:

SELECT psgCnt, COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;
复制代码

SQL CLI 的可视化结果以下所示,结果每秒都在发生变化。不过最大的乘客数不会超过 6 人。

实例3:Window Aggregate

为了持续地监测纽约的交通流量,须要计算出每一个区块每5分钟的进入的车辆数。咱们只关心至少有5辆车子进入的区块。

此处须要涉及到窗口计算(每5分钟),因此须要用到 Tumbling Window 的语法。“每一个区块” 因此还要按照 toAreaId 进行分组计算。“进入的车辆数” 因此在分组前须要根据 isStart 字段过滤出进入的行车记录,并使用 COUNT(*) 统计车辆数。最后还有一个 “至少有5辆车子的区块” 的条件,这是一个基于统计值的过滤条件,因此能够用 SQL HAVING 子句来完成。

最后的 Query 以下所示:

SELECT 
  toAreaId(lon, lat) AS area, 
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end, 
  COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat) and isStart
GROUP BY 
  toAreaId(lon, lat), 
  TUMBLE(rideTime, INTERVAL '5' MINUTE) 
HAVING COUNT(*) >= 5;
复制代码

在 SQL CLI 中运行后,其可视化结果以下所示,每一个 area + window_end 的结果输出后就不会再发生变化,可是会每隔 5 分钟会输出一批新窗口的结果。由于 Docker 环境中的source咱们作了10倍的加速读取(相对于原始速度),因此演示的时候,大概每隔30秒就会输出一批新窗口。

Window Aggregate 与 Group Aggregate 的区别

从实例2和实例3的结果显示上,能够体验出来 Window Aggregate 与 Group Aggregate 是有一些明显的区别的。其主要的区别是,Window Aggregate 是当window结束时才输出,其输出的结果是最终值,不会再进行修改,其输出流是一个 Append 流。而 Group Aggregate 是每处理一条数据,就输出最新的结果,其结果是在不断更新的,就好像数据库中的数据同样,其输出流是一个 Update 流

另一个区别是,window 因为有 watermark ,能够精确知道哪些窗口已通过期了,因此能够及时清理过时状态,保证状态维持在稳定的大小。而 Group Aggregate 由于不知道哪些数据是过时的,因此状态会无限增加,这对于生产做业来讲不是很稳定,因此建议对 Group Aggregate 的做业配上 State TTL 的配置。

例如统计每一个店铺天天的实时PV,那么就能够将 TTL 配置成 24+ 小时,由于一天前的状态通常来讲就用不到了。

SELECT  DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) as pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id
复制代码

固然,若是 TTL 配置地过小,可能会清除掉一些有用的状态和数据,从而致使数据精确性地问题。这也是用户须要权衡地一个参数。

实例4:将 Append 流写入 Kafka

上一小节介绍了 Window Aggregate 和 Group Aggregate 的区别,以及 Append 流和 Update 流的区别。在 Flink 中,目前 Update 流只能写入支持更新的外部存储,如 MySQL, HBase, ElasticSearch。Append 流能够写入任意地存储,不过通常写入日志类型的系统,如 Kafka。

这里咱们但愿将**“每10分钟的搭乘的乘客数”**写入Kafka。

咱们已经预约义了一张 Kafka 的结果表 Sink_TenMinPsgCntstraining-config.yaml 中有完整的表定义)。

在执行 Query 前,咱们先运行以下命令,来监控写入到 TenMinPsgCnts topic 中的数据:

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning
复制代码

每10分钟的搭乘的乘客数可使用 Tumbling Window 来描述,咱们使用 INSERT INTO Sink_TenMinPsgCnts 来直接将 Query 结果写入到结果表。

INSERT INTO Sink_TenMinPsgCnts 
SELECT 
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,  
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  CAST(SUM(psgCnt) AS BIGINT) AS cnt 
FROM Rides 
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
复制代码

咱们能够监控到 TenMinPsgCnts topic 的数据以 JSON 的形式写入到了 Kafka 中:

实例5:将 Update 流写入 ElasticSearch

最后咱们实践一下将一个持续更新的 Update 流写入 ElasticSearch 中。咱们但愿将**“每一个区域出发的行车数”**,写入到 ES 中。

咱们也已经预约义好了一张 Sink_AreaCnts 的 ElasticSearch 结果表(training-config.yaml 中有完整的表定义)。该表中只有两个字段 areaIdcnt

一样的,咱们也使用 INSERT INTO 将 Query 结果直接写入到 Sink_AreaCnts 表中。

INSERT INTO Sink_AreaCnts 
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt 
FROM Rides 
WHERE isStart
GROUP BY toAreaId(lon, lat);
复制代码

在 SQL CLI 中执行上述 Query 后,Elasticsearch 会自动地建立 area-cnts 索引。Elasticsearch 提供了一个 REST API 。咱们能够访问

随着 Query 的一直运行,你也能够观察到一些统计值(_all.primaries.docs.count, _all.primaries.docs.deleted)在不断的增加:http://localhost:9200/area-cnts/_stats

总结

本文带你们使用 Docker Compose 快速上手 Flink SQL 的编程,并对比 Window Aggregate 和 Group Aggregate 的区别,以及这两种类型的做业如何写入到 外部系统中。感兴趣的同窗,能够基于这个 Docker 环境更加深刻地去实践,例如运行本身写的 UDF , UDTF, UDAF。查询内置地其余源表等等。


▼ Apache Flink 社区推荐 ▼

Apache Flink 及大数据领域顶级盛会 Flink Forward Asia 2019 重磅开启,目前正在征集议题,限量早鸟票优惠ing。了解 Flink Forward Asia 2019 的更多信息,请查看:

developer.aliyun.com/special/ffa…

首届 Apache Flink 极客挑战赛重磅开启,聚焦机器学习与性能优化两大热门领域,40万奖金等你拿,加入挑战请点击:

tianchi.aliyun.com/markets/tia…

关注 Flink 官方社区微信公众号,了解更多 Flink 资讯!

相关文章
相关标签/搜索