Apache Flink 零基础入门(三):DataStream API 编程

做者:崔星灿
整理:高赟算法

前面已经为你们介绍了 Flink 的基本概念以及安装部署的过程,从而但愿可以帮助读者创建起对 Flink 的初步印象。本次课程开始,咱们将进入第二部分,即 Flink 实际开发的相关内容。本次课程将首先介绍 Flink 开发中比较核心的 DataStream API 。咱们首先将回顾分布式流处理的一些基本概念,这些概念对于理解实际的 DataStream API 有很是大的做用。而后,咱们将详细介绍 DataStream API 的设计,最后咱们将经过一个例子来演示 DataStream API 的使用。数据库

1. 流处理基本概念

对于什么是流处理,从不一样的角度有不一样的定义。其实流处理与批处理这两个概念是对立统一的,它们的关系有点相似于对于 Java 中的 ArrayList 中的元素,是直接看做一个有限数据集并用下标去访问,仍是用迭代器去访问。编程

_04

图1. 左图硬币分类器。硬币分类器也能够看做一个流处理系统,用于硬币分类的各部分组件提早串联在一块儿,硬币不断进入系统,并最终被输出到不一样的队列中供后续使用。右图同理。

流处理系统自己有不少本身的特色。通常来讲,因为须要支持无限数据集的处理,流处理系统通常采用一种数据驱动的处理方式。它会提早设置一些算子,而后等到数据到达后对数据进行处理。为了表达复杂的计算逻辑,包括 Flink 在内的分布式流处理引擎通常采用 DAG 图来表示整个计算逻辑,其中 DAG 图中的每个点就表明一个基本的逻辑单元,也就是前面说的算子。因为计算逻辑被组织成有向图,数据会按照边的方向,从一些特殊的 Source 节点流入系统,而后经过网络传输、本地传输等不一样的数据传输方式在算子之间进行发送和处理,最后会经过另一些特殊的 Sink 节点将计算结果发送到某个外部系统或数据库中。网络

_05

图2. 一个 DAG 计算逻辑图与实际的物理时模型。逻辑图中的每一个算子在物理图中可能有多个并发。

对于实际的分布式流处理引擎,它们的实际运行时物理模型要更复杂一些,这是因为每一个算子均可能有多个实例。如图 2 所示,做为 Source 的 A 算子有两个实例,中间算子 C 也有两个实例。在逻辑模型中,A 和 B 是 C 的上游节点,而在对应的物理逻辑中,C 的全部实例和 A、B 的全部实例之间可能都存在数据交换。在物理模型中,咱们会根据计算逻辑,采用系统自动优化或人为指定的方式将计算工做分布到不一样的实例中。只有当算子实例分布到不一样进程上时,才须要经过网络进行数据传输,而同一进程中的多个实例之间的数据传输一般是不须要经过网络的。并发

表1. Apache Storm 构造 DAG 计算图。Apache Storm 的接口定义更加“面向操做”,所以更加底层。
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
表2. Apache Flink 构造 DAG 计算图。Apache Flink 的接口定义更加“面向数据”,所以更加高层。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile ("input");
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
counts.writeAsText("output");

因为流处理的计算逻辑是经过 DAG 图来表示的,所以它们的大部分 API 都是围绕构建这种计算逻辑图来设计的。例如,对于几年前很是流行的 Apache Storm,它的 Word Count 的示例如表 1 所示。基于 Apache Storm 用户须要在图中添加 Spout 或 Bolt 这种算子,并指定算子以前的链接方式。这样,在完成整个图的构建以后,就能够将图提交到远程或本地集群运行。框架

与之对比,Apache Flink 的接口虽然也是在构建计算逻辑图,可是 Flink 的 API 定义更加面向数据自己的处理逻辑,它把数据流抽象成为一个无限集,而后定义了一组集合上的操做,而后在底层自动构建相应的 DAG 图。能够看出,Flink 的 API 要更“上层”一些。许多研究者在进行实验时,可能会更喜欢自由度高的 Storm,由于它更容易保证明现预想的图结构;而在工业界则更喜欢 Flink 这类高级 API,由于它使用更加简单。dom

2. Flink DataStream API 概览

基于前面对流处理的基本概念,本节将详细介绍 Flink DataStream API 的使用方式。咱们首先仍是从一个简单的例子开始看起。表3是一个流式 Word Count 的示例,虽然它只有 5 行代码,可是它给出了基于 Flink DataStream API 开发程序的基本结构。分布式

表3. 基于 Flink DataStream API 的 Word Count 示例.
//一、设置运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//二、配置数据源读取数据
DataStream<String> text = env.readTextFile ("input");
//三、进行一系列转换
DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
//四、配置数据汇写出数据
counts.writeAsText("output");
//五、提交执行
env.execute("Streaming WordCount");

为了实现流式 Word Count,咱们首先要先得到一个 StreamExecutionEnvironment 对象。它是咱们构建图过程当中的上下文对象。基于这个对象,咱们能够添加一些算子。对于流处理程度,咱们通常须要首先建立一个数据源去接入数据。在这个例子中,咱们使用了 Environment 对象中内置的读取文件的数据源。这一步以后,咱们拿到的是一个 DataStream 对象,它能够看做一个无限的数据集,能够在该集合上进行一序列的操做。例如,在 Word Count 例子中,咱们首先将每一条记录(即文件中的一行)分隔为单词,这是经过 FlatMap 操做来实现的。调用 FlatMap 将会在底层的 DAG 图中添加一个 FlatMap 算子。而后,咱们获得了一个记录是单词的流。咱们将流中的单词进行分组(keyBy),而后累积计算每个单词的数据(sum(1))。计算出的单词的数据组成了一个新的流,咱们将它写入到输出文件中。ide

最后,咱们须要调用 env#execute 方法来开始程序的执行。须要强调的是,前面咱们调用的全部方法,都不是在实际处理数据,而是在构通表达计算逻辑的 DAG 图。只有当咱们将整个图构建完成并显式的调用 Execute 方法后,框架才会把计算图提供到集群中,接入数据并执行实际的逻辑。优化

基于流式 Word Count 的例子能够看出,基于 Flink 的 DataStream API 来编写流处理程序通常须要三步:经过 Source 接入数据、进行一系统列的处理以及将数据写出。最后,不要忘记显式调用 Execute 方式,不然前面编写的逻辑并不会真正执行。

_09

图3. Flink DataStream 操做概览

从上面的例子中还能够看出,Flink DataStream API 的核心,就是表明流数据的 DataStream 对象。整个计算逻辑图的构建就是围绕调用 DataStream 对象上的不一样操做产生新的 DataStream 对象展开的。总体来讲,DataStream 上的操做能够分为四类。第一类是对于单条记录的操做,好比筛除掉不符合要求的记录(Filter 操做),或者将每条记录都作一个转换(Map 操做)。第二类是对多条记录的操做。好比说统计一个小时内的订单总成交量,就须要将一个小时内的全部订单记录的成交量加到一块儿。为了支持这种类型的操做,就得经过 Window 将须要的记录关联到一块儿进行处理。第三类是对多个流进行操做并转换为单个流。例如,多个流能够经过 Union、Join 或 Connect 等操做合到一块儿。这些操做合并的逻辑不一样,可是它们最终都会产生了一个新的统一的流,从而能够进行一些跨流的操做。最后, DataStream 还支持与合并对称的操做,即把一个流按必定规则拆分为多个流(Split 操做),每一个流是以前流的一个子集,这样咱们就能够对不一样的流做不一样的处理。

_10

图4. 不一样类型的 DataStream 子类型。不一样的子类型支持不一样的操做集合。

为了支持这些不一样的流操做,Flink 引入了一组不一样的流类型,用来表示某些操做的中间流数据集类型。完整的类型转换关系如图4所示。首先,对于一些针对单条记录的操做,如 Map 等,操做的结果仍然是是基本的 DataStream 类型。而后,对于 Split 操做,它会首先产生一个 SplitStream,基于 SplitStream 可使用 Select 方法来筛选出符合要求的记录并再将获得一个基本的流。

相似的,对于 Connect 操做,在调用 streamA.connect(streamB)后能够获得一个专门的 ConnectedStream。ConnectedStream 支持的操做与普通的 DataStream 有所区别,因为它表明两个不一样的流混合的结果,所以它容许用户对两个流中的记录分别指定不一样的处理逻辑,而后它们的处理结果造成一个新的 DataStream 流。因为不一样记录的处理是在同一个算子中进行的,所以它们在处理时能够方便的共享一些状态信息。上层的一些 Join 操做,在底层也是须要依赖于 Connect 操做来实现的。

另外,如前所述,咱们能够经过 Window 操做对流能够按时间或者个数进行一些切分,从而将流切分红一个个较小的分组。具体的切分逻辑能够由用户进行选择。当一个分组中全部记录都到达后,用户能够拿到该分组中的全部记录,从而能够进行一些遍历或者累加操做。这样,对每一个分组的处理均可以获得一组输出数据,这些输出数据造成了一个新的基本流。

对于普通的 DataStream,咱们必须使用 allWindow 操做,它表明对整个流进行统一的 Window 处理,所以是不能使用多个算子实例进行同时计算的。针对这一问题,就须要咱们首先使用 KeyBy 方法对记录按 Key 进行分组,而后才能够并行的对不一样 Key 对应的记录进行单独的 Window 操做。KeyBy 操做是咱们平常编程中最重要的操做之一,下面咱们会更详细的介绍。

_11

图5. 基本流上的 Window 操做与 KeyedStream 上的 Window 操对比。KeyedStream 上的 Window 操做使采用多个实例并发处理成为了可能。

基本 DataStream 对象上的 allWindow 与 KeyedStream 上的 Window 操做的对好比图5所示。为了可以在多个并发实例上并行的对数据进行处理,咱们须要经过 KeyBy 将数据进行分组。KeyBy 和 Window 操做都是对数据进行分组,可是 KeyBy 是在水平分向对流进行切分,而 Window 是在垂直方式对流进行切分。

使用 KeyBy 进行数据切分以后,后续算子的每个实例能够只处理特定 Key 集合对应的数据。除了处理自己外,Flink 中容许算子维护一部分状态(State),在KeyedStream 算子的状态也是能够分布式存储的。因为 KeyBy 是一种肯定的数据分配方式(下文将介绍其它分配方式),所以即便发生 Failover 做业重启,甚至发生了并发度的改变,Flink 均可以从新分配 Key 分组并保证处理某个 Key 的分组必定包含该 Key 的状态,从而保证一致性。

最后须要强调的是,KeyBy 操做只有当 Key 的数量超过算子的并发实例数才能够较好的工做。因为同一个 Key 对应的全部数据都会发送到同一个实例上,所以若是Key 的数量比实例数量少时,就会致使部分实例收不到数据,从而致使计算能力不能充分发挥。

3. 其它问题

除 KeyBy 以外,Flink 在算子以前交换数据时还支持其它的物理分组方式。如图 1 所示,Flink DataStream 中物理分组方式包括:

  • Global: 上游算子将全部记录发送给下游算子的第一个实例。
  • Broadcast: 上游算子将每一条记录发送给下游算子的全部实例。
  • Forward:只适用于上游算子实例数与下游算子相同时,每一个上游算子实例将记录发送给下游算子对应的实例。
  • Shuffle:上游算子对每条记录随机选择一个下游算子进行发送。
  • Rebalance:上游算子经过轮询的方式发送数据。
  • Rescale:当上游和下游算子的实例数为 n 或 m 时,若是 n < m,则每一个上游实例向ceil(m/n)或floor(m/n)个下游实例轮询发送数据;若是 n > m,则 floor(n/m) 或 ceil(n/m) 个上游实例向下游实例轮询发送数据。
  • PartitionCustomer:当上述内置分配方式不知足需求时,用户还能够选择自定义分组方式。

_13

图6. 除keyBy外其它的物理分组方式。

除分组方式外,Flink DataStream API 中另外一个重要概念就是类型系统。图 7 所示,Flink DataStream 对像都是强类型的,每个 DataStream 对象都须要指定元素的类型,Flink 本身底层的序列化机制正是依赖于这些信息对序列化等进行优化。具体来讲,在 Flink 底层,它是使用 TypeInformation 对象对类型进行描述的,TypeInformation 对象定义了一组类型相关的信息供序列化框架使用。

_14

图7. Flink DataStream API 中的类型系统

Flink 内置了一部分经常使用的基本类型,对于这些类型,Flink 也内置了它们的TypeInformation,用户通常能够直接使用而不须要额外的声明,Flink 本身能够经过类型推断机制识别出相应的类型。可是也会有一些例外的状况,好比,Flink DataStream API 同时支持 Java 和 Scala,Scala API 许多接口是经过隐式的参数来传递类型信息的,因此若是须要经过 Java 调用 Scala 的 API,则须要把这些类型信息经过隐式参数传递过去。另外一个例子是 Java 中对泛型存在类型擦除,若是流的类型自己是一个泛型的话,则可能在擦除以后没法推断出类型信息,这时候也须要显式的指定。

在 Flink 中,通常 Java 接口采用 Tuple 类型来组合多个字段,而 Scala 则更常用 Row 类型或 Case Class。相对于 Row,Tuple 类型存在两个问题,一个是字段个数不能超过 25 个,此外,全部字段不容许有 null 值。最后,Flink 也支持用户自定义新的类型和 TypeInformation,并经过 Kryo 来实现序列化,可是这种方式可带来一些迁移等方面的问题,因此尽可能不要使用自定义的类型。

4.示例

而后,咱们再看一个更复杂的例子。假设咱们有一个数据源,它监控系统中订单的状况,当有新订单时,它使用 Tuple2<String, Integer> 输出订单中商品的类型和交易额。而后,咱们但愿实时统计每一个类别的交易额,以及实时统计所有类别的交易额。

表4. 实时订单统计示例。

public class GroupedProcessingTimeWindowSample {
    private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            Random random = new Random();
            while (isRunning) {
                Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5);
                String key = "类别" + (char) ('A' + random.nextInt(3));
                int value = random.nextInt(10) + 1;

                System.out.println(String.format("Emits\t(%s, %d)", key, value));
                ctx.collect(new Tuple2<>(key, value));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<Tuple2<String, Integer>> ds = env.addSource(new DataSource());
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = ds.keyBy(0);

        keyedStream.sum(1).keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
            @Override
            public Object getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return "";
            }
        }).fold(new HashMap<String, Integer>(), new FoldFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> fold(HashMap<String, Integer> accumulator, Tuple2<String, Integer> value) throws Exception {
                accumulator.put(value.f0, value.f1);
                return accumulator;
            }
        }).addSink(new SinkFunction<HashMap<String, Integer>>() {
            @Override
            public void invoke(HashMap<String, Integer> value, Context context) throws Exception {
                  // 每一个类型的商品成交量
                  System.out.println(value);
                  // 商品成交总量                
                  System.out.println(value.values().stream().mapToInt(v -> v).sum());
            }
        });

        env.execute();
    }
}

示例的实现如表4所示。首先,在该实现中,咱们首先实现了一个模拟的数据源,它继承自 RichParallelSourceFunction,它是能够有多个实例的 SourceFunction 的接口。它有两个方法须要实现,一个是 Run 方法,Flink 在运行时对 Source 会直接调用该方法,该方法须要不断的输出数据,从而造成初始的流。在 Run 方法的实现中,咱们随机的产生商品类别和交易量的记录,而后经过 ctx#collect 方法进行发送。另外一个方法是 Cancel 方法,当 Flink 须要 Cancel Source Task 的时候会调用该方法,咱们使用一个 Volatile 类型的变量来标记和控制执行的状态。

而后,咱们在 Main 方法中就能够开始图的构建。咱们首先建立了一个 StreamExecutioniEnviroment 对象。建立对象调用的 getExecutionEnvironment 方法会自动判断所处的环境,从而建立合适的对象。例如,若是咱们在 IDE 中直接右键运行,则会建立 LocalStreamExecutionEnvironment 对象;若是是在一个实际的环境中,则会建立 RemoteStreamExecutionEnvironment 对象。

基于 Environment 对象,咱们首先建立了一个 Source,从而获得初始的<商品类型,成交量>流。而后,为了统计每种类别的成交量,咱们使用 KeyBy 按 Tuple 的第 1 个字段(即商品类型)对输入流进行分组,并对每个 Key 对应的记录的第 2 个字段(即成交量)进行求合。在底层,Sum 算子内部会使用 State 来维护每一个Key(即商品类型)对应的成交量之和。当有新记录到达时,Sum 算子内部会更新所维护的成交量之和,并输出一条<商品类型,更新后的成交量>记录。

若是只统计各个类型的成交量,则程序能够到此为止,咱们能够直接在 Sum 后添加一个 Sink 算子对不断更新的各种型成交量进行输出。可是,咱们还须要统计全部类型的总成交量。为了作到这一点,咱们须要将全部记录输出到同一个计算节点的实例上。咱们能够经过 KeyBy 而且对全部记录返回同一个 Key,将全部记录分到同一个组中,从而能够所有发送到同一个实例上。

而后,咱们使用 Fold 方法来在算子中维护每种类型商品的成交量。注意虽然目前 Fold 方法已经被标记为 Deprecated,可是在 DataStream API 中暂时尚未能替代它的其它操做,因此咱们仍然使用 Fold 方法。这一方法接收一个初始值,而后当后续流中每条记录到达的时候,算子会调用所传递的 FoldFunction 对初始值进行更新,并发送更新后的值。咱们使用一个 HashMap 来对各个类别的当前成交量进行维护,当有一条新的<商品类别,成交量>到达时,咱们就更新该 HashMap。这样在 Sink 中,咱们收到的是最新的商品类别和成交量的 HashMap,咱们能够依赖这个值来输出各个商品的成交量和总的成交量。

须要指出的是,这个例子主要是用来演示 DataStream API 的用法,实际上还会有更高效的写法,此外,更上层的 Table / SQL 还支持 Retraction 机制,能够更好的处理这种状况。

_17

图8. API 原理图

最后,咱们对 DataStream API 的原理进行简要的介绍。当咱们调用 DataStream#map 算法时,Flink 在底层会建立一个 Transformation 对象,这一对象就表明咱们计算逻辑图中的节点。它其中就记录了咱们传入的 MapFunction,也就是 UDF(User Define Function)。随着咱们调用更多的方法,咱们建立了更多的 DataStream 对象,每一个对象在内部都有一个 Transformation 对象,这些对象根据计算依赖关系组成一个图结构,就是咱们的计算图。后续 Flink 将对这个图结构进行进一步的转换,从而最终生成提交做业所须要的 JobGraph。

5. 总结

本文主要介绍了 Flink DataStream API,它是当前 Flink 中比较底层的一套 API。在实际的开发中,基于该 API 须要用户本身处理 State 与 Time 等一些概念,所以须要较大的工做量。后续课程还会介绍更上层的 Table / SQL 层的 API,将来 Table / SQL 可能会成为 Flink 主流的 API,可是对于接口来讲,越底层的接口表达能力越强,在一些须要精细操做的状况下,仍然须要依赖于 DataStream API。

相关文章
相关标签/搜索