流计算中一个常见的需求就是为数据流补齐字段。由于数据采集端采集到的数据每每比较有限,在作数据分析以前,就要先将所需的维度信息补全。好比采集到的交易日志中只记录了商品 id,可是在作业务时须要根据店铺维度或者行业纬度进行聚合,这就须要先将交易日志与商品维表进行关联,补全所需的维度信息。这里所说的维表与数据仓库中的概念相似,是维度属性的集合,好比商品维,地点维,用户维等等。html
在流计算中,这是一个典型的 stream-to-table jon 的问题。本文主要讲解在 Flink SQL 中是如何解决这个问题的,用户如何简单上手使用这个功能。sql
因为维表是一张不断变化的表(静态表只是动态表的一种特例)。那如何 JOIN 一张不断变化的表呢?若是用传统的 JOIN 语法SELECT * FROM T JOIN dim_table on T.id = dim_table.id
来表达维表 JOIN,是不完整的。由于维表是一直在更新变化的,若是用这个语法那么关联上的是哪一个时刻的维表呢?咱们是不知道的,结果是不肯定的。因此 Flink SQL 的维表 JOIN 语法引入了 SQL:2011 Temporal Table 的标准语法,用来声明关联的是维表哪一个时刻的快照。维表 JOIN 语法/示例以下。数据库
假设咱们有一个 Orders 订单数据流,但愿根据产品 ID 补全流上的产品维度信息,因此须要跟 Products 维度表进行关联。Orders 和 Products 的 DDL 声明语句以下:apache
CREATE TABLE Orders ( orderId VARCHAR, -- 订单 id productId VARCHAR, -- 产品 id units INT, -- 购买数量 orderTime TIMESTAMP -- 下单时间 ) with ( type = 'tt', -- tt 日志流 ... ) CREATE TABLE Products ( productId VARCHAR, -- 产品 id name VARCHAR, -- 产品名称 unitPrice DOUBLE -- 单价 PERIOD FOR SYSTEM_TIME, -- 这是一张随系统时间而变化的表,用来声明维表 PRIMARY KEY (productId) -- 维表必须声明主键 ) with ( type = 'alihbase', -- HBase 数据源 ... )
如上声明了一张来自 TT 的 Orders 订单数据流,和一张存储于 HBase 中的 Products 产品维表。为了补齐订单流的产品信息,须要 JOIN 维表,这里能够分为 JOIN 当前表和 JOIN 历史表。缓存
SELECT * FROM Orders AS o [LEFT] JOIN Products FOR SYSTEM_TIME AS OF PROCTIME() AS p ON o.productId = p.productId
Flink SQL 支持 LEFT JOIN 和 INNER JOIN 的维表关联。如上语法所示的,维表 JOIN 语法与传统的 JOIN 语法并没有二异。只是 Products 维表后面须要跟上 FOR SYSTEM_TIME AS OF PROCTIME()
的关键字,其含义是每条到达的数据所关联上的是到达时刻的维表快照,也就是说,当数据到达时,咱们会根据数据上的 key 去查询远程数据库,拿到匹配的结果后关联输出。这里的 PROCTIME
即 processing time。使用 JOIN 当前维表功能须要注意的是,若是维表插入了一条数据能匹配上以前左表的数据时,JOIN的结果流,不会发出更新的数据以弥补以前的未匹配。JOIN行为只发生在处理时间(processing time),即便维表中的数据都被删了,以前JOIN流已经发出的关联上的数据也不会被撤回或改变。网络
SELECT * FROM Orders AS o [LEFT] JOIN Products FOR SYSTEM_TIME AS OF o.orderTime AS p ON o.productId = p.productId
有时候想关联上的维度数据,并非当前时刻的值,而是某个历史时刻的值。好比,产品的价格一直在发生变化,订单流但愿补全的是下单时的价格,而不是当前的价格,那就是 JOIN 历史维表。语法上只须要将上文的 PROCTIME()
改为 o.orderTime
便可。含义是关联上的是下单时刻的 Products 维表。并发
Flink 在获取维度数据时,会根据左流的时间去查对应时刻的快照数据。所以 JOIN 历史维表须要外部存储支持多版本存储,如 HBase,或者存储的数据中带有多版本信息。异步
注:JOIN 历史维表功能目前暂未开放ide
在实际使用的过程当中,会遇到许多性能问题。为了解决这些性能问题,咱们作了大量的优化,性能获得大幅提高,在双11的洪峰下表现特别淡定。性能
咱们的优化主要是为了解决两方面的问题:
1. 提升吞吐。维表的IO请求严重阻塞了数据流的计算处理。
2. 下降维表数据库读压力。如 HBase 也只能承受单机每秒 20 万的读请求。
我在 《Flink 原理与实现:Aysnc I/O》 中介绍了 Async IO 的使用场景和实现原理。原始的维表 JOIN 是同步访问的方式,来一条数据,去数据库查询一次,等待返回后输出关联结果。能够发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式能够并发地处理多个请求和回复,从而连续的请求之间不须要阻塞等待。
数据库的维表查询请求,有大量相同 key 的重复请求。如何减小重复请求?本地缓存是经常使用的方案。Flink SQL 目前提供两种缓存方案:LRU 和 ALL。(详见文档)
经过 cache='LRU'
参数能够开启 LRU 缓存优化,Blink 会为每一个 JoinTable 节点建立一个 LRU 本地缓存。当每一个数据进来的时候,先去缓存中查询,若是存在则直接关联输出,减小了一次 IO 请求。若是不存在,再发起数据库查询请求(异步或同步方式),请求返回的结果会先存入缓存中以备下次查询。
为了防止缓存无限制增加,因此使用的是 LRU 缓存,而且能够经过 cacheSize
调整缓存的大小。为了按期更新维表数据,能够经过 cacheTTLMs
调整缓存的失效时间。cacheTTLMs
是做用于每条缓存数据上的,也就是某条缓存数据在指定 timeout 时间内没有被访问,则会从缓存中移除。
Async 和 LRU-Cache 能极大提升吞吐率并下降数据库的读压力,可是仍然会有大量的 IO 请求存在,尤为是当 miss key(维表中不存在的 key)不少的时候。若是维表数据不大(一般百万级之内),那么其实能够将整个维表缓存到本地。那么 miss key 永远不会去请求数据库。由于本地缓存就是维表的镜像,缓存中不存在那么远程数据库中也不存在。
ALL cache 能够经过 cache='ALL'
参数开启,经过cacheTTLMs
控制缓存的刷新间隔。Flink SQL 会为 JoinTable 节点起一个异步线程去同步缓存。在 Job 刚启动时,会先阻塞主数据流,直到缓存数据加载完毕,保证主数据流流过期缓存就已经 ready。在以后的更新缓存的过程当中,不会阻塞主数据流。由于异步更新线程会将维表数据加载到临时缓存中,加载完毕后再与主缓存作原子替换。只有替换操做是加了锁的。
由于几乎没有 IO 操做,因此使用 cache ALL 的维表 JOIN 性能能够很是高。可是因为内存须要能同时容纳下两份维表拷贝,所以须要加大内存的配置。
在使用 LRU 缓存时,若是存在大量的 invalid key ,或者数据库中不存在的 key。因为命中不了缓存,致使缓存的收益较低,仍然会有大量请求打到数据库。所以咱们将未命中的 key 也加进了缓存,提升了未命中 key 和 invalid key 状况下的缓存命中率。
默认 JoinTable 节点与上游节点之间的数据是经过 shuffle 传输的。这在缓存大小有限、key总量大、热点不明显的状况下, 缓存的收益可能较低。这种状况下能够将上游节点与 JoinTable 节点的数据传输改为按 key 分区。这样一般能够缩小单个节点的 key 个数,提升缓存的命中率。好比一段时间内 JoinTable 节点总共须要处理100万个key, 节点并发100, 在数据不倾斜时单节点平均只需处理1万个key = 100万/100并发. 若是不作 key 分区, 单节点实际处理的key个数可能远大于1万。使用上也很是简单,在维表的 DDL 参数中加上partitionedJoin='true'
便可。
在使用维表 JOIN 时,若是维表数据不大,或者 miss key (维表中不存在的 key)很是多,则可使用 ALL cache,可是可能须要适当调大节点的内存,由于内存须要能同时容纳下两份维表拷贝。若是用不了 ALL cache,则可使用 Async + LRU 来提升节点的吞吐。
流计算中维表 JOIN 是一个很是常见的需求,遇到的挑战也很是多。好比超大维表问题,ALL cache 没法装下整个维表。将来咱们打算引入 Partitioned-ALL-cache,也就是上游数据到 JoinTable 节点根据 JOIN key 分区,那么每一个节点只须要加载属于该分区key的缓存数据,从而作到了缓存的水平扩展。从而遇到超大维表时能够经过扩并发也可以全量缓存下维表数据。另外,ALL cache 如今每一个节点是都会起一个线程去加载全量维表数据,若是有1000个节点,则会全量读数据库1000次。将来打算经过 Side Input功能作到只须要全量读取一次,维表数据会自动分发到各个节点。
另外,Async 极大地提升了吞吐,可是每一次 IO 请求只取了单 key 的数据,效率比较低。将来计划使用 Batch Get 来提升每次 IO 请求的吞吐。
目前在 Table API 上已经支持了 Multi-Join 的优化,能极大提升多维表连续 JOIN 时的性能,减小网络数据的传输开销。将来会在 SQL 上也支持 Multi-Join 的优化。