F1 Query: Declarative Querying at Scale

距离 Google 的上一篇 F1 论文,也就是 F1: A Distributed SQL Database That Scales 已经 5 年过去了,Google 在今年的 VLDB 上终于发布了 F1 的新版本 F1 Query: Declarative Querying at Scale,咱们今天就来看一下这篇论文。安利一下,在 PingCAP 的 paper party 上,黄东旭,主页连接大神对这篇论文的讲解很是精彩,文章中也部分引用了他的观点,在此鸣谢。算法

2013 年的 F1 是基于 Spanner,主要提供 OLTP 服务,而新的 F1 则定位则是大一统:旨在处理 OLTP/OLAP/ETL 等多种不一样的 workload。可是这篇新的 F1 论文对 OLTP 的讨论则是少之又少,据八卦是 Spanner 开始原生支持以前 F1 的部分功能,致使 F1 对 OLTP 的领地被吞并了。下面看一下论文的具体内容,疏漏之处欢迎指正。网络

0. 摘要

F1 Query 是一个大一统的 SQL 查询处理平台,能够处理存储在 Google 内部不一样存储介质(Bigtable, Spanner, Google Spreadsheet)上面的不一样格式文件。简单来讲,F1 Query 能够同时支持以下功能:OLTP 查询,低延迟 OLAP 查询,ETL 工做流。F1 Query 的特性包括:多线程

  • 为不一样数据源的数据提供统一视图
  • 利用数据中心的资源提供高吞吐和低延迟的查询
  • High Scalability
  • Extensibility

1. 背景

在 Google 内部的数据处理和分析的 use case 很是复杂,对不少方面都有不一样的要求,好比数据大小、延迟、数据源以及业务逻辑支持。结果致使了许多数据处理系统都只 focus 在一个点上,好比事务式查询、OLAP 查询、ETL 工做流。这些不一样的系统每每具备不一样的特性,不论是使用仍是开发上都会有极大的不便利。架构

F1 Query 就在这个背景下诞生了,用论文中的话说就是并发

we present F1 Query, an SQL query engine that is unique not because of its focus on doing one thing well, but instead because it aims to cover all corners of the requirements space for enterprise data processing and analysis.

F1 Query 旨在覆盖数据处理和分析的全部方面。F1 Query 在内部已经应用到了多个产品线,好比 Advertising, Shopping, Analytics 和 Payment。框架

在 F1 Query 的系统设计过程当中,下面几点考量具备很是关键的做用。机器学习

  • Data Fragmentation: Google 内部的数据因为自己的特性不一样,会被存储到不一样的存储系统中。这样会致使一个应用程序依赖的数据可能横跨多个数据存储系统中,甚至以不一样的文件格式。对于这个问题,F1 Query 对于这些数据提供一个统一的数据视图。
  • Datacenter Architecture: F1 Query 的目标是多数据中心,这个和传统的 shared nothing 架构的数据处理系统不一样相同。传统的模式为了下降延迟,每每须要考虑 locality,也就是数据和计算越近越好。因为 Google 内部的网络环境优点,locality 的优点显得不是那么重要。因此 F1 Query 更强调计算和存储分离,这样计算节点和存储节点的扩展性(scalability)都会更好。毕竟 Google 内部的系统,scalability 才是第一法则。还有一点值得一提的是,因为使用了 GFS 的更强版本: Colossue File System,磁盘不会成为瓶颈。
  • Scalability: 在 F1 Query 中,short query 会在单个节点上执行,larger query 会以分布式的模式执行,largest query 以批处理 MapReduce 模式执行。对于这些模式,F1 Query 能够经过增长运算的并行度来优化。
  • Extensibility: 对于那些没法用 SQL 语义来表达的查询需求,F1 经过提供 user-defined functions (UDF)、user-defined aggregate functions (UDAs) 和 table-valued functions (TVF) 来支持。

2. 架构

F1 的架构图以下所示:异步

下面的方框里面是每一个 Datacenter 一套。关于各个组件的介绍以下:分布式

  • 用户经过 client libary 和 F1 Server 交互
  • F1 Master 负责 query 的状态的运行时监控和其余组件的管理
  • F1 Server 收到用户请求,对于 short query 直接单机执行查询;对于 larger query 转发到多台 worker 上并行执行查询。最后再汇总结果返回给 client。
  • F1 Worker 负责具体查询执行
  • F1 Server 和 Worker 都是无状态的,方便扩展

2.1 query 执行

用户经过 client libary 提交 query 到 F1 Server 上,F1 Server 首先解析和分析 SQL,而后将涉及到的数据源提取出来,若是某些数据源在当前 datacenter 不存在,则直接将 query 返回给 client 并告知哪些 F1 Server 距离哪些数据源更近。这里直接将请求返回给业务层,由业务层去 retry,设计的也是很是的简单。尽管前面说到要将存储和计算分离,可是这个地方的设计仍是考虑到了 locality,datacenter 级别的 locality,毕竟 locality 对查询延迟的影响仍是巨大的。ide

F1 Server 将 query 解析并优化成 DAG,而后由执行层来执行,具体执行模式(interactive 仍是 batch)由用户指定。原文是: Based on a client- specified execution mode preference, F1 Query executes queries on F1 servers and workers in an interactive mode or in a batch mode.

对于交互式查询模式(interactive mode)有单节点集中执行模式和多节点分布式执行模式,query 优化会根据启发式的算法来决定采用哪一种模式。集中式下,F1 Server 解析分析 query,而后在当前节点上直接执行并接收查询结果。分布式下,接收 query 的 F1 Server 充当一个 query coordinator 的角色,将 query 拆解并下发给 worker。交互式查询在数据量不太大的状况下每每具备不错的性能和高效的资源利用率。

除了交互式查询还有一种模式是批处理模式(batch mode)。批处理模式使用 MapReduce 框架异步提交执行执行,相比交互式这种 long-running 方式,批处理模式的可靠性(reliabitly)更高。

2.2 数据源

数据查询支持跨 datacenter。存储计算分离模式使得多数据源的支持更加简单,好比 Spanner, Bigtable, CSV, columnar file 等。为了支持多数据源,F1 Query 在他们之上抽象出了一层,让数据看起来都是存储在关系型表里面。而各个数据源的元数据就存储在 catalog service 里面。

对于没有存储到 catalog service 里面的表数据,只要提供一个DEFINE TABLE便可查询。

DEFINE TABLE People(
      format = ‘csv’,
      path = ‘/path/to/peoplefile’,
      columns = ‘name:STRING,
                 DateOfBirth:DATE’);
    SELECT Name, DateOfBirth FROM People
    WHERE Name = ‘John Doe’;

论文中没有提到的是单看这个 DEFINE TABLE 能够表现力不够,所说这些信息并不足以表现出数据的行为:

  • 是否支持 partition?
  • 是否支持 逻辑下推?
  • 是否支持索引?
  • 是否支持多种 扫描模式?
  • 对于新数据源的支持能够经过 Table-Valued Function (TVF) 的方式来支持。

2.3 Data Sink

query 的结果能够直接返回给 client,也能够插入到另一个表里面。

2.4 SQL

SQL 2011。之因此是 2011 是由于其余老的系统使用的是 2011。

3. 交互式查询

交互式查询模式是默认的查询模式。如前所述,交互式查询有集中式和分布式,具体使用哪一种由优化器分析 client 的 query 而后决定。

3.1 Single Threaded Execution Kernel

集中式的查询以下图所示,是一种 pull-based 的单线程执行方式。

3.2 Distributed Execution

如前所述,由优化器分析完 query 决定是否采用分布式模式。在分布式这种模式下接收到 query 的 F1 Server 充当一个 coordinator 的角色,将执行 plan 推给 worker。worker 是多线程的,能够并发执行单个 query 的无依赖的 fragment。Fragment 是执行计划切分出来的执行计划片断,很是像 MR 或者 Spark 中的 stage。Fragment 之间经过 Exchange Operator (数据重分布) 链接。

Fragment 的切分过程以下:优化器使用一种基于数据分布依赖的 bottom-up 策略。具体来讲每一个算子对于输入数据的分布都有要求,好比 hash 或者依赖其余字段的分布。典型的例子有 group by key 和 hash join。若是当前的数据分布知足先后两个算子的要求,则两个算子就被放到一个 Fragment 里面,不然就被分到两个 Fragment 里面,而后经过 Exchange Operator 来链接。

下一步就是计算每一个 Fragment 的并行度,Fragment 之间并行度互相独立。叶子节点的 Fragment 的底层 table scan 决定最初的并行度,而后上层经过 width calculator 逐步计算。好比 hash-join 的底层两个 Fragment 分别是 100-worker 和 50-worker,则 hash-join 这个 Fragment 会使用 100-worker 的并行度。下面是一个具体的例子。

SELECT Clicks.Region, COUNT(*) ClickCount
  FROM Ads JOIN Clicks USING (AdId)
  WHERE Ads.StartDate > ‘2018-05-14’ AND
        Clicks.OS = ‘Chrome OS’
  GROUP BY Clicks.Region
  ORDER BY ClickCount DESC;

上面 SQL 对应的 Fragment 和一种可能 worker 并行度以下图所示:

3.3 Partitioning Strategy

数据重分布也就是 Fragment 之间的 Exchange Operator,对于每条数据,数据发送者经过分区函数来计算数据的目的分区数,每一个分区数对应一个 worker。Exchange Operator 经过 RPC 调用,扩展能够支持到每一个 Fragment 千级的 partion 并发。要求再高就须要使用 batch mode。

查询优化器将 scan 操做做为执行计划的叶子节点和 N 个 worker 节点并发。为了并发执行 scan 操做,数据必需要被并发分布,而后由全部 worker 一块儿产生输出结果。有时候数据的 partition 会超过 N,而 scan 并发度为 N,多余的 partition 就交由空闲的 worker 去处理,这样能够避免数据倾斜。

3.4 Performance Considerations

F1 Query 的主要性能问题在于数据倾斜和访问模式不佳。Hash join 对于 hot key 尤其敏感。当 hot key 被 worker 载入到内存的时候可能会由于数据量太大而写入磁盘,从而致使性能降低。

论文中举了一个 lookup join 的例子,这里不打算详述了。

对于这种数据倾斜的问题,F1 Query 的解决方案是 Dynamic Key Range,可是论文中对其描述仍是不够详细。

F1 Query 对于交互式查询采用存内存计算,并且没有 check point。由于是内存计算,因此速度很是的快,可是因为没有 checkpoint 等 failover 的机制,只能依赖于业务层的重试。

4. 批处理

像 ETL,都是经过 Batch Mode 来处理的。Google 之前都是经过 MapReduce 或者 FlumeJava 来开发的,开发成本通常比较高。相比 SQL 这种方式,不能有效复用 SQL 优化,因此 F1 Query 选择使用 SQL 来作。

如前所述,交互式查询不适合处理 worker failure,而 batch mode,也就是批处理这种模式特别适合处理 failover(每个 stage 结果落盘)。批处理模式复用交互式 SQL query 的一些特性,好比 query 优化,执行计划生成。交互式模式和批处理模式的核心区别在于调度方式不一样:交互式模式是同步的,而批处理模式是异步的。

4.1 Batch Execution Framework

批处理使用的框架是 MapReduce,Fragment 被抽象成 MapReduce 中的 stage,stage 的输出结果被存储到 Colossus file system (GFS 二代)。

在 Fragment 映射有一点值得注意的是严格来讲,Fragment 的 DAG 映射到 mr 是 map-reduce-reduce,对这种模式作一个简单的变通变成:map-reduce-map<identity>-reduce,以下图:

关于 MapReduce 的更详细信息能够参考 Google 03 年那篇论文。

4.2 Batch Service Framework

Framework 会对 batch mode query 的执行进行编排。具体包括:query 注册,query 分发,调度已经监控 mr 做业的执行。当 F1 Server 接收到一个 batch mode query,它会先生成执行计划并将 query 注册到 Query Registry,全局惟一的 Spanner db,用来 track batch mode query。Query Distributor 而后将 query 分发给 datacenter。Query Scheduler 会按期从 Registry 拿到 query,而后生成执行计划并交给 Query Executor 来处理。

Service Framework 的健壮性很是好:Query Distributor 是选主(master-elect)模式;Query Scheduler 在每一个 datacenter 有多个。query 的全部执行状态都是保存在 Query Registry,这就保证其余的组件是无状态的。容错处理:MapReduce 的 stage 会被重试,若是 datacenter 出问题,query 会被分配到新的 datacenter 上从新执行。

5. 优化器

SQL 优化器相似 Spark Catalyst,架构以下图,不细说了。

6. EXTENSIBILITY

对于不少复杂业务逻辑没法用 SQL 来描述,F1 针对这种状况提供了一种用户自定义函数的方法,包括 UDF (user-define functions),UDA (aggrega- tion functions) 和 TVF (table-valued functions)。对于简单的UDF需求,一般直接以SQL或者LUA的形式做为query的一部分;对于更复杂或者性能要求高的UDF需求,则能够用其它高级语言以UDF Server的形式实现。

UDF Server 和 F1 Query 是 RPC 调用关系,有 client 单独部署在同一个 datacenter。udf server 彻底有 client 来控制,无状态,基本能够无限扩展。

6.1 Scalar Functions

UDF 并非新的概念,UDF Server 这种部署方式看上去还算新颖一点。可是 UDF Server 这种单独部署模式一个可能的问题是延迟问题,这里经过批量流水线的方式来减小延迟。下面是 UDF 的一个例子。

local function string2unixtime(value)
  local y,m,d = match("(%d+)%-(%d+)%-(%d+)")
  return os.time({year=y, month=m, day=d})
end

6.2 Aggregate Functions

UDA 是对多行输入产生一个单一的输出,要实现 UDA,用户须要实现算子 Initialize, Accumulate, and Finalize。另外如要要对多个 UDA 的子聚合结果进行再聚合,用户能够实现 Reaccumulate。

6.3 Table-Valued Functions

TVF 的输入是一个 table,输出是另一个 table。这种在机器学习的模型训练场景下比较有用。下面是论文中的具体的一个例子:EventsFromPastDays 就是一个 TVF。

SELECT * FROM EventsFromPastDays(
     3, TABLE Clicks);

固然 TVF 也支持用 SQL 来描述,以下。

CREATE TABLE FUNCTION EventsFromPastDays(
     num_days INT64, events ANY TABLE) AS
     SELECT * FROM events
     WHERE date >= DATE_SUB(
         CURRENT_DATE(),
         INTERVAL num_days DAY);

7. Production Metric

下面是 F1 Query 在 Production 环境下的几个 metrics。

8. 总结

回过头来看 F1 Query 最新的这篇论文给人最大的启发就是大一统的思想,这个颇有多是行业发展趋势。回想一下 MapReduce 论文由 Google 于 2003 年发表,开源实现 Hadoop 于 2005 问世。不妨期待了一下将来的 3 到 5 年的 F1 Query 的开源产品。

做者介绍:陶克路,花名敌珐,阿里巴巴技术专家。Apache Pulsar 等开源软件 Contrijiesbutor。技术领域包括大数据和云原生技术栈,目前致力于构建大数据领域业界领先的 APM 产品。


本文做者:陶克路

阅读原文

本文为阿里云内容,未经容许不得转载。

相关文章
相关标签/搜索