Apache Flink 进阶入门(二):Time 深度解析

前言

Flink 的 API 大致上能够划分为三个层次:处于最底层的 ProcessFunction、中间一层的 DataStream API 和最上层的 SQL/Table API,这三层中的每一层都很是依赖于时间属性。时间属性是流处理中最重要的一个方面,是流处理系统的基石之一,贯穿这三层 API。在 DataStream API 这一层中由于封装方面的缘由,咱们可以接触到时间的地方不是不少,因此咱们将重点放在底层的 ProcessFunction 和最上层的 SQL/Table API。数组

Flink 时间语义

在不一样的应用场景中时间语义是各不相同的,Flink 做为一个先进的分布式流处理引擎,它自己支持不一样的时间语义。其核心是 Processing Time 和 Event Time(Row Time),这两类时间主要的不一样点以下表所示:缓存

Processing Time 是来模拟咱们真实世界的时间,其实就算是处理数据的节点本地时间也不必定就是完彻底全的咱们真实世界的时间,因此说它是用来模拟真实世界的时间。而 Event Time 是数据世界的时间,就是咱们要处理的数据流世界里面的时间。关于他们的获取方式,Process Time 是经过直接去调用本地机器的时间,而 Event Time 则是根据每一条处理记录所携带的时间戳来断定。网络

这两种时间在 Flink 内部的处理以及仍是用户的实际使用方面,难易程度都是不一样的。相对而言的 Processing Time 处理起来更加的简单,而 Event Time 要更麻烦一些。而在使用 Processing Time 的时候,咱们获得的处理结果(或者说流处理应用的内部状态)是不肯定的。而由于在 Flink 内部对 Event Time 作了各类保障,使用 Event Time 的状况下,不管重放数据多少次,都能获得一个相对肯定可重现的结果。分布式

所以在判断应该使用 Processing Time 仍是 Event Time 的时候,能够遵循一个原则:当你的应用遇到某些问题要从上一个 checkpoint 或者 savepoint 进行重放,是否是但愿结果彻底相同。若是但愿结果彻底相同,就只能用 Event Time;若是接受结果不一样,则能够用 Processing Time。Processing Time 的一个常见的用途是,咱们要根据现实时间来统计整个系统的吞吐,好比要计算现实时间一个小时处理了多少条数据,这种状况只能使用 Processing Time。性能

时间的特性

时间的一个重要特性是:时间只能递增,不会来回穿越。 在使用时间的时候咱们要充分利用这个特性。假设咱们有这么一些记录,而后咱们来分别看一下 Processing Time 还有 Event Time 对于时间的处理。优化

  • 对于 Processing Time,由于咱们是使用的是本地节点的时间(假设这个节点的时钟同步没有问题),咱们每一次取到的 Processing Time 确定都是递增的,递增就表明着有序,因此说咱们至关于拿到的是一个有序的数据流。
  • 而在用 Event Time 的时候由于时间是绑定在每一条的记录上的,因为网络延迟、程序内部逻辑、或者其余一些分布式系统的缘由,数据的时间可能会存在必定程度的乱序,好比上图的例子。在 Event Time 场景下,咱们把每个记录所包含的时间称做 Record Timestamp。若是 Record Timestamp 所获得的时间序列存在乱序,咱们就须要去处理这种状况。

若是单条数据之间是乱序,咱们就考虑对于整个序列进行更大程度的离散化。简单地讲,就是把数据按照必定的条数组成一些小批次,但这里的小批次并非攒够多少条就要去处理,而是为了对他们进行时间上的划分。通过这种更高层次的离散化以后,咱们会发现最右边方框里的时间就是必定会小于中间方框里的时间,中间框里的时间也必定会小于最左边方框里的时间。url

这个时候咱们在整个时间序列里插入一些相似于标志位的一些特殊的处理数据,这些特殊的处理数据叫作 watermark。一个 watermark 本质上就表明了这个 watermark 所包含的 timestamp 数值,表示之后到来的数据已经再也没有小于或等于这个时间的了。spa

Timestamp 和 Watermark 行为概览

接下来咱们重点看一下 Event Time 里的 Record Timestamp(简写成 timestamp)和 watermark 的一些基本信息。绝大多数的分布式流计算引擎对于数据都是进行了 DAG 图的抽象,它有本身的数据源,有处理算子,还有一些数据汇。数据在不一样的逻辑算子之间进行流动。watermark 和 timestamp 有本身的生命周期,接下来我会从 watermark 和 timestamp 的产生、他们在不一样的节点之间的传播、以及在每个节点上的处理,这三个方面来展开介绍。设计

Timestamp 分配和 Watermark 生成

Flink 支持两种 watermark 生成方式。第一种是在 SourceFunction 中产生,至关于把整个的 timestamp 分配和 watermark 生成的逻辑放在流处理应用的源头。咱们能够在 SourceFunction 里面经过这两个方法产生 watermark:3d

  • 经过 collectWithTimestamp 方法发送一条数据,其中第一个参数就是咱们要发送的数据,第二个参数就是这个数据所对应的时间戳;也能够调用 emitWatermark 方法去产生一条 watermark,表示接下来不会再有时间戳小于等于这个数值记录。
  • 另外,有时候咱们不想在 SourceFunction 里生成 timestamp 或者 watermark,或者说使用的 SourceFunction 自己不支持,咱们还能够在使用 DataStream API 的时候指定,调用的 DataStream.assignTimestampsAndWatermarks 这个方法,可以接收不一样的 timestamp 和 watermark 的生成器。

整体上而言生成器能够分为两类:第一类是按期生成器;第二类是根据一些在流处理数据流中遇到的一些特殊记录生成的。

二者的区别主要有三个方面,首先按期生成是现实时间驱动的,这里的“按期生成”主要是指 watermark(由于 timestamp 是每一条数据都须要有的),即按期会调用生成逻辑去产生一个 watermark。而根据特殊记录生成是数据驱动的,便是否生成 watermark 不是由现实时间来决定,而是当看到一些特殊的记录就表示接下来可能不会有符合条件的数据再发过来了,这个时候至关于每一次分配 Timestamp 以后都会调用用户实现的 watermark 生成方法,用户须要在生成方法中去实现 watermark 的生成逻辑。

你们要注意的是就是咱们在分配 timestamp 和生成 watermark 的过程,虽然在 SourceFunction 和 DataStream 中均可以指定,可是仍是建议生成的工做越靠近 DataSource 越好。这样会方便让程序逻辑里面更多的 operator 去判断某些数据是否乱序。Flink 内部提供了很好的机制去保证这些 timestamp 和 watermark 被正确地传递到下游的节点。

Watermark 传播

具体的传播策略基本上遵循这三点。

  • 首先,watermark 会以广播的形式在算子之间进行传播。好比说上游的算子,它链接了三个下游的任务,它会把本身当前的收到的 watermark 以广播的形式传到下游。
  • 第二,若是在程序里面收到了一个 Long.MAX_VALUE 这个数值的 watermark,就表示对应的那一条流的一个部分不会再有数据发过来了,它至关于就是一个终止的一个标志。
  • 第三,对于单流而言,这个策略比较好理解,而对于有多个输入的算子,watermark 的计算就有讲究了,一个原则是:单输入取其大,多输入取小。

举个例子,假设这边蓝色的块表明一个算子的一个任务,而后它有三个输入,分别是 W一、W二、W3,这三个输入能够理解成任何的输入,这三个输入多是属于同一个流,也多是属于不一样的流。而后在计算 watermark 的时候,对于单个输入而言是取他们的最大值,由于咱们都知道 watermark 应该遵循一个单调递增的一个原则。对于多输入,它要统计整个算子任务的 watermark 时,就会取这三个计算出来的 watermark 的最小值。即一个多个输入的任务,它的 watermark 受制于最慢的那条输入流。这一点相似于木桶效应,整个木桶中装的水会就是受制于最矮的那块板。

watermark 在传播的时候有一个特色是,它的传播是幂等的。屡次收到相同的 watermark,甚至收到以前的 watermark 都不会对最后的数值产生影响,由于对于单个输入永远是取最大的,而对于整个任务永远是取一个最小的。

同时咱们能够注意到这种设计其实有一个局限,具体体如今它没有区分你这个输入是一条流多个 partition 仍是来自于不一样的逻辑上的流的 JOIN。对于同一个流的不一样 partition,咱们对他作这种强制的时钟同步是没有问题的,由于一开始就是把一条流拆散成不一样的部分,但每个部分之间共享相同的时钟。可是若是算子的任务是在作相似于 JOIN 操做,那么要求你两个输入的时钟强制同步其实没有什么道理的,由于彻底有多是把一条离如今时间很近的数据流和一个离当前时间很远的数据流进行 JOIN,这个时候对于快的那条流,由于它要等慢的那条流,因此说它可能就要在状态中去缓存很是多的数据,这对于整个集群来讲是一个很大的性能开销。

ProcessFunction

在正式介绍 watermark 的处理以前,先简单介绍 ProcessFunction,由于 watermark 在任务里的处理逻辑分为内部逻辑和外部逻辑。外部逻辑其实就是经过 ProcessFunction 来体现的,若是你须要使用 Flink 提供的时间相关的 API 的话就只能写在 ProcessFunction 里。

ProcessFunction 和时间相关的功能主要有三点:

  • 第一点就是根据你当前系统使用的时间语义不一样,你能够去获取当前你正在处理这条记录的 Record Timestamp,或者当前的 Processing Time。
  • 第二点就是它能够获取当前算子的时间,能够把它理解成当前的 watermark。
  • 第三点就是为了在 ProcessFunction 中去实现一些相对复杂的功能,容许注册一些 timer(定时器)。好比说在 watermark 达到某一个时间点的时候就触发定时器,全部的这些回调逻辑也都是由用户来提供,涉及到以下三个方法,registerEventTimeTimer、registerProcessingTimeTimer 和 onTimer。在 onTimer 方法中就须要去实现本身的回调逻辑,当条件知足时回调逻辑就会被触发。

一个简单的应用是,咱们在作一些时间相关的处理的时候,可能须要缓存一部分数据,但这些数据不能一直去缓存下去,因此须要有一些过时的机制,咱们能够经过 timer 去设定这么一个时间,指定某一些数据可能在未来的某一个时间点过时,从而把它从状态里删除掉。全部的这些和时间相关的逻辑在 Flink 内部都是由本身的 Time Service(时间服务)完成的。

Watermark处理

一个算子的实例在收到 watermark 的时候,首先要更新当前的算子时间,这样的话在 ProcessFunction 里方法查询这个算子时间的时候,就能获取到最新的时间。第二步它会遍历计时器队列,这个计时器队列就是咱们刚刚说到的 timer,你能够同时注册不少 timer,Flink 会把这些 Timer 按照触发时间放到一个优先队列中。第三步 Flink 获得一个时间以后就会遍历计时器的队列,而后逐一触发用户的回调逻辑。 经过这种方式,Flink 的某一个任务就会将当前的 watermark 发送到下游的其余任务实例上,从而完成整个 watermark 的传播,从而造成一个闭环。

Table API 中的时间

下面咱们来看一看 Table/SQL API 中的时间。为了让时间参与到 Table/SQL 这一层的运算中,咱们须要提早把时间属性放到表的 schema 中,这样的话咱们才可以在 SQL 语句或者 Table 的一些逻辑表达式里面去使用这些时间去完成需求。

Table 中指定时间列

其实以前社区就怎么在 Table/SQL 中去使用时间这个问题作过必定的讨论,是把获取当前 Processing Time 的方法是做为一个特殊的 UDF,仍是把这一个列物化到整个的 schema 里面,最终采用了后者。咱们这里就分开来说一讲 Processing Time 和 Event Time 在使用的时候怎么在 Table 中指定。

对于 Processing Time,咱们知道要获得一个 Table 对象(或者注册一个 Table)有两种手段:

(1)能够从一个 DataStream 转化成一个 Table;

(2)直接经过 TableSource 去生成这么一个 Table;

对于第一种方法而言,咱们只须要在你已有的这些列中(例子中 f1 和 f2 就是两个已有的列),在最后用“列名.proctime”这种写法就能够把最后的这一列注册为一个 Processing Time,之后在写查询的时候就能够去直接使用这一列。若是 Table 是经过 TableSource 生成的,就能够经过实现这一个 DefinedRowtimeAttributes 接口,而后就会自动根据你提供的逻辑去生成对应的 Processing Time。

相对而言,在使用 Event Time 时则有一个限制,由于 Event Time 不像 Processing Time 那样是随拿随用。若是你要从 DataStream 去转化获得一个 Table,必需要提早保证原始的 DataStream 里面已经存在了 Record Timestamp 和 watermark。若是你想经过 TableSource 生成的,也必定要保证你要接入的一个数据里面存在一个类型为 long 或者 timestamp 的这么一个时间字段。

具体来讲,若是你要从 DataStream 去注册一个表,和 proctime 相似,你只须要加上“列名.rowtime”就能够。须要注意的是,若是你要用 Processing Time,必须保证你要新加的字段是整个 schema 中的最后一个字段,而 Event Time 的时候你其实能够去替换某一个已有的列,而后 Flink 会自动的把这一列转化成须要的 rowtime 这个类型。 若是是经过 TableSource 生成的,只须要实现 DefinedRowtimeAttributes 接口就能够了。须要说明的一点是,在 DataStream API 这一侧其实不支持同时存在多个 Event Time(rowtime),可是在 Table 这一层理论上能够同时存在多个 rowtime。由于 DefinedRowtimeAttributes 接口的返回值是一个对于 rowtime 描述的 List,即其实能够同时存在多个 rowtime 列,在未来可能会进行一些其余的改进,或者基于去作一些相应的优化。

时间列和Table操做

指定完了时间列以后,当咱们要真正去查询时就会涉及到一些具体的操做。这里我列举的这些操做都是和时间列紧密相关,或者说必须在这个时间列上才能进行的。好比说“Over 窗口聚合”和“Group by 窗口聚合”这两种窗口聚合,在写 SQL 提供参数的时候只能容许你在这个时间列上进行这种聚合。第三个就是时间窗口聚合,你在写条件的时候只支持对应的时间列。最后就是排序,咱们知道在一个无尽的数据流上对数据作排序几乎是不可能的事情,但由于这个数据自己到来的顺序已是按照时间属性来进行排序,因此说咱们若是要对一个 DataStream 转化成 Table 进行排序的话,你只能是按照时间列进行排序,固然同时你也能够指定一些其余的列,可是时间列这个是必须的,而且必须放在第一位。

为何说这些操做只能在时间列上进行?由于咱们有的时候能够把到来的数据流就当作是一张按照时间排列好的一张表,而咱们任何对于表的操做,其实都是必须在对它进行一次顺序扫描的前提下完成的。由于你们都知道数据流的特性之一就是一过性,某一条数据处理过去以后,未来其实不太好去访问它。固然由于 Flink 中内部提供了一些状态机制,咱们能够在必定程度上去弱化这个特性,可是最终仍是不能超越的限制状态不能太大。全部这些操做为何只能在时间列上进行,由于这个时间列可以保证咱们内部产生的状态不会无限的增加下去,这是一个最终的前提。

本文视频回顾讲解内容更生动易理解,查看视频请点击:https://ververica.cn/developers/flink-training-course2/



本文做者:崔星灿

阅读原文

本文为云栖社区原创内容,未经容许不得转载。

相关文章
相关标签/搜索