Flink 从0到1学习 —— Flink 中如何管理配置?

前言

若是你了解 Apache Flink 的话,那么你应该熟悉该如何像 Flink 发送数据或者如何从 Flink 获取数据。可是在某些状况下,咱们须要将配置数据发送到 Flink 集群并从中接收一些额外的数据。java

在本文的第一部分中,我将描述如何将配置数据发送到 Flink 集群。咱们须要配置不少东西:方法参数、配置文件、机器学习模型。Flink 提供了几种不一样的方法,咱们将介绍如何使用它们以及什么时候使用它们。在本文的第二部分中,我将描述如何从 Flink 集群中获取数据。git

如何发送数据给 TaskManager?

在咱们深刻研究如何在 Apache Flink 中的不一样组件之间发送数据以前,让咱们先谈谈 Flink 集群中的组件,下图展现了 Flink 中的主要组件以及它们是如何相互做用的:github

当咱们运行 Flink 应用程序时,它会与 Flink JobManager 进行交互,这个 Flink JobManager 存储了那些正在运行的 Job 的详细信息,例如执行图。 JobManager 它控制着 TaskManager,每一个 TaskManager 中包含了一部分数据来执行咱们定义的数据处理方法。算法

在许多的状况下,咱们但愿可以去配置 Flink Job 中某些运行的函数参数。根据用例,咱们可能须要设置单个变量或者提交具备静态配置的文件,咱们下面将讨论在 Flink 中该如何实现?sql

除了向 TaskManager 发送配置数据外,有时咱们可能还但愿从 Flink Job 的函数方法中返回数据。缓存

如何配置用户自定义函数?

假设咱们有一个从 CSV 文件中读取电影列表的应用程序(它要过滤特定类型的全部电影):微信

//读取电影列表数据集合
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
        .ignoreFirstLine()
        .parseQuotedStrings('"')
        .ignoreInvalidLines()
        .types(Long.class, String.class, String.class);

lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> {
    // 以“|”符号分隔电影类型
    String[] genres = movie.f2.split("\\|");

    // 查找全部 “动做” 类型的电影
    return Stream.of(genres).anyMatch(g -> g.equals("Action"));
}).print();

咱们极可能想要提取不一样类型的电影,为此咱们须要可以配置咱们的过滤功能。 当你要实现这样的函数时,最直接的配置方法是实现构造函数:session

// 传递类型名称
lines.filter(new FilterGenre("Action"))
    .print();

...

class FilterGenre implements FilterFunction<Tuple3<Long, String, String>> {
    //类型
    String genre;
    //初始化构造方法
    public FilterGenre(String genre) {
        this.genre = genre;
    }

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");

        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}

或者,若是你使用 lambda 函数,你能够简单地使用它的闭包中的一个变量:闭包

final String genre = "Action";

lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> {
    String[] genres = movie.f2.split("\\|");

    //使用变量
    return Stream.of(genres).anyMatch(g -> g.equals(genre));
}).print();

Flink 将序列化此变量并将其与函数一块儿发送到集群。架构

若是你须要将大量变量传递给函数,那么这些方法就会变得很是烦人了。 为了解决这个问题,Flink 提供了 withParameters 方法。 要使用它,你须要实现那些 Rich 函数,好比你没必要实现 MapFunction 接口,而是实现 RichMapFunction。

Rich 函数容许你使用 withParameters 方法传递许多参数:

// Configuration 类来存储参数
Configuration configuration = new Configuration();
configuration.setString("genre", "Action");

lines.filter(new FilterGenreWithParameters())
        // 将参数传递给函数
        .withParameters(configuration)
        .print();

要读取这些参数,咱们须要实现 "open" 方法并读取其中的参数:

class FilterGenreWithParameters extends RichFilterFunction<Tuple3<Long, String, String>> {

    String genre;

    @Override
    public void open(Configuration parameters) throws Exception {
        //读取配置
        genre = parameters.getString("genre", "");
    }

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");

        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}

全部这些选项均可以使用,但若是须要为多个函数设置相同的参数,则可能会很繁琐。在 Flink 中要处理此种状况, 你能够设置全部 TaskManager 均可以访问的全局环境变量。

为此,首先须要使用 ParameterTool.fromArgs 从命令行读取参数:

public static void main(String... args) {
    //读取命令行参数
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    ...
}

而后使用 setGlobalJobParameters 设置全局做业参数:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameterTool);
...

//该函数将可以读取这些全局参数
lines.filter(new FilterGenreWithGlobalEnv()) //这个函数是本身定义的
                .print();

如今咱们来看看这个读取这些参数的函数,和上面说的同样,它是一个 Rich 函数:

class FilterGenreWithGlobalEnv extends RichFilterFunction<Tuple3<Long, String, String>> {

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");
        //获取全局的配置
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        //读取配置
        String genre = parameterTool.get("genre");

        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}

要读取配置,咱们须要调用 getGlobalJobParameter 来获取全部全局参数,而后使用 get 方法获取咱们要的参数。

广播变量

若是你想将数据从客户端发送到 TaskManager,上面文章中讨论的方法都适合你,但若是数据以数据集的形式存在于 TaskManager 中,该怎么办? 在这种状况下,最好使用 Flink 中的另外一个功能 —— 广播变量。 它只容许将数据集发送给那些执行你 Job 里面函数的任务管理器。

假设咱们有一个数据集,其中包含咱们在进行文本处理时应忽略的单词,而且咱们但愿将其设置为咱们的函数。 要为单个函数设置广播变量,咱们须要使用 withBroadcastSet 方法和数据集。

DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
// 获取要忽略的单词集合
DataSet<String> wordsToIgnore = ...

data.map(new RichFlatMapFunction<String, String>() {

    // 存储要忽略的单词集合. 这将存储在 TaskManager 的内存中
    Collection<String> wordsToIgnore;

    @Override
    public void open(Configuration parameters) throws Exception {
        //读取要忽略的单词的集合
        wordsToIgnore = getRuntimeContext().getBroadcastVariable("wordsToIgnore");
    }

    @Override
    public String map(String line, Collector<String> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words)
            //使用要忽略的单词集合
            if (wordsToIgnore.contains(word))
                out.collect(new Tuple2<>(word, 1));
    }
    //经过广播变量传递数据集
}).withBroadcastSet(wordsToIgnore, "wordsToIgnore");

你应该记住,若是要使用广播变量,那么数据集将会存储在 TaskManager 的内存中,若是数据集和越大,那么占用的内存就会越大,所以使用广播变量适用于较小的数据集。

若是要向每一个 TaskManager 发送更多数据而且不但愿将这些数据存储在内存中,可使用 Flink 的分布式缓存向 TaskManager 发送静态文件。 要使用 Flink 的分布式缓存,你首先须要将文件存储在一个分布式文件系统(如 HDFS)中,而后在缓存中注册该文件:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//从 HDFS 注册文件
env.registerCachedFile("hdfs:///path/to/file", "machineLearningModel")

...

env.execute()

为了访问分布式缓存,咱们须要实现一个 Rich 函数:

class MyClassifier extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {
      File machineLearningModel = getRuntimeContext().getDistributedCache().getFile("machineLearningModel");
      ...
    }

    @Override
    public Integer map(String value) throws Exception {
      ...
    }
}

请注意,要访问分布式缓存中的文件,咱们须要使用咱们用于注册文件的 key,好比上面代码中的 machineLearningModel

Accumulator(累加器)

咱们前面已经介绍了如何将数据发送给 TaskManager,但如今咱们将讨论如何从 TaskManager 中返回数据。 你可能想知道为何咱们须要作这种事情。 毕竟,Apache Flink 就是创建数据处理流水线,读取输入数据,处理数据并返回结果。

为了表达清楚,让咱们来看一个例子。假设咱们须要计算每一个单词在文本中出现的次数,同时咱们要计算文本中有多少行:

//要处理的数据集合
DataSet<String> lines = ...

// Word count 算法
lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
})
.groupBy(0)
.sum(1)
.print();

// 计算要处理的文本中的行数
int linesCount = lines.count()
System.out.println(linesCount);

问题是若是咱们运行这个应用程序,它将运行两个 Flink 做业!首先获得单词统计数,而后计算行数。

这绝对是低效的,但咱们怎样才能避免这种状况呢?一种方法是使用累加器。它们容许你从 TaskManager 发送数据,并使用预约义的功能聚合此数据。 Flink 有如下内置累加器:

  • IntCounter,LongCounter,DoubleCounter:容许将 TaskManager 发送的 int,long,double 值汇总在一块儿

  • AverageAccumulator:计算双精度值的平均值

  • LongMaximum,LongMinimum,IntMaximum,IntMinimum,DoubleMaximum,DoubleMinimum:累加器,用于肯定不一样类型的最大值和最小值

  • 直方图 - 用于计算 TaskManager 的值分布

要使用累加器,咱们须要建立并注册一个用户定义的函数,而后在客户端上读取结果。下面咱们来看看该如何使用呢:

lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {

    //建立一个累加器
    private IntCounter linesNum = new IntCounter();

    @Override
    public void open(Configuration parameters) throws Exception {
        //注册一个累加器
        getRuntimeContext().addAccumulator("linesNum", linesNum);
    }

    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
        
        // 处理每一行数据后 linesNum 递增
        linesNum.add(1);
    }
})
.groupBy(0)
.sum(1)
.print();

//获取累加器结果
int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum");
System.out.println(linesNum);

这样计算就能够统计输入文本中每一个单词出现的次数以及它有多少行。

若是须要自定义累加器,还可使用 Accumulator 或 SimpleAccumulator 接口实现本身的累加器。

最后

本篇文章由 zhisheng 翻译,禁止任何无受权的转载。

翻译后地址:http://www.54tianzhisheng.cn/2019/03/28/flink-additional-data/

原文地址:https://brewing.codes/2017/10/24/flink-additional-data/

本文部分代码地址:https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-examples/src/main/java/com/zhisheng/examples/batch/accumulator

微信公众号:zhisheng

另外我本身整理了些 Flink 的学习资料,目前已经所有放到微信公众号了。你能够加个人微信:zhisheng_tian,而后回复关键字:Flink 便可无条件获取到。

更多私密资料请加入知识星球!

博客

一、Flink 从0到1学习 —— Apache Flink 介绍

二、Flink 从0到1学习 —— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

三、Flink 从0到1学习 —— Flink 配置文件详解

四、Flink 从0到1学习 —— Data Source 介绍

五、Flink 从0到1学习 —— 如何自定义 Data Source ?

六、Flink 从0到1学习 —— Data Sink 介绍

七、Flink 从0到1学习 —— 如何自定义 Data Sink ?

八、Flink 从0到1学习 —— Flink Data transformation(转换)

九、Flink 从0到1学习 —— 介绍 Flink 中的 Stream Windows

十、Flink 从0到1学习 —— Flink 中的几种 Time 详解

十一、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 ElasticSearch

十二、Flink 从0到1学习 —— Flink 项目如何运行?

1三、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Kafka

1四、Flink 从0到1学习 —— Flink JobManager 高可用性配置

1五、Flink 从0到1学习 —— Flink parallelism 和 Slot 介绍

1六、Flink 从0到1学习 —— Flink 读取 Kafka 数据批量写入到 MySQL

1七、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RabbitMQ

1八、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HBase

1九、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 HDFS

20、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Redis

2一、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Cassandra

2二、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 Flume

2三、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 InfluxDB

2四、Flink 从0到1学习 —— Flink 读取 Kafka 数据写入到 RocketMQ

2五、Flink 从0到1学习 —— 你上传的 jar 包藏到哪里去了

2六、Flink 从0到1学习 —— 你的 Flink job 日志跑到哪里去了

2七、阿里巴巴开源的 Blink 实时计算框架真香

2八、Flink 从0到1学习 —— Flink 中如何管理配置?

2九、Flink 从0到1学习—— Flink 不能够连续 Split(分流)?

30、Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文

3一、Flink 架构、原理与部署测试

3二、为何说流处理即将来?

3三、OPPO 数据中台之基石:基于 Flink SQL 构建实时数据仓库

3四、流计算框架 Flink 与 Storm 的性能对比

3五、Flink状态管理和容错机制介绍

3六、Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理

3七、360深度实践:Flink与Storm协议级对比

3八、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了

3九、Apache Flink 1.9 重大特性提早解读

40、Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)

4一、Flink 灵魂两百问,这谁顶得住?

源码解析

一、Flink 源码解析 —— 源码编译运行

二、Flink 源码解析 —— 项目结构一览

三、Flink 源码解析—— local 模式启动流程

四、Flink 源码解析 —— standalone session 模式启动流程

五、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Job Manager 启动

六、Flink 源码解析 —— Standalone Session Cluster 启动流程深度分析之 Task Manager 启动

七、Flink 源码解析 —— 分析 Batch WordCount 程序的执行过程

八、Flink 源码解析 —— 分析 Streaming WordCount 程序的执行过程

九、Flink 源码解析 —— 如何获取 JobGraph?

十、Flink 源码解析 —— 如何获取 StreamGraph?

十一、Flink 源码解析 —— Flink JobManager 有什么做用?

十二、Flink 源码解析 —— Flink TaskManager 有什么做用?

1三、Flink 源码解析 —— JobManager 处理 SubmitJob 的过程

1四、Flink 源码解析 —— TaskManager 处理 SubmitJob 的过程

1五、Flink 源码解析 —— 深度解析 Flink Checkpoint 机制

1六、Flink 源码解析 —— 深度解析 Flink 序列化机制

1七、Flink 源码解析 —— 深度解析 Flink 是如何管理好内存的?

1八、Flink Metrics 源码解析 —— Flink-metrics-core

1九、Flink Metrics 源码解析 —— Flink-metrics-datadog

20、Flink Metrics 源码解析 —— Flink-metrics-dropwizard

2一、Flink Metrics 源码解析 —— Flink-metrics-graphite

2二、Flink Metrics 源码解析 —— Flink-metrics-influxdb

2三、Flink Metrics 源码解析 —— Flink-metrics-jmx

2四、Flink Metrics 源码解析 —— Flink-metrics-slf4j

2五、Flink Metrics 源码解析 —— Flink-metrics-statsd

2六、Flink Metrics 源码解析 —— Flink-metrics-prometheus

2六、Flink Annotations 源码解析

2七、Flink 源码解析 —— 如何获取 ExecutionGraph ?

2八、大数据重磅炸弹——实时计算框架 Flink

2九、Flink Checkpoint-轻量级分布式快照

30、Flink Clients 源码解析原文出处:zhisheng的博客,欢迎关注个人公众号:zhisheng

相关文章
相关标签/搜索