入门教程 | 5分钟从零构建第一个 Flink 应用

本文转载自 Jark’s Blog ,做者伍翀(云邪),Apache Flink Committer,阿里巴巴高级开发工程师。 本文将从开发环境准备、建立 Maven 项目,编写 Flink 程序、运行程序等方面讲述如何迅速搭建第一个 Flink 应用。 在本文中,咱们将从零开始,教您如何构建第一个 Flink 应用程序。java

开发环境准备

Flink 能够运行在 Linux, Max OS X, 或者是 Windows 上。为了开发 Flink 应用程序,在本地机器上须要有 Java 8.xmaven 环境。apache

若是有 Java 8 环境,运行下面的命令会输出以下版本信息:api

$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
若是有 maven 环境,运行下面的命令会输出以下版本信息:

$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"
另外咱们推荐使用 ItelliJ IDEA (社区免费版已够用)做为 Flink 应用程序的开发 IDE。Eclipse 虽然也能够,可是 Eclipse 在 Scala 和 Java 混合型项目下会有些已知问题,因此不太推荐 Eclipse。下一章节,咱们会介绍如何建立一个 Flink 工程并将其导入 ItelliJ IDEA。
复制代码

建立 Maven 项目

咱们将使用 Flink Maven Archetype 来建立咱们的项目结构和一些初始的默认依赖。在你的工做目录下,运行以下命令来建立项目:bash

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.6.1 \
    -DgroupId=my-flink-project \
    -DartifactId=my-flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false
复制代码

你能够编辑上面的 groupId, artifactId, package 成你喜欢的路径。使用上面的参数,Maven 将自动为你建立以下所示的项目结构:多线程

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties
复制代码

咱们的 pom.xml 文件已经包含了所需的 Flink 依赖,而且在 src/main/java 下有几个示例程序框架。接下来咱们将开始编写第一个 Flink 程序。框架

编写 Flink 程序

启动 IntelliJ IDEA,选择 "Import Project"(导入项目),选择 my-flink-project 根目录下的 pom.xml。根据引导,完成项目导入。运维

在 src/main/java/myflink 下建立 SocketWindowWordCount.java 文件:socket

package myflink;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

  }
}
复制代码

如今这程序还很基础,咱们会一步步往里面填代码。注意下文中咱们不会将 import 语句也写出来,由于 IDE会自动将他们添加上去。在本节末尾,我会将完整的代码展现出来,若是你想跳过下面的步骤,能够直接将最后的完整代码粘到编辑器中。maven

Flink 程序的第一步是建立一个 StreamExecutionEnvironment 。这是一个入口类,能够用来设置参数和建立数据源以及提交任务。因此让咱们把它添加到 main 函数中:编辑器

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
复制代码

下一步咱们将建立一个从本地端口号 9000 的 socket 中读取数据的数据源:

DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");
复制代码

这建立了一个字符串类型的 DataStreamDataStream 是 Flink 中作流处理的核心 API,上面定义了很是多常见的操做(如,过滤、转换、聚合、窗口、关联等)。在本示例中,咱们感兴趣的是每一个单词在特定时间窗口中出现的次数,好比说5秒窗口。为此,咱们首先要将字符串数据解析成单词和次数(使用Tuple2<String, Integer>表示),第一个字段是单词,第二个字段是次数,次数初始值都设置成了1。咱们实现了一个flatmap,由于一行数据中可能有多个单词。

DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        });
复制代码

接着咱们将数据流按照单词字段(即0号索引字段)作分组,这里能够简单地使用 keyBy(int index)方法,获得一个以单词为 key 的Tuple2<String, Integer>数据流。而后咱们能够在流上指定想要的窗口,并根据窗口中的数据计算结果。在咱们的例子中,咱们想要每5秒聚合一次单词数,每一个窗口都是从零开始统计的。

DataStream<Tuple2<String, Integer>> windowCounts = wordCounts
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);
复制代码

第二个调用的 .timeWindow()指定咱们想要5秒的翻滚窗口(Tumble)。第三个调用为每一个key每一个窗口指定了sum聚合函数,在咱们的例子中是按照次数字段(即1号索引字段)相加。获得的结果数据流,将每5秒输出一次这5秒内每一个单词出现的次数。

最后一件事就是将数据流打印到控制台,并开始执行:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
复制代码

最后的 env.execute调用是启动实际Flink做业所必需的。全部算子操做(例如建立源、聚合、打印)只是构建了内部算子操做的图形。只有在execute()被调用时才会在提交到集群上或本地计算机上执行。

下面是完整的代码,部分代码通过简化(代码在 GitHub 上也能访问到):

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

    // 建立 execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 经过链接 socket 获取输入数据,这里链接到本地9000端口,若是9000端口已被占用,请换一个端口
    DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

    // 解析数据,按 word 分组,开窗,聚合
    DataStream<Tuple2<String, Integer>> windowCounts = text
        .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          @Override
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

    // 将结果打印到控制台,注意这里使用的是单线程打印,而非多线程
    windowCounts.print().setParallelism(1);

    env.execute("Socket Window WordCount");
  }
}
复制代码

运行程序

要运行示例程序,首先咱们在终端启动 netcat 得到输入流:

nc -lk 9000
复制代码

若是是 Windows 平台,能够经过 nmap.org/ncat/ 安装 ncat 而后运行:

ncat -lk 9000
复制代码

而后直接运行SocketWindowWordCount的 main 方法。

只须要在 netcat 控制台输入单词,就能在 SocketWindowWordCount 的输出控制台看到每一个单词的词频统计。若是想看到大于1的计数,请在5秒内反复键入相同的单词。

Cheers ! 🎉

  • The End-

Apache Flink 入门教程将长期连载更新,除文章外,社区每周也经过直播的形式系统输出 Apache Flink 从基础、进阶、运维、实战四个部分的内容。

进阶课程 主题:《Flink Time 深度解析》 讲师:崔星灿(Apache Flink Committer,加拿大约克大学博士后) 直播:5月21日 20:00-21:00 周二晚上20:00,Apache Flink China社区大群(钉钉群号:21789141)一块儿围观崔老师关于 Flink Time 的深度解析,往期直播视频请点击回顾

相关文章
相关标签/搜索