Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

实际问题

咱们知道在流计算场景中,数据是源源不断的流入的,数据流永远不会结束,那么计算就永远不会结束,若是计算永远不会结束的话,那么计算结果什么时候输出呢?本篇将介绍Apache Flink利用持续查询来对流计算结果进行持续输出的实现原理。sql

数据管理

在介绍持续查询以前,咱们先看看Apache Flink对数据的管理和传统数据库对数据管理的区别,以MySQL为例,以下图:
数据库

如上图所示传统数据库是数据存储和查询计算于一体的架构管理方式,这个很明显,oracle数据库不可能管理MySQL数据库数据,反之亦然,每种数据库厂商都有本身的数据库管理和存储的方式,各自有特有的实现。在这点上Apache Flink海纳百川(也有corner case),将data store 进行抽象,分为source(读) 和 sink(写)两种类型接口,而后结合不一样存储的特色提供经常使用数据存储的内置实现,固然也支持用户自定义的实现。架构

那么在宏观设计上Apache Flink与传统数据库同样均可以对数据表进行SQL查询,并将产出的结果写入到数据存储里面,那么Apache Flink上面的SQL查询和传统数据库查询的区别是什么呢?Apache Flink又是如何作到求同(语义相同)存异(实现机制不一样),完美支持ANSI-SQL的呢?oracle

静态查询

传统数据库中对表(好比 flink_tab,有user和clicks两列,user主键)的一个查询SQL(select * from flink_tab)在数据量容许的状况下,会马上返回表中的全部数据,在查询结果显示以后,对数据库表flink_tab的DML操做将与执行的SQL无关了。也就是说传统数据库下面对表的查询是静态查询,将计算的最终查询的结果当即输出,以下:app

select * from flink_tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
|  1 | Mary |      1 |
+----+------+--------+
1 row in set (0.00 sec)

当我执行完上面的查询,查询结果当即返回,上面状况告诉咱们表 flink_tab里面只有一条记录,id=1,user=Mary,clicks=1; 这样传统数据库表的一条查询语句就彻底结束了。传统数据库表在查询那一刻咱们这里叫Static table,是指在查询的那一刻数据库表的内容再也不变化了,查询进行一次计算完成以后表的变化也与本次查询无关了,咱们将在Static Table 上面的查询叫作静态查询。优化

持续查询

什么是连续查询呢?连续查询发生在流计算上面,在 《Apache Flink 漫谈系列 - 流表对偶(duality)性》 中咱们提到过Dynamic Table,连续查询是做用在Dynamic table上面的,永远不会结束的,随着表内容的变化计算在不断的进行着...ui

静态/持续查询特色

静态查询和持续查询的特色就是《Apache Flink 漫谈系列 - 流表对偶(duality)性》中所提到的批与流的计算特色,批一次查询返回一个计算结果就结束查询,流一次查询不断修正计算结果,查询永远不结束,表格示意以下:阿里云

查询类型 计算次数 计算结果
静态查询 1 最终结果
持续查询 无限 不断更新

静态/持续查询关系

接下来咱们以flink_tab表实际操做为例,体验一下静态查询与持续查询的关系。假如咱们对flink_tab表再进行一条增长和一次更新操做,以下:spa

MySQL> insert into flink_tab(user, clicks) values ('Bob', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update flink_tab set clicks=2 where user='Mary';
Query OK, 1 row affected (0.06 sec)

这时候咱们再进行查询 select * from flink_tab ,结果以下:插件

MySQL> select * from flink_tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
|  1 | Mary |      2 |
|  2 | Bob  |      1 |
+----+------+--------+
2 rows in set (0.00 sec)

那么咱们看见,相同的查询SQL(select * from flink_tab),计算结果彻底 不 同样了。这说明相同的sql语句,在不一样的时刻执行计算,获得的结果可能不同(有点像废话),就以下图同样:

假设不断的有人在对表flink_tab作操做,同时有一我的间歇性的发起对表数据的查询,上图咱们只是在三个时间点进行了3次查询。而且在这段时间内数据表的内容也在变化。引发上面变化的DML以下:

MySQL> insert into flink_tab(user, clicks) values ('Llz', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update flink_tab set clicks=2 where user='Bob';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> update flink_tab set clicks=3 where user='Mary';
Query OK, 1 row affected (0.05 sec)
Rows matched: 1  Changed: 1  Warnings: 0

到如今咱们不难想象,上面图内容的核心要点以下:

  • 时间
  • 表数据变化
  • 触发计算
  • 计算结果更新

接下来咱们利用传统数据库现有的机制模拟一下持续查询...

无PK的 Append only 场景

接下来咱们把上面隐式存在的时间属性timestamp做为表flink_tab_ts(timestamp,user,clicks三列,无主键)的一列,再写一个 触发器(Trigger) 示例观察一下:

timestamp user clicks
1525099013 Mary 1
1525099026 Bob 1
1525099035 Mary 2
1525099047 Llz 1
1525099056 Bob 2
1525099065 Mary 3
// INSERT 的时候查询一下数据flink_tab_ts,将结果写到trigger.sql中
 DELIMITER ;;
create trigger flink_tab_ts_trigger_insert after insert
on flink_tab_ts for each row
  begin
       select ts, user, clicks from flink_tab_ts into OUTFILE '/Users/jincheng.sunjc/testdir/atas/trigger.sql';
  end ;;
DELIMITER ;

上面的trigger要将查询结果写入本地文件,默认MySQL是不容许写入的,咱们查看一下:

MySQL> show variables like '%secure%';
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| require_secure_transport | OFF   |
| secure_file_priv         | NULL  |
+--------------------------+-------+
2 rows in set (0.00 sec)

上面secure_file_priv属性为NULL,说明MySQL不容许写入file,我须要修改my.cnf在添加secure_file_priv=''打开写文件限制;

MySQL> show variables like '%secure%';
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| require_secure_transport | OFF   |
| secure_file_priv         |       |
+--------------------------+-------+
2 rows in set (0.00 sec)

下面咱们对flink_tab_ts进行INSERT操做:

咱们再来看看6次trigger 查询计算的结果:

你们到这里发现我写了Trigger的存储过程以后,每次在数据表flink_tab_ts进行DML操做的时候,Trigger就会触发一次查询计算,产出一份新的计算结果,观察上面的查询结果发现,结果表不停的增长(Append only)。

有PK的Update场景

咱们利用flink_tab_ts的6次DML操做和自定义的触发器TriggerL来介绍了什么是持续查询,作处理静态查询与持续查询的关系。那么上面的演示目的是为了说明持续查询,全部操做都是insert,没有基于主键的更新,也就是说Trigger产生的结果都是append only的,那么你们想想,若是咱们操做flink_tab这张表,按主键user进行插入和更新操做,一样利用Trigger机制来进行持续查询,结果是怎样的的呢? 初始化表,trigger:

drop table flink_tab;
create table flink_tab(
    user VARCHAR(100) NOT NULL,
    clicks INT NOT NULL,
    PRIMARY KEY (user)
 );

 DELIMITER ;;
create trigger flink_tab_trigger_insert after insert
on flink_tab for each row
  begin
       select user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql';
  end ;;
DELIMITER ;

DELIMITER ;;
create trigger flink_tab_trigger_ after update
on flink_tab for each row
  begin
        select ts, user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql';
  end ;;
DELIMITER ;

 一样我作以下6次DML操做,Trigger 6次查询计算:

在来看看此次的结果与append only 有什么不一样?

我想你们早就知道这结果了,数据库里面定义的PK全部变化会按PK更新,那么触发的6次计算中也会获得更新后的结果,这应该不难理解,查询结果也是不断更新的(Update)!

关系定义 

上面Append Only 和 Update两种场景在MySQL上面均可以利用Trigger机制模拟 持续查询的概念,也就是说数据表中每次数据变化,咱们都触发一次相同的查询计算(只是计算时候数据的集合发生了变化),由于数据表不断的变化,这个表就能够看作是一个动态表Dynamic Table,而查询SQL(select * from flink_tab_ts) 被触发器Trigger在知足某种条件后不停的触发计算,进而也不断地产生新的结果。这种做用在Dynamic Table,而且有某种机制(Trigger)不断的触发计算的查询咱们就称之为 持续查询。

那么到底静态查询和动态查询的关系是什么呢?在语义上 持续查询 中的每一次查询计算的触发都是一次静态查询(相对于当时查询的时间点),  在实现上 Apache Flink会利用上一次查询结果+当前记录 以增量的方式完成查询计算。

特别说明: 上面咱们利用 数据变化+Trigger方式描述了持续查询的概念,这里有必要特别强调一下的是数据库中trigger机制触发的查询,每次都是一个全量查询,这与Apache Flink上面流计算的持续查询概念相同,但实现机制彻底不一样,Apache Flink上面的持续查询内部实现是增量处理的,随着时间的推移,每条数据的到来实时处理当前的那一条记录,不会处理曾经来过的历史记录!

Apache Flink 如何作到持续查询

动态表上面持续查询

在 《Apache Flink 漫谈系列 - 流表对偶(duality)性》 中咱们了解到流和表能够相互转换,在Apache Flink流计算中携带流事件的Schema,通过算子计算以后再产生具备新的Schema的事件,流入下游节点,在产生新的Schema的Event和不断流转的过程就是持续查询做用的结果,以下图:

增量计算

咱们进行查询大多数场景是进行数据聚合,好比查询SQL中利用count,sum等aggregate function进行聚合统计,那么流上的数据源源不断的流入,咱们既不能等全部事件流入结束(永远不会结束)再计算,也不会每次来一条事件就像传统数据库同样将所有事件集合从新总体计算一次,在持续查询的计算过程当中,Apache Flink采用增量计算的方式,也就是每次计算都会将计算结果存储到state中,下一条事件到来的时候利用上次计算的结果和当前的事件进行聚合计算,好比 有一个订单表,以下:

一个简单的计数和求和查询SQL:

// 求订单总数和全部订单的总金额
select count(id) as cnt,sum(amount)as sumAmount from order_tab;

这样一个简单的持续查询计算,Apache Flink内部是如何处理的呢?以下图:

如上图,Apache Flink中每来一条事件,就进行一次计算,而且每次计算后结果会存储到state中,供下一条事件到来时候进行计算,即:

result(n) = calculation(result(n-1), n)。

无PK的Append Only 场景 

在实际的业务场景中,咱们只须要进行简单的数据统计,而后就将统计结果写入到业务的数据存储系统里面,好比上面统计订单数量和总金额的场景,订单表自己是一个append only的数据源(假设没有更新,截止到2018.5.14日,Apache Flink内部支持的数据源都是append only的),在持续查询过程当中通过count(id),sum(amount)统计计算以后产生的动态表也是append only的,种场景Apache Flink内部只须要进行aggregate function的聚合统计计算就能够,以下:

有PK的Update 场景

如今咱们将上面的订单场景稍微变化一下,在数据表上面咱们将金额字段amount,变为地区字段region,数据以下:

查询统计的变为,在计算具备相同订单数量的地区数量;查询SQL以下:

CREATE TABLE order_tab(
   id BIGINT,
   region VARCHAR
 ) 

CREATE TABLE region_count_sink(
   order_cnt BIGINT, 
   region_cnt BIGINT,
   PRIMARY KEY(order_cnt) -- 主键
) 

-- 按地区分组计算每一个地区的订单数量
CREATE VIEW order_count_view AS
    SELECT
        region, count(id) AS order_cnt
    FROM  order_tab 
    GROUP BY region;

-- 按订单数量分组统计具备相同订单数量的地区数量
INSERT INTO region_count_sink 
    SELECT 
        order_cnt,
        count(region) as region_cnt
    FROM order_count_view 
    GROUP BY order_cnt;

上面查询SQL的代码结构以下(这个图示在Alibaba StreamCompute的集成IDE环境生成的,了解更多):

上面SQL中咱们发现有两层查询计算逻辑,第一个查询计算逻辑是与SOURCE相连的按地区统计订单数量的分组统计,第二个查询计算逻辑是在第一个查询产出的动态表上面进行按订单数量统计地区数量的分组统计,咱们一层一层分析。

错误处理
  • 第一层分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region; 

  • 第二层分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;

按照第一层分析的结果,再分析第二层产出的结果,咱们分析的过程是对的,可是最终写到sink表的计算结果是错误的,那咱们错在哪里了呢?

其实当 (SH,2)这条记录来的时候,之前来过的(SH, 1)已是脏数据了,当(BJ, 2)来的时候,已经参与过计算的(BJ, 1)也变成脏数据了,一样当(BJ, 3)来的时候,(BJ, 2)也是脏数据了,上面的分析,没有处理脏数据进而致使最终结果的错误。那么Apache Flink内部是如何正确处理的呢?

正确处理
  • 第一层分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
  • 第二层分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;

上面咱们将有更新的事件进行打标的方式来处理脏数据,这样在Apache Flink内部计算的时候 算子会根据事件的打标来处理事件,在aggregate function中有两个对应的方法(retract和accumulate)来处理不一样标识的事件,如上面用到的count AGG,内部实现以下:

def accumulate(acc: CountAccumulator): Unit = {
    acc.f0 += 1L // acc.f0 存储记数
}

def retract(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 -= 1L //acc.f0 存储记数
    }
}

Apache Flink内部这种为事件进行打标的机制叫作 retraction。retraction机制保障了在流上已经流转到下游的脏数据须要被撤回问题,进而保障了持续查询的正确语义。

Apache Flink Connector 类型

本篇一开始就对比了MySQL的数据存储和Apache Flink数据存储的区别,Apache Flink目前是一个计算平台,将数据的存储以高度抽象的插件机制与各类已有的数据存储无缝对接。目前Apache Flink中将数据插件称之为连接器Connector,Connnector又按数据的读和写分红Soruce(读)和Sink(写)两种类型。对于传统数据库表,PK是一个很重要的属性,在频繁的按某些字段(PK)进行更新的场景,在表上定义PK很是重要。那么做为彻底支持ANSI-SQL的Apache Flink平台在Connector上面是否也支持PK的定义呢?

Apache Flink Source

如今(2018.11.5)Apache Flink中用于数据流驱动的Source Connector上面没法定义PK,这样在某些业务场景下会形成数据量较大,形成计算资源没必要要的浪费,甚至有聚合结果不是用户“指望”的状况。咱们以双流JOIN为例来讲明:

SQL:

CREATE TABLE inventory_tab(
   product_id VARCHAR,
   product_count BIGINT
); 

CREATE TABLE sales_tab(
   product_id VARCHAR,
   sales_count BIGINT
  ) ;

CREATE TABLE join_sink(
   product_id VARCHAR, 
   product_count BIGINT,
   sales_count BIGINT,
   PRIMARY KEY(product_id)
);

CREATE VIEW join_view AS
    SELECT
        l.product_id, 
        l.product_count,
        r.sales_count
    FROM inventory_tab l 
        JOIN  sales_tab r 
        ON l.product_id = r.product_id;

INSERT INTO join_sink 
  SELECT 
      product_id, 
      product_count,
      sales_count
  FROM join_view ;

代码结构图:

实现示意图:

上图描述了一个双流JOIN的场景,双流JOIN的底层实现会将左(L)右(R)两面的数据都持久化到Apache Flink的State中,当L流入一条事件,首先会持久化到LState,而后在和RState中存储的R中全部事件进行条件匹配,这样的逻辑若是R流product_id为P001的产品销售记录已经流入4条,L流的(P001, 48) 流入的时候会匹配4条事件流入下游(join_sink)。

问题

上面双流JOIN的场景,咱们发现其实inventory和sales表是有业务的PK的,也就是两张表上面的product_id是惟一的,可是因为咱们在Sorure上面没法定义PK字段,表上面全部的数据都会以append only的方式从source流入到下游计算节点JOIN,这样就致使了JOIN内部全部product_id相同的记录都会被匹配流入下游,上面的例子是 (P001, 48) 来到的时候,就向下游流入了4条记录,不难想象每一个product_id相同的记录都会与历史上全部事件进行匹配,进而操做下游数据压力。

那么这样的压力是必要的吗?从业务的角度看,不是必要的,由于对于product_id相同的记录,咱们只须要对左右两边最新的记录进行JOIN匹配就能够了。好比(P001, 48)到来了,业务上面只须要右流的(P001, 22)匹配就好,流入下游一条事件(P001, 48, 22)。 那么目前在Apache Flink上面如何作到这样的优化呢?

解决方案

上面的问题根本上咱们要构建一张有PK的动态表,这样按照业务PK进行更新处理,咱们能够在Source后面添加group by 操做生产一张有PK的动态表。以下:(以下DDL和LAST_VALUE 是Alibaba内部对Flink的增强,目前尚未推回社区,你们慢慢等待,你们想体验能够登陆阿里云流计算平台,详见

SQL:

CREATE TABLE inventory_tab(
   product_id VARCHAR,
   product_count BIGINT
  ) 

 CREATE TABLE sales_tab(
   product_id VARCHAR,
   sales_count BIGINT
  )
CREATE VIEW inventory_view AS
    SELECT 
    product_id,
    LAST_VALUE(product_count) AS product_count
    FROM inventory_tab
    GROUP BY product_id;

CREATE VIEW sales_view AS
    SELECT 
    product_id,
    LAST_VALUE(sales_count) AS sales_count
    FROM sales_tab
    GROUP BY product_id;

CREATE TABLE join_sink(
   product_id VARCHAR, 
   product_count BIGINT,
   sales_count BIGINT,
   PRIMARY KEY(product_id)
)WITH (
    type = 'print'
) ;

CREATE VIEW join_view AS
    SELECT
        l.product_id, 
        l.product_count,
        r.sales_count
    FROM inventory_view l 
        JOIN  sales_view r 
        ON l.product_id = r.product_id;

 INSERT INTO join_sink 
  SELECT 
      product_id, 
      product_count,
      sales_count
  FROM join_view ;

代码结构:

实现示意图:

如上方式能够将无PK的source通过一次节点变成有PK的动态表,以Apache Flink的retract机制和业务要素解决数据瓶颈,减小计算资源的消耗。

说明1: 上面方案LAST_VALUE是Alibaba内部对Flink的加强功能,社区尚未支持。

Apache Flink Sink

在Apache Flink上面能够根据实际外部存储的特色(是否支持PK),以及总体job的执行plan来动态推导Sink的执行模式,具体有以下三种类型:

  • Append 模式 - 该模式用户在定义Sink的DDL时候不定义PK,在Apache Flink内部生成的全部只有INSERT语句;
  • Upsert 模式 - 该模式用户在定义Sink的DDL时候能够定义PK,在Apache Flink内部会根据事件打标(retract机制)生成INSERT/UPDATE和DELETE 语句,其中若是定义了PK, UPDATE语句按PK进行更新,若是没有定义PK UPDATE会按整行更新;
  • Retract 模式 - 该模式下会产生INSERT和DELETE两种信息,Sink Connector 根据这两种信息构造对应的数据操做指令;

小结

本篇以MySQL为例介绍了传统数据库的静态查询和利用MySQL的Trigger+DML操做来模拟持续查询,并介绍了Apache Flink上面利用增量模式完成持续查询,并以双流JOIN为例说明了持续查询可能会遇到的问题,而且介绍Apache Flink觉得事件打标产生delete事件的方式解决持续查询的问题,进而保证语义的正确性,完美的在流计算上支持续查询。



本文做者:金竹

阅读原文

本文为云栖社区原创内容,未经容许不得转载。

相关文章
相关标签/搜索