有状态的计算做为容错以及数据一致性的保证,是当今实时计算必不可少的特性之一,流行的实时计算引擎包括 Google Dataflow、Flink、Spark (Structure) Streaming、Kafka Streams 都分别提供对内置 State 的支持。State 的引入使得实时应用能够不依赖外部数据库来存储元数据及中间数据,部分状况下甚至能够直接用 State 存储结果数据,这让业界不由思考: State 和 Database 是何种关系?有没有可能用 State 来代替数据库呢?数据库
在这个课题上,Flink 社区是比较早就开始探索的。整体来讲,Flink 社区的努力能够分为两条线: 一是在做业运行时经过做业查询接口访问 State 的能力,即 QueryableState;二是经过 State 的离线 dump 文件(Savepoint)来离线查询和修改 State 的能力,即即将引入的 Savepoint Processor API。网络
在 2017 年发布的 Flink 1.2 版本,Flink 引入了 QueryableState 的特性以容许用户经过特定的 client 查询做业 State 的内容 [1],这意味着 Flink 应用能够在彻底不依赖 State 存储介质之外的外部存储的状况下提供实时访问计算结果的能力。数据结构
只经过 Queryable State 提供实时数据访问架构
然而,QueryableState 虽然设想上比较理想化,但因为依赖底层架构的改动较多且功能也比较受限,它一直处于 Beta 版本并不能用于生产环境。针对这个问题,在前段时间腾讯的工程师杨华提出 QueryableState 的改进计划 [2]。在邮件列表中,社区就 QueryableState 是否能够用于代替数据库做了讨论并出现了不一样的观点。笔者结合我的看法将 State as Database 的主要优缺点整理以下。并发
优势:运维
缺点:函数
整体来讲,目前 State 代替数据库的缺点仍是远多于其优势,不过对于某些对数据可用性要求不高的做业来讲,使用 State 做为数据库仍是彻底合理的。因为定位上的不一样,Flink State 在短期内很难看到能够彻底替代数据库的可能性,但在数据访问特性上 State 往数据库方向发展是无需质疑的。优化
Savepoint Processor API 是社区最近提出的一个新特性(见 FLIP-42 [3]),用于离线对 State 的 dump 文件 Savepoint 进行分析、修改或者直接根据数据构建出一个初始的 Savepoint。Savepoint Processor API 属于 Flink State Evolution 的 State Management。若是说 QueryableState 是 DSL 的话,Flink State Evolution 就是 DML,而 Savepoint Processor API 就是 DML 中最为重要的部分。spa
Savepoint Processor API 的前身是第三方的 Bravo 项目 [4],主要思路提供 Savepoint 和 DataSet 相互转换的能力,典型应用是 Savepoint 读取成 DataSet,在 DataSet 上进行修改,而后再写为一个新的 Savepoint。这适合用于如下的场景:blog
修改 Savepoint,好比:
Savepoint 做为 State 的 dump 文件,经过 Savepoint Processor API 能够暴露数据查询和修改功能,相似于一个离线的数据库,但 State 的概念和典型关系型数据的概念仍是有不少不一样,FLIP-43 也对这些差别进行了类比和总结。
首先 Savepoint 是多个 operator 的 state 的物理存储集合,不一样 operator 的 state 是独立的,这相似于数据库下不一样 namespace 之间的 table。咱们能够获得 Savepoint 对应数据库,单个 operator 对应 Namespace。
Database
Savepoint
Namespace
Uid
Table
State
但就 table 而言,其在 Savepoint 里对应的概念根据 State 类型的不一样而有所差异。State 有 Operator State、Keyed State 和 Broadcast State 三种,其中 Operator State 和 Broadcast State 属于 non-partitioned state,即没有按 key 分区的 state,而相反地 Keyed State 则属于 partitioned state。对于 non-partitioned state 来讲,state 是一个 table,state 的每一个元素便是 table 里的一行;而对于 partitioned state 来讲,同一个 operator 下的全部 state 对应一个 table。这个 table 像是 HBase 同样有个 row key,而后每一个具体的 state 对应 table 里的一个 column。
举个例子,假设有一个游戏玩家得分和在线时长的数据流,咱们须要用 Keyed State 来记录玩家所在组的分数和游戏时长,用 Operator State 记录玩家的总得分和总时长。
在一段时间内数据流的输入以下:
user_id
user_name
user_group
score
1001
Paul
A
5,000
1002
Charlotte
A
3,600
1003
Kate
C
2,000
1004
Robert
B
3,900
user_id
user_name
user_group
time
1001
Paul
A
1,800
1002
Charlotte
A
1,200
1003
Kate
C
600
1004
Robert
B
2,000
用 Keyed State ,咱们分别注册 group_score 和 group_time 两个 MapState 表示组总得分和组总时长,并根据 user_group keyby 数据流以后将两个指标的累积值更新到 State 里,获得的表以下:
user_group
group_score
group_time
A
8,600
3,000
C
2,00
600
B
3,900
2,000
相对地,假如用 Operator State 来记录总得分和总时长(并行度设为 1),咱们注册 total_score 和 total_time 两个 State,获得的表有两个:
total_score |
------- |
14,500 |
total_time
5,600
至此 Savepoint 和 Database 的对应关系应该是比较清晰明了的。而对于 Savepoint 来讲还有不一样的 StateBackend 来决定 State 具体如何持续化,这显然对应的是数据库的存储引擎。在 MySQL 中,咱们能够经过简单的一行命令 ALTER TABLE xxx ENGINE = InnoDB; 来改变存储引擎,在背后 MySQL 会自动完成繁琐的格式转换工做。而对于 Savepoint 来讲,因为 StateBackend 各自的存储格式不兼容,目前尚不能方便地切换 StateBackend。为此,社区在不久前建立 FLIP-41 [5] 来进一步完善 Savepoint 的可操做性。
State as Database 是实时计算发展的大趋势,它并非要代替数据库的使用,而是借鉴数据库领域的经验拓展 State 接口使其操做方式更接近咱们熟悉的数据库。对于 Flink 而言,State 的外部使用能够分为在线的实时访问和离线的访问和修改,分别将由 Queryable State 和 Savepoint Processor API 两个特性支持。
本文为云栖社区原创内容,未经容许不得转载。