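Flink Deployment and Job Submission (On Flink Standalone)

Flink Deployment Preparation and Source Compilation

Official documentation:

Prerequisites

The machine used to compile the source should ideally meet the following specs:

  • CPU: at least 4 cores
  • Memory: at least 8 GB
  • Note: the machine I use here has 4 cores and 8 GB of memory; with too little memory the build will hit an OOM error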
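The thresholds above can be checked before starting a long build. The following is a minimal sketch, assuming a Linux host that provides `nproc` and `/proc/meminfo`; the function name `check_build_host` is hypothetical and not part of Flink or Maven.

```shell
# Hypothetical helper (not part of Flink): succeeds when the host has at least
# MIN_CORES CPU cores and at least MIN_MEM_KB kilobytes of physical memory.
check_build_host() (
  min_cores=$1
  min_mem_kb=$2
  cores=$(nproc)                                          # online CPU cores
  mem_kb=$(awk '/^MemTotal:/ {print $2}' /proc/meminfo)   # total RAM in kB
  echo "cores=${cores} mem_kb=${mem_kb}"
  [ "$cores" -ge "$min_cores" ] && [ "$mem_kb" -ge "$min_mem_kb" ]
)
```

For example, `check_build_host 4 7500000 || echo "host may be too small"`. The memory threshold is set a little under 8 GB because `MemTotal` excludes memory reserved by the kernel and firmware.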

Before deploying Flink, install a JDK first. Either version 8 or 11 works; I chose JDK 11 here:

[root@flink01 ~]# java -version
java version "11.0.8" 2020-07-14 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
[root@flink01 ~]#

Because we are installing Flink by compiling it from source, Maven also needs to be installed in advance:

[root@flink01 /usr/local/src]# mvn --version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/maven
Java version: 11.0.8, vendor: Oracle Corporation, runtime: /usr/local/jdk/11
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix"
[root@flink01 /usr/local/src]#

Flink's web-dashboard project depends on NodeJS to build, so install that beforehand as well:

[root@flink01 ~]# node -v
v12.18.4
[root@flink01 ~]#

Building that project relies on the Angular CLI, which can be installed with the following command:

[root@flink01 ~]# npm install -g --registry=https://registry.npm.taobao.org @angular/cli
[root@flink01 ~]# ng --version

     _                      _                 ____ _     ___
    / \   _ __   __ _ _   _| | __ _ _ __     / ___| |   |_ _|
   / △ \ | '_ \ / _` | | | | |/ _` | '__|   | |   | |    | |
  / ___ \| | | | (_| | |_| | | (_| | |      | |___| |___ | |
 /_/   \_\_| |_|\__, |\__,_|_|\__,_|_|       \____|_____|___|
                |___/

Angular CLI: 10.1.3
Node: 12.18.4
OS: linux x64

Angular: 
... 
Ivy Workspace: 

Package                      Version
------------------------------------------------------
@angular-devkit/architect    0.1001.3 (cli-only)
@angular-devkit/core         10.1.3 (cli-only)
@angular-devkit/schematics   10.1.3 (cli-only)
@schematics/angular          10.1.3 (cli-only)
@schematics/update           0.1001.3 (cli-only)

[root@flink01 ~]#

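Next, configure the following two repositories in Maven's settings.xml. The cloudera repository is used to download the Hadoop dependencies of the CDH distribution:

<mirrors>
    <!-- Aliyun mirror of the central repository -->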
    <mirror>
      <id>nexus-aliyun</id>
      <mirrorOf>central</mirrorOf>
      <name>Nexus aliyun</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
  </mirrors>

...

  <profiles>
    <!-- Configure the cloudera repository through a profile -->
    <profile>
      <id>cloudera-profile</id>
      <repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
          <releases>
            <enabled>true</enabled>
          </releases>
          <snapshots>
            <enabled>false</enabled>
          </snapshots>
        </repository>
      </repositories>
    </profile>
  </profiles>

  <!-- Activate the profile -->
  <activeProfiles>
    <activeProfile>cloudera-profile</activeProfile>
  </activeProfiles>
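Maven only activates a name listed under activeProfiles when a profile with that id is defined, so it is worth confirming that the two match. The following is a rough sketch of such a check. The function name `missing_profile_ids` is hypothetical, and it performs a plain text scan rather than real XML parsing, so an `<id>` belonging to a repository or mirror with the same value would also count as a match.

```shell
# Hypothetical helper: print every <activeProfile> name in a Maven settings
# file that has no <id> element with the same value anywhere in that file.
# This is a plain text scan, not an XML parser, so it assumes each
# <activeProfile> and <id> element sits on a single line.
missing_profile_ids() (
  settings=$1
  sed -n 's:.*<activeProfile>\(.*\)</activeProfile>.*:\1:p' "$settings" |
    while read -r name; do
      grep -q "<id>${name}</id>" "$settings" || echo "$name"
    done
)
```

Run it as `missing_profile_ids /usr/local/maven/conf/settings.xml` (the path is an assumption based on the Maven home shown earlier); empty output means every activated name has a matching id. Maven's own `mvn help:active-profiles` reports which profiles ended up active.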

Compiling the Source

Flink download page:

Install the tools that may be needed to compile the source:

[root@flink01 ~]# yum install -y cmake3 git gcc-c++ ncurses-devel perl-Data-Dumper boost boost-doc boost-devel bzip2 openssl-devel libtirpc-devel.x86_64

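Open the download page, copy the download URL of the Flink source package, then download it on Linux with wget. The -O option saves the archive under the file name used in the extraction step that follows:

[root@flink01 ~]# cd /usr/local/src
[root@flink01 /usr/local/src]# wget -O flink-release-1.11.2.tar.gz https://github.com/apache/flink/archive/release-1.11.2.tar.gz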

Extract the downloaded source package:

[root@flink01 /usr/local/src]# tar -zxvf flink-release-1.11.2.tar.gz
[root@flink01 /usr/local/src]# cd flink-release-1.11.2

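Because the web-dashboard in the flink-runtime-web module uses NodeJS, some dependency packages have to be downloaded during compilation. The default NodeJS (npm) registry is barely usable from mainland China, so switch to the Taobao registry by editing the pom.xml file: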

[root@flink01 /usr/local/src/flink-release-1.11.2]# vim flink-runtime-web/pom.xml

In the npm install section, change the content of the arguments tag from:

<arguments>ci --cache-max=0 --no-save</arguments>

to:

<arguments>install --registry=https://registry.npm.taobao.org --cache-max=0 --no-save</arguments>
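The same edit can be scripted instead of made by hand in vim. The following is a minimal sketch, assuming the original arguments line appears in the pom exactly as quoted above. `set_npm_registry` is a hypothetical name, and the replacement uses `--registry`, npm's documented spelling of the option.

```shell
# Hypothetical helper: rewrite the npm <arguments> line of a pom file so that
# dependencies are fetched from the given registry. Writes through a temporary
# file instead of relying on sed -i, whose syntax differs between platforms.
set_npm_registry() (
  pom=$1
  registry=$2
  tmp=$(mktemp)
  sed "s#<arguments>ci --cache-max=0 --no-save</arguments>#<arguments>install --registry=${registry} --cache-max=0 --no-save</arguments>#" "$pom" > "$tmp" &&
    cat "$tmp" > "$pom"
  rm -f "$tmp"
)
```

Usage from the source root would look like `set_npm_registry flink-runtime-web/pom.xml https://registry.npm.taobao.org`. If the line is not present in the expected form, the file is left unchanged.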

You can then compile the source with Maven:

[root@flink01 /usr/local/src/flink-release-1.11.2]# mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.15.1 -Dfast

In my case, however, compiling the flink-runtime-web module failed with the following error:

[ERROR] Node.js version v10.9.0 detected.
[ERROR] The Angular CLI requires a minimum Node.js version of either v10.13 or v12.0.
[ERROR] 
[ERROR] Please update your Node.js version or visit https://nodejs.org/ for additional instructions.

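The cause is clearly that the NodeJS version is too old: flink-runtime-web/pom.xml pins NodeJS v10.9.0 and does not use the version we installed ourselves. Open that file, find the following tag, and change the version number. I use v10.13.0 here: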

<nodeVersion>v10.13.0</nodeVersion>
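To check a candidate version against the rule in the error message before rebuilding, a small comparison function is enough. This is a sketch under one assumption: it reads "either v10.13 or v12.0" as "10.x from 10.13 onward, or 12.0 and later", which excludes 11.x. The name `node_version_ok` is hypothetical.

```shell
# Hypothetical helper: succeed when a Node.js version string such as "v10.13.0"
# satisfies the rule quoted in the error message
# (10.x from 10.13 onward, or 12.0 and later).
node_version_ok() (
  version=${1#v}            # drop the leading "v"
  major=${version%%.*}      # text before the first dot
  rest=${version#*.}
  minor=${rest%%.*}         # text between the first and second dot
  if [ "$major" -eq 10 ]; then
    [ "$minor" -ge 13 ]
  else
    [ "$major" -ge 12 ]
  fi
)
```

For instance, `node_version_ok v10.9.0` fails, while `node_version_ok v10.13.0` and `node_version_ok "$(node -v)"` on the v12.18.4 installation shown earlier succeed.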

Then compile again:

[root@flink01 /usr/local/src/flink-release-1.11.2]# mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.15.1 -Dfast

During the rebuild the following error message may be printed, but compilation continues and the final status is still successful, so it can be ignored:

[ERROR] Browserslist: caniuse-lite is outdated. Please run next command `npm update`

When the build succeeds, output like the following is printed:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for flink 1.11.2:
[INFO] 
[INFO] force-shading ...................................... SUCCESS [  0.721 s]
[INFO] flink .............................................. SUCCESS [  0.581 s]
[INFO] flink-annotations .................................. SUCCESS [  0.627 s]
[INFO] flink-test-utils-parent ............................ SUCCESS [  0.033 s]
[INFO] flink-test-utils-junit ............................. SUCCESS [  0.646 s]
[INFO] flink-metrics ...................................... SUCCESS [  0.032 s]
[INFO] flink-metrics-core ................................. SUCCESS [  0.360 s]
[INFO] flink-core ......................................... SUCCESS [  7.062 s]
[INFO] flink-java ......................................... SUCCESS [  1.520 s]
[INFO] flink-queryable-state .............................. SUCCESS [  0.025 s]
[INFO] flink-queryable-state-client-java .................. SUCCESS [  0.303 s]
[INFO] flink-filesystems .................................. SUCCESS [  0.023 s]
[INFO] flink-hadoop-fs .................................... SUCCESS [  1.031 s]
[INFO] flink-runtime ...................................... SUCCESS [ 24.936 s]
[INFO] flink-scala ........................................ SUCCESS [ 25.682 s]
[INFO] flink-mapr-fs ...................................... SUCCESS [  0.457 s]
[INFO] flink-filesystems :: flink-fs-hadoop-shaded ........ SUCCESS [  2.114 s]
[INFO] flink-s3-fs-base ................................... SUCCESS [  0.424 s]
[INFO] flink-s3-fs-hadoop ................................. SUCCESS [  3.012 s]
[INFO] flink-s3-fs-presto ................................. SUCCESS [  4.794 s]
[INFO] flink-swift-fs-hadoop .............................. SUCCESS [ 12.921 s]
[INFO] flink-oss-fs-hadoop ................................ SUCCESS [  3.700 s]
[INFO] flink-azure-fs-hadoop .............................. SUCCESS [ 15.227 s]
[INFO] flink-optimizer .................................... SUCCESS [  1.171 s]
[INFO] flink-streaming-java ............................... SUCCESS [  4.635 s]
[INFO] flink-clients ...................................... SUCCESS [  0.939 s]
[INFO] flink-test-utils ................................... SUCCESS [  0.634 s]
[INFO] flink-runtime-web .................................. SUCCESS [ 48.675 s]
[INFO] flink-examples ..................................... SUCCESS [  0.043 s]
[INFO] flink-examples-batch ............................... SUCCESS [  9.319 s]
[INFO] flink-connectors ................................... SUCCESS [  0.035 s]
[INFO] flink-hadoop-compatibility ......................... SUCCESS [  5.029 s]
[INFO] flink-state-backends ............................... SUCCESS [  0.018 s]
[INFO] flink-statebackend-rocksdb ......................... SUCCESS [  0.628 s]
[INFO] flink-tests ........................................ SUCCESS [ 22.051 s]
[INFO] flink-streaming-scala .............................. SUCCESS [ 23.293 s]
[INFO] flink-hcatalog ..................................... SUCCESS [  5.332 s]
[INFO] flink-table ........................................ SUCCESS [  0.019 s]
[INFO] flink-table-common ................................. SUCCESS [  1.505 s]
[INFO] flink-table-api-java ............................... SUCCESS [  0.820 s]
[INFO] flink-table-api-java-bridge ........................ SUCCESS [  0.393 s]
[INFO] flink-table-api-scala .............................. SUCCESS [ 10.990 s]
[INFO] flink-table-api-scala-bridge ....................... SUCCESS [  9.643 s]
[INFO] flink-sql-parser ................................... SUCCESS [ 17.153 s]
[INFO] flink-libraries .................................... SUCCESS [  0.018 s]
[INFO] flink-cep .......................................... SUCCESS [  1.447 s]
[INFO] flink-table-planner ................................ SUCCESS [01:12 min]
[INFO] flink-sql-parser-hive .............................. SUCCESS [  1.524 s]
[INFO] flink-table-runtime-blink .......................... SUCCESS [  2.073 s]
[INFO] flink-table-planner-blink .......................... SUCCESS [01:30 min]
[INFO] flink-metrics-jmx .................................. SUCCESS [  0.262 s]
[INFO] flink-formats ...................................... SUCCESS [  0.020 s]
[INFO] flink-json ......................................... SUCCESS [  0.500 s]
[INFO] flink-connector-kafka-base ......................... SUCCESS [  0.983 s]
[INFO] flink-avro ......................................... SUCCESS [  1.600 s]
[INFO] flink-csv .......................................... SUCCESS [  0.520 s]
[INFO] flink-connector-kafka-0.10 ......................... SUCCESS [  0.753 s]
[INFO] flink-connector-kafka-0.11 ......................... SUCCESS [  0.652 s]
[INFO] flink-connector-elasticsearch-base ................. SUCCESS [  0.807 s]
[INFO] flink-connector-elasticsearch5 ..................... SUCCESS [  8.900 s]
[INFO] flink-connector-elasticsearch6 ..................... SUCCESS [  0.691 s]
[INFO] flink-connector-elasticsearch7 ..................... SUCCESS [  0.702 s]
[INFO] flink-connector-hbase .............................. SUCCESS [  1.758 s]
[INFO] flink-hadoop-bulk .................................. SUCCESS [  0.576 s]
[INFO] flink-orc .......................................... SUCCESS [  0.828 s]
[INFO] flink-orc-nohive ................................... SUCCESS [  0.445 s]
[INFO] flink-parquet ...................................... SUCCESS [  0.992 s]
[INFO] flink-connector-hive ............................... SUCCESS [  2.614 s]
[INFO] flink-connector-jdbc ............................... SUCCESS [  0.857 s]
[INFO] flink-connector-rabbitmq ........................... SUCCESS [  0.256 s]
[INFO] flink-connector-twitter ............................ SUCCESS [  1.220 s]
[INFO] flink-connector-nifi ............................... SUCCESS [  0.309 s]
[INFO] flink-connector-cassandra .......................... SUCCESS [  2.280 s]
[INFO] flink-connector-filesystem ......................... SUCCESS [  0.742 s]
[INFO] flink-connector-kafka .............................. SUCCESS [  0.773 s]
[INFO] flink-connector-gcp-pubsub ......................... SUCCESS [ 50.078 s]
[INFO] flink-connector-kinesis ............................ SUCCESS [  5.358 s]
[INFO] flink-sql-connector-elasticsearch7 ................. SUCCESS [  4.625 s]
[INFO] flink-connector-base ............................... SUCCESS [  0.302 s]
[INFO] flink-sql-connector-elasticsearch6 ................. SUCCESS [  3.658 s]
[INFO] flink-sql-connector-kafka-0.10 ..................... SUCCESS [  0.236 s]
[INFO] flink-sql-connector-kafka-0.11 ..................... SUCCESS [  0.299 s]
[INFO] flink-sql-connector-kafka .......................... SUCCESS [  0.603 s]
[INFO] flink-sql-connector-hive-1.2.2 ..................... SUCCESS [  2.527 s]
[INFO] flink-sql-connector-hive-2.2.0 ..................... SUCCESS [  3.090 s]
[INFO] flink-sql-connector-hive-2.3.6 ..................... SUCCESS [  2.966 s]
[INFO] flink-sql-connector-hive-3.1.2 ..................... SUCCESS [  3.828 s]
[INFO] flink-avro-confluent-registry ...................... SUCCESS [ 24.666 s]
[INFO] flink-sequence-file ................................ SUCCESS [  0.397 s]
[INFO] flink-compress ..................................... SUCCESS [  0.393 s]
[INFO] flink-sql-orc ...................................... SUCCESS [  0.196 s]
[INFO] flink-sql-parquet .................................. SUCCESS [  0.352 s]
[INFO] flink-examples-streaming ........................... SUCCESS [ 21.793 s]
[INFO] flink-examples-table ............................... SUCCESS [  6.387 s]
[INFO] flink-examples-build-helper ........................ SUCCESS [  0.041 s]
[INFO] flink-examples-streaming-twitter ................... SUCCESS [  0.332 s]
[INFO] flink-examples-streaming-state-machine ............. SUCCESS [  0.319 s]
[INFO] flink-examples-streaming-gcp-pubsub ................ SUCCESS [  7.588 s]
[INFO] flink-container .................................... SUCCESS [  0.216 s]
[INFO] flink-queryable-state-runtime ...................... SUCCESS [  0.430 s]
[INFO] flink-mesos ........................................ SUCCESS [ 22.759 s]
[INFO] flink-kubernetes ................................... SUCCESS [01:55 min]
[INFO] flink-yarn ......................................... SUCCESS [  1.131 s]
[INFO] flink-gelly ........................................ SUCCESS [  1.344 s]
[INFO] flink-gelly-scala .................................. SUCCESS [ 13.956 s]
[INFO] flink-gelly-examples ............................... SUCCESS [ 11.946 s]
[INFO] flink-external-resources ........................... SUCCESS [  0.017 s]
[INFO] flink-external-resource-gpu ........................ SUCCESS [  0.154 s]
[INFO] flink-metrics-dropwizard ........................... SUCCESS [  5.900 s]
[INFO] flink-metrics-graphite ............................. SUCCESS [  3.591 s]
[INFO] flink-metrics-influxdb ............................. SUCCESS [01:53 min]
[INFO] flink-metrics-prometheus ........................... SUCCESS [ 44.165 s]
[INFO] flink-metrics-statsd ............................... SUCCESS [  0.156 s]
[INFO] flink-metrics-datadog .............................. SUCCESS [  0.158 s]
[INFO] flink-metrics-slf4j ................................ SUCCESS [  0.151 s]
[INFO] flink-cep-scala .................................... SUCCESS [  8.664 s]
[INFO] flink-table-uber ................................... SUCCESS [  3.683 s]
[INFO] flink-table-uber-blink ............................. SUCCESS [  4.093 s]
[INFO] flink-python ....................................... SUCCESS [01:53 min]
[INFO] flink-sql-client ................................... SUCCESS [  8.511 s]
[INFO] flink-state-processor-api .......................... SUCCESS [  0.590 s]
[INFO] flink-ml-parent .................................... SUCCESS [  0.018 s]
[INFO] flink-ml-api ....................................... SUCCESS [  0.159 s]
[INFO] flink-ml-lib ....................................... SUCCESS [  8.357 s]
[INFO] flink-ml-uber ...................................... SUCCESS [  0.076 s]
[INFO] flink-scala-shell .................................. SUCCESS [  9.027 s]
[INFO] flink-dist ......................................... SUCCESS [01:08 min]
[INFO] flink-yarn-tests ................................... SUCCESS [ 11.079 s]
[INFO] flink-end-to-end-tests ............................. SUCCESS [ 37.058 s]
[INFO] flink-cli-test ..................................... SUCCESS [  0.164 s]
[INFO] flink-parent-child-classloading-test-program ....... SUCCESS [  0.141 s]
[INFO] flink-parent-child-classloading-test-lib-package ... SUCCESS [  0.089 s]
[INFO] flink-dataset-allround-test ........................ SUCCESS [  0.140 s]
[INFO] flink-dataset-fine-grained-recovery-test ........... SUCCESS [  0.148 s]
[INFO] flink-datastream-allround-test ..................... SUCCESS [  0.745 s]
[INFO] flink-batch-sql-test ............................... SUCCESS [  0.142 s]
[INFO] flink-stream-sql-test .............................. SUCCESS [  0.148 s]
[INFO] flink-bucketing-sink-test .......................... SUCCESS [  0.315 s]
[INFO] flink-distributed-cache-via-blob ................... SUCCESS [  0.139 s]
[INFO] flink-high-parallelism-iterations-test ............. SUCCESS [  4.416 s]
[INFO] flink-stream-stateful-job-upgrade-test ............. SUCCESS [  0.513 s]
[INFO] flink-queryable-state-test ......................... SUCCESS [  0.981 s]
[INFO] flink-local-recovery-and-allocation-test ........... SUCCESS [  0.133 s]
[INFO] flink-elasticsearch5-test .......................... SUCCESS [  3.092 s]
[INFO] flink-elasticsearch6-test .......................... SUCCESS [  1.650 s]
[INFO] flink-quickstart ................................... SUCCESS [  0.263 s]
[INFO] flink-quickstart-java .............................. SUCCESS [ 16.713 s]
[INFO] flink-quickstart-scala ............................. SUCCESS [  0.057 s]
[INFO] flink-quickstart-test .............................. SUCCESS [  0.315 s]
[INFO] flink-confluent-schema-registry .................... SUCCESS [  1.014 s]
[INFO] flink-stream-state-ttl-test ........................ SUCCESS [  2.333 s]
[INFO] flink-sql-client-test .............................. SUCCESS [01:01 min]
[INFO] flink-streaming-file-sink-test ..................... SUCCESS [  0.130 s]
[INFO] flink-state-evolution-test ......................... SUCCESS [  0.527 s]
[INFO] flink-rocksdb-state-memory-control-test ............ SUCCESS [  0.495 s]
[INFO] flink-end-to-end-tests-common ...................... SUCCESS [  0.527 s]
[INFO] flink-metrics-availability-test .................... SUCCESS [  0.136 s]
[INFO] flink-metrics-reporter-prometheus-test ............. SUCCESS [  0.156 s]
[INFO] flink-heavy-deployment-stress-test ................. SUCCESS [  4.367 s]
[INFO] flink-connector-gcp-pubsub-emulator-tests .......... SUCCESS [02:09 min]
[INFO] flink-streaming-kafka-test-base .................... SUCCESS [  0.193 s]
[INFO] flink-streaming-kafka-test ......................... SUCCESS [  4.041 s]
[INFO] flink-streaming-kafka011-test ...................... SUCCESS [  3.555 s]
[INFO] flink-streaming-kafka010-test ...................... SUCCESS [  3.540 s]
[INFO] flink-plugins-test ................................. SUCCESS [  0.033 s]
[INFO] dummy-fs ........................................... SUCCESS [  0.084 s]
[INFO] another-dummy-fs ................................... SUCCESS [  0.074 s]
[INFO] flink-tpch-test .................................... SUCCESS [  5.635 s]
[INFO] flink-streaming-kinesis-test ....................... SUCCESS [  6.854 s]
[INFO] flink-elasticsearch7-test .......................... SUCCESS [  1.939 s]
[INFO] flink-end-to-end-tests-common-kafka ................ SUCCESS [  0.539 s]
[INFO] flink-tpcds-test ................................... SUCCESS [  0.345 s]
[INFO] flink-netty-shuffle-memory-control-test ............ SUCCESS [  0.144 s]
[INFO] flink-python-test .................................. SUCCESS [  3.675 s]
[INFO] flink-statebackend-heap-spillable .................. SUCCESS [  0.352 s]
[INFO] flink-contrib ...................................... SUCCESS [  0.019 s]
[INFO] flink-connector-wikiedits .......................... SUCCESS [  4.279 s]
[INFO] flink-fs-tests ..................................... SUCCESS [  0.509 s]
[INFO] flink-docs ......................................... SUCCESS [  5.049 s]
[INFO] flink-walkthroughs ................................. SUCCESS [  0.021 s]
[INFO] flink-walkthrough-common ........................... SUCCESS [  0.196 s]
[INFO] flink-walkthrough-datastream-java .................. SUCCESS [  0.053 s]
[INFO] flink-walkthrough-datastream-scala ................. SUCCESS [  0.050 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  24:47 min
[INFO] Finished at: 2020-09-29T01:20:10+08:00
[INFO] ------------------------------------------------------------------------

The build also generates a directory with the following structure:

[root@flink01 /usr/local/src/flink-release-1.11.2]# ls flink-dist/target/flink-1.11.2-bin/flink-1.11.2/
bin  conf  examples  lib  LICENSE  log  opt  plugins  README.txt
[root@flink01 /usr/local/src/flink-release-1.11.2]#

Single-Node Deployment and Job Submission Test

Single-Node Deployment

First configure hosts to map the hostname to the local IP:

[root@flink01 ~]# vim /etc/hosts
192.168.243.148   flink01

Deploying Flink in single-node mode is very simple: just copy out the directory generated by the build:

[root@flink01 /usr/local/src/flink-release-1.11.2]# cp -r flink-dist/target/flink-1.11.2-bin/flink-1.11.2/ /usr/local/flink

Then start Flink with the following command:

[root@flink01 /usr/local/flink]# ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host flink01.
Starting taskexecutor daemon on host flink01.
[root@flink01 /usr/local/flink]# jps  # after a successful start the following Java processes are present
2755 Jps
2389 StandaloneSessionClusterEntrypoint
2733 TaskManagerRunner
[root@flink01 /usr/local/flink]#
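The check done by eye above can be turned into a small test, for example in a startup script. A minimal sketch, assuming the process names match the `jps` listing shown here; `flink_daemons_up` is a hypothetical name.

```shell
# Hypothetical helper: read `jps` output on stdin and succeed only when both
# the standalone JobManager entrypoint and a TaskManager process are listed.
flink_daemons_up() (
  listing=$(cat)
  echo "$listing" | grep -q 'StandaloneSessionClusterEntrypoint' &&
    echo "$listing" | grep -q 'TaskManagerRunner'
)
```

Typical use would be `jps | flink_daemons_up && echo "Flink is running"`.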

The corresponding stop command is:

$ ./bin/stop-cluster.sh

Log files are in the log directory; if startup fails, check them to troubleshoot:

[root@flink01 /usr/local/flink]# ls log/
flink-root-standalonesession-0-flink01.log  flink-root-standalonesession-0-flink01.out  flink-root-taskexecutor-0-flink01.log  flink-root-taskexecutor-0-flink01.out
[root@flink01 /usr/local/flink]#

Open the machine's IP on port 8081 in a browser to reach the Flink web console.

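The sidebar menu contains the following options:

  • Overview: the overall cluster overview
  • Running Jobs: jobs that are currently running
  • Completed Jobs: jobs that have finished
  • TaskManager: system information of the TaskManagers
  • JobManager: configuration and log information of the JobManager
  • Submit New Job: submit a job from this page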

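[Figure: overall architecture of Flink]

The Flink system consists of two main components, JobManager and TaskManager, and follows the Master-Slave design: the JobManager is the Master node and the TaskManagers are the Worker (Slave) nodes, of which several can be deployed. The Flink Program is the program we write with the Flink framework; it is the work the TaskManagers actually execute, and it is submitted to the cluster through the Client.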

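Client

The Client submits jobs to the cluster. It establishes an Akka connection to the JobManager, submits the job to it, and obtains the job's execution status by interacting with the JobManager.

Jobs can be submitted through the CLI or through the Flink WebUI (Submit New Job in the menu). An application can also build an ExecutionEnvironment that points at the JobManager's RPC port and submit the Flink application that way.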

JobManager

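The JobManager is responsible for scheduling jobs and managing resources for the whole Flink cluster. It receives submitted applications from the client, allocates TaskSlot resources to them according to TaskSlot usage on the TaskManagers in the cluster, and instructs the TaskManagers to start the application.

The JobManager acts as the Master node of the cluster. There is exactly one active JobManager in a cluster, and it handles task management and resource management for the whole cluster.

The JobManager and the TaskManagers communicate through the Actor System. The JobManager collects task execution status this way and also sends the application's execution status to the client through the Actor System.

While a job is running, the Flink JobManager triggers Checkpoint operations. Each TaskManager node completes the Checkpoint after receiving the trigger, and all Checkpoint coordination takes place in the Flink JobManager.

When a job finishes, Flink reports the execution information back to the client and releases the resources in the TaskManagers for use by the next submitted job.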

TaskManager

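The TaskManager acts as a Slave node of the cluster. It executes the actual tasks and requests and manages the resources for those tasks on its node.

The client compiles and packages the Flink application and submits it to the JobManager. Based on the resources of the TaskManagers registered with it, the JobManager assigns tasks to TaskManager nodes that have free resources, which then start and run them.

A TaskManager receives the tasks to deploy from the JobManager, starts each Task using Slot resources, sets up the network connections for incoming data, receives the data, and begins processing. Data exchange between TaskManagers takes the form of data streams.

As this shows, Flink runs tasks as multiple threads, which differs considerably from MapReduce's multi-JVM process model and can greatly improve CPU utilization. Tasks share system resources through TaskSlots, and each TaskManager manages its resources as a pool of TaskSlots.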


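Job Submission Test

With Flink deployed and its basic components understood, we can submit some of the example code shipped with Flink to the cluster to test whether it runs correctly. The examples are in the following directory: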

[root@flink01 /usr/local/flink]# ls examples/
batch  gelly  python  streaming  table
[root@flink01 /usr/local/flink]# ls examples/streaming/
IncrementalLearning.jar  Iteration.jar  SessionWindowing.jar  SocketWindowWordCount.jar  StateMachineExample.jar  TopSpeedWindowing.jar  Twitter.jar  WindowJoin.jar  WordCount.jar
[root@flink01 /usr/local/flink]#

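I use examples/streaming/SocketWindowWordCount.jar for the test. This example reads a Socket stream, splits it into words on a delimiter, and counts word frequencies. To simulate the Socket stream we need the netcat tool, installed as follows: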

$ yum install -y nc

Use nc to start a Socket listening on port 9999; we will write data through this Socket shortly:

$ nc -lk 9999

Then submit the example to Flink:

[root@flink01 /usr/local/flink]# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
Job has been submitted with JobID c90a28408eae654a143745903cbaa3eb

Once the job has been submitted successfully, a running Job shows up in the web UI, and clicking it opens its details.

Write some data into the Socket created by the nc command:

[root@flink01 ~]# nc -lk 9999
a b c a a b b d d c
hello world
flink spark spark flink

The word counts appear in the following file:

[root@flink01 /usr/local/flink]# cat log/flink-root-taskexecutor-0-flink01.out
a : 3
spark : 2
flink : 2
world : 1
hello : 1
d : 2
c : 2
b : 3
[root@flink01 /usr/local/flink]#
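To sanity-check these numbers without Flink, the same counting can be approximated with standard tools. This is only an illustration of the computation, not how the example job is implemented: the Flink job additionally groups counts by time window, which this sketch ignores. The function name `word_count` is made up for the example.

```shell
# Illustrative only: count space-separated words read from stdin and
# print them as "word : count", sorted by word.
word_count() {
  tr -s ' ' '\n' | awk 'NF' | sort | uniq -c | awk '{print $2 " : " $1}'
}
```

Feeding it the three input lines typed into nc above should give the same totals as the Flink output, although in a different order.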

That completes the test. How do we stop the job? Rather than pressing Ctrl + C, click “Cancel Job” in the web UI to stop the Job.
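Cancelling also works from the command line: `./bin/flink list` shows the jobs known to the cluster and `./bin/flink cancel <JobID>` cancels one. The sketch below extracts the JobID from the line that `flink run` prints, assuming the message format shown in the submission output above; `extract_job_id` is a hypothetical helper name.

```shell
# Hypothetical helper: pull the 32-character hexadecimal JobID out of the
# "Job has been submitted with JobID ..." line printed by `flink run`.
extract_job_id() {
  sed -n 's/^Job has been submitted with JobID \([0-9a-f]\{32\}\).*$/\1/p'
}
```

A possible use, with `-d` submitting the job in detached mode so the command returns right away: `job_id=$(./bin/flink run -d examples/streaming/SocketWindowWordCount.jar --port 9999 | extract_job_id)` followed by `./bin/flink cancel "$job_id"`.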


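Flink Standalone Mode Deployment

Official documentation:

The previous section demonstrated single-node deployment, but production environments usually need a distributed deployment. For this Flink offers Standalone mode, that is, a standalone cluster.

[Figure: topology of Flink Standalone mode]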

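Demonstrating the distributed Standalone deployment requires at least two machines, so I added a machine with the hostname flink02. The current machines are:

IP               Hostname  Role
192.168.243.148  flink01   master (JobManager) / worker (TaskManager)
192.168.243.150  flink02   worker (TaskManager)

  • Tip: the newly added flink02 also needs a Java runtime environment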

System Configuration (All Nodes)

Configure hosts to map hostnames to IPs so that all nodes can reach each other by hostname:

$ vim /etc/hosts
192.168.243.148   flink01
192.168.243.150   flink02

Disable the firewall:

$ systemctl stop firewalld && systemctl disable firewalld

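Configure passwordless login between all nodes:

[root@flink01 ~]# ssh-keygen -t rsa      # generate a key pair
[root@flink01 ~]# ssh-copy-id flink01    # copy the public key and append it to this host's own authorized_keys file
[root@flink01 ~]# ssh-copy-id flink02    # copy the public key and append it to flink02's authorized_keys file
  • Repeat the same steps on flink02; they are not shown again here

Then test passwordless login. As shown here, logging in to the flink02 node does not ask for a password: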

[root@flink01 ~]# ssh flink02
Last login: Tue Sep 29 14:22:20 2020 from 192.168.243.1
[root@flink02 ~]#

Configuring the Master Node

On flink01, change the following settings in the configuration file:

[root@flink01 /usr/local/flink]# vim conf/flink-conf.yaml
jobmanager.rpc.address: flink01
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
io.tmp.dirs: /usr/local/flink/tmp_data

Create the temporary directory:

[root@flink01 /usr/local/flink]# mkdir tmp_data

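A brief explanation of these parameters:

  • jobmanager.rpc.address: the IP address or hostname of the master node
  • jobmanager.memory.process.size: the total memory available to the JobManager process
  • taskmanager.memory.process.size: the total memory available to each TaskManager process
  • taskmanager.numberOfTaskSlots: the number of task slots each TaskManager offers, commonly set to the number of CPU cores available on the machine
  • parallelism.default: the default parallelism for jobs that do not specify one
  • io.tmp.dirs: the directory for Flink's temporary files
  • For more on configuration parameters, see the Configuration page of the official documentation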

Then configure the IPs or hostnames of the worker nodes:

[root@flink01 /usr/local/flink]# vim conf/workers
flink01
flink02
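Each entry in conf/workers starts one TaskManager, and each TaskManager offers taskmanager.numberOfTaskSlots slots, so the slot capacity of the cluster is the product of the two. The sketch below computes that number from the two files. `total_slots` is a hypothetical name, and the fallback of 1 slot when the key is absent is an assumption about Flink's default.

```shell
# Hypothetical helper: multiply the configured slots per TaskManager by the
# number of non-empty, non-comment lines in the workers file.
total_slots() (
  conf=$1
  workers=$2
  slots=$(sed -n 's/^taskmanager\.numberOfTaskSlots:[[:space:]]*\([0-9][0-9]*\).*/\1/p' "$conf" | tail -n 1)
  hosts=$(grep -c -v -e '^[[:space:]]*$' -e '^[[:space:]]*#' "$workers" || true)
  echo $(( ${slots:-1} * ${hosts:-0} ))
)
```

With the configuration in this section, `total_slots conf/flink-conf.yaml conf/workers` prints 4 (2 slots on each of 2 workers).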

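Restart the service (flink02 has no Flink installation yet, so its TaskManager is started by hand in the next step):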

[root@flink01 /usr/local/flink]# ./bin/stop-cluster.sh
[root@flink01 /usr/local/flink]# ./bin/start-cluster.sh

Configuring the Worker Node

Copy the flink directory to flink02 by running the following command on flink02:

[root@flink02 ~]# scp -r flink01:/usr/local/flink /usr/local/flink

Create the temporary directory:

[root@flink02 ~]# cd /usr/local/flink/
[root@flink02 /usr/local/flink]# mkdir tmp_data

Start the TaskManager service:

[root@flink02 /usr/local/flink]# ./bin/taskmanager.sh start
Starting taskexecutor daemon on host flink02.
[root@flink02 /usr/local/flink]# jps
4757 Jps
4701 TaskManagerRunner
[root@flink02 /usr/local/flink]#

The dashboard now shows 2 TaskManagers, and the “Task Manager” page lists the information of both nodes.
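The TaskManager count can also be read without a browser, since the port that serves the dashboard also serves Flink's REST API. A minimal sketch, assuming the `/overview` response is a JSON object containing a numeric `taskmanagers` field as described in the Flink REST API documentation; `taskmanager_count` is a hypothetical helper that avoids a dependency on a JSON tool.

```shell
# Hypothetical helper: read the JSON returned by the /overview REST endpoint on
# stdin and print the value of its "taskmanagers" field.
taskmanager_count() {
  sed -n 's/.*"taskmanagers"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}
```

On this cluster the call would be `curl -s http://flink01:8081/overview | taskmanager_count`, which should print 2 once both workers have registered.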

Adding more TaskManager nodes works the same way and is very simple. Next, test whether a job submitted to the cluster runs correctly. First use nc to create a Socket and write some data:

[root@flink01 ~]# nc -lk 9999
a b c a a b b d d c
hello world
flink spark spark flink

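Then submit the job. Because the source task may be scheduled on either TaskManager, pass --hostname so that it connects to the nc listener on flink01 rather than to localhost:

[root@flink01 /usr/local/flink]# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname flink01 --port 9999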
Job has been submitted with JobID 641d5e7e0bd572ba4114ea5e69b8644c

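The word counts appear in the following file, which shows that the job runs correctly on Flink in Standalone mode (if the output task was scheduled on flink02, look in the corresponding .out file on that node instead):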

[root@flink01 /usr/local/flink]# cat log/flink-root-taskexecutor-1-flink01.out
a : 3
spark : 2
flink : 2
world : 1
hello : 1
d : 2
c : 2
b : 3
[root@flink01 /usr/local/flink]#