官方文档:html
在上一篇 Flink部署及做业提交(On Flink Cluster) 文章中,咱们介绍了如何编译部署Flink自身的资源分配和管理系统,并将做业提交到该系统上去运行。但一般来说这种方式用得很少,由于在企业中,可能会使用不一样的分布式计算框架,如Spark、Storm或MapReduce等。java
若是每一种框架都须要搭建各自的资源分配和管理系统,就没法共享资源,致使资源利用率低。而且大多企业通常会使用Hadoop生态的相关组件作做为大数据处理平台的底座,如HDFS、Hive、YARN等。node
其中 YARN 是资源调度框架、通用的资源管理系统,能够为上层应用提供统一的资源管理和调度,Spark、Flink、Storm等计算框架均可以集成到 YARN 上。如此一来这些计算框架能够享受总体的资源调度,进而提升集群资源的利用率,这也就是所谓的 xxx on YARN。所以,绝大部分企业都是将计算做业放到 YARN 上进行调度,而不是每种计算框架都单独搭一个资源分配和管理系统。这也是为何要单独介绍Flink On YARN的缘由。web
想要让Flink做业跑在 YARN 上,咱们首先得搭建一个Hadoop环境,为了简单这里只搭建单节点环境。我这里使用的是CDH的Hadoop发行版。下载地址以下:shell
首先须要安装好Java运行环境,因为比较简单这里就不演示了:apache
[root@hadoop01 ~]# echo ${JAVA_HOME} /usr/local/jdk/11 [root@hadoop01 ~]# java -version java version "11.0.8" 2020-07-14 LTS Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS) Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode) [root@hadoop01 ~]#
配置hosts
,将主机名与本地ip创建一个映射关系:编程
[root@hadoop01 ~]# vim /etc/hosts 192.168.243.142 hadoop01
关闭防火墙:vim
[root@hadoop01 ~]# systemctl stop firewalld && systemctl disable firewalld
配置免密登陆:api
[root@hadoop01 ~]# ssh-keygen -t rsa # 生成密钥对 [root@hadoop01 ~]# ssh-copy-id hadoop01 # 拷贝公钥并追加到本身的受权列表文件中
而后就能够开始安装Hadoop了,这里采用 hadoop-2.6.0-cdh5.16.2 版本做为演示,复制下载连接到系统上进行下载:浏览器
[root@hadoop01 ~]# cd /usr/local/src [root@hadoop01 /usr/local/src]# wget http://archive.cloudera.com/cdh5/cdh/5/hadoop-2.6.0-cdh5.16.2.tar.gz
解压下载好的压缩包:
[root@hadoop01 /usr/local/src]# tar -zxvf hadoop-2.6.0-cdh5.16.2.tar.gz -C /usr/local
配置系统环境变量:
[root@hadoop01 ~]# vim /etc/profile export HADOOP_HOME=/usr/local/hadoop-2.6.0-cdh5.16.2 export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin [root@hadoop01 ~]# source /etc/profile
修改几个配置文件:
[root@hadoop01 ~]# cd $HADOOP_HOME [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/hadoop-env.sh export JAVA_HOME=/usr/local/jdk/11 # 配置JDK的目录 # 配置 core [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/core-site.xml <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://hadoop01:8020</value> </property> </configuration> # 配置 hdfs,设置副本因子和临时目录 [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/hdfs-site.xml <configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/data/hadoop/tmp</value> </property> </configuration> # 配置slave节点的ip或hostname [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/slaves hadoop01 # 配置 yarn [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/yarn-site.xml <configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration> # 配置MapReduce [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# vim etc/hadoop/mapred-site.xml <configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration> # 建立hadoop的临时目录 [root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# mkdir -p /data/hadoop/tmp
应用HDFS的配置:
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# ./bin/hdfs namenode -format
启动全部组件:
[root@hadoop01 /usr/local/hadoop-2.6.0-cdh5.16.2]# ./sbin/start-all.sh
启动成功后查看进程:
[root@hadoop01 ~]# jps 3344 SecondaryNameNode 2722 NameNode 3812 Jps 3176 DataNode 3578 NodeManager 3502 ResourceManager [root@hadoop01 ~]#
而后在浏览器中访问HDFS的web界面,默认端口是50070:
接着访问HDFS的YARN界面,默认端口是8088:
测试HDFS可否正常读写:
[root@hadoop01 ~]# hadoop fs -put anaconda-ks.cfg / # 任意put一个文件到hdfs [root@hadoop01 ~]# hadoop fs -ls / # 查看hdfs中是否有该文件 Found 1 items -rw-r--r-- 1 root supergroup 1269 2020-09-29 17:45 /anaconda-ks.cfg
通过测试,确认Hadoop环境是运行正常以后,咱们就能够尝试将Flink应用放到YARN上运行了。
Flink on YARN 有两种模式:Session模式和Per-Job模式。在Session模式中多个 JobManager 共享 Dispatcher 和 YarnResourceManager。在这种模式下,须要先向 YARN 申请资源,初始化一个常驻服务在 YARN 上,后续提交的Job都将运行在这个Session上:
而Per-Job模式则相反,一个 JobManager 独享 Dispatcher 和 YarnResourceManager。也就是说每提交一个Job都新建一个Session,不一样Job之间的资源是隔离的,不会互相影响:
想要深刻了解的话能够参考官方文档:
首先将在 Flink部署及做业提交(On Flink Cluster) 一文中编译好的Flink目录拷贝到当前部署了Hadoop环境的机器上:
[root@hadoop01 ~]# scp -r 192.168.243.148:/usr/local/src/flink-release-1.11.2/flink-dist/target/flink-1.11.2-bin/flink-1.11.2/ /usr/local/flink
配置环境变量,不然Flink会报找不到Hadoop相关Class的异常:
[root@hadoop01 ~]# vim /etc/profile export HADOOP_MAPRED_HOME=$HADOOP_HOME export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME export YARN_HOME=$HADOOP_HOME export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop export HADOOP_CLASSPATH=$HADOOP_COMMON_HOME/lib:$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mepreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/*
而后执行./bin/yarn-session.sh --help
命令测试一下可否正常输出帮助信息:
[root@hadoop01 ~]# cd /usr/local/flink/ [root@hadoop01 /usr/local/flink]# ./bin/yarn-session.sh --help ... Usage: Optional -at,--applicationType <arg> Set a custom application type for the application on YARN -D <property=value> use value for given property -d,--detached If present, runs the job in detached mode -h,--help Help for the Yarn session CLI. -id,--applicationId <arg> Attach to running YARN session -j,--jar <arg> Path to Flink jar file -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) -m,--jobmanager <arg> Address of the JobManager to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration. -nl,--nodeLabel <arg> Specify YARN node label for the YARN application -nm,--name <arg> Set a custom name for the application on YARN -q,--query Display available YARN resources (memory, cores) -qu,--queue <arg> Specify YARN queue. -s,--slots <arg> Number of slots per TaskManager -t,--ship <arg> Ship files in the specified directory (t for transfer) -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) -yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead) -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode [root@hadoop01 /usr/local/flink]#
确认Flink能够正常找到Hadoop后,使用以下命令在 YARN 上建立一个常驻服务:
[root@hadoop01 /usr/local/flink]# ./bin/yarn-session.sh -jm 1024m -tm 2048m ... JobManager Web Interface: http://hadoop01:37525 # 建立成功的话会输出JobManager的web访问地址
-jm
:指定JobManager须要的内存资源-tm
:指定TaskManager须要的内存资源使用浏览器打开 YARN 的web界面,正常状况下会有以下应用:
点击应用右边的 “ApplicationMaster” 能够跳转到Flink的dashboard。此时能够看到Flink Dashboard页面上任何数字都是0,应该就能看得出实际这只是启动了一个JobManager:
hosts
文件中配置一下hadoop01这个主机名到IP的映射关系接下来咱们尝试一下提交做业到 YARN 上运行,首先准备好官方提供的测试文件,并put到HDFS中:
[root@hadoop01 ~]# wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt [root@hadoop01 ~]# hadoop fs -copyFromLocal LICENSE-2.0.txt /
而后执行以下命令,提交一个Word Count做业:
[root@hadoop01 ~]# cd /usr/local/flink/ [root@hadoop01 /usr/local/flink]# ./bin/flink run -m hadoop01:37525 ./examples/batch/WordCount.jar \ --input hdfs://hadoop01:8020/LICENSE-2.0.txt --output hdfs://hadoop01:8020/wordcount-result.txt
hadoop01:37525
,是执行完yarn-session.sh
命令输出的JobManager的访问地址执行完成后,控制台会输出以下内容:
Job has been submitted with JobID 2240e11994cf8579a78e16a1984f08db Program execution finished Job with JobID 2240e11994cf8579a78e16a1984f08db has finished. Job Runtime: 10376 ms
此时到“Completed Jobs”页面中,能够看到运行完成的做业及其信息:
除此以外,咱们还能够查看该做业输出到HDFS中的结果文件:
[root@hadoop01 /usr/local/flink]# hadoop fs -ls /wordcount-result.txt -rw-r--r-- 1 root supergroup 4499 2020-09-29 20:25 /wordcount-result.txt [root@hadoop01 /usr/local/flink]# hadoop fs -text /wordcount-result.txt
首先将以前在 yarn 上运行的应用和相关进程给kill
掉:
[root@hadoop01 ~]# yarn application -kill application_1601372571363_0001 [root@hadoop01 ~]# jps 6995 SecondaryNameNode 7204 ResourceManager 7305 NodeManager 11291 Jps 6734 NameNode 6830 DataNode 8942 FlinkYarnSessionCli [root@hadoop01 ~]# kill 8942
Per-Job模式更简单,由于是提交一个做业就建立一次资源的,因此直接运行以下命令就能够提交一个Flink的Word Count做业到 yarn 上,不须要像Session模式那样事先去建立资源:
[root@hadoop01 /usr/local/flink]# ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
做业运行完成后,控制台会输出一堆统计结果。此时在 yarn 上能够看到该做业已经执行完成:
在以前的演示中能够看到,提交的Flink做业都是以jar
包形式存在的。若是咱们在实际开发中,须要频繁修改代码提交到 yarn 上测试,那么就得频繁的打包,相对来讲就有点麻烦。那么Flink有没有像Spark那样提供相似于 Spark Shell 的交互式编程终端用于简单的代码测试呢?答案是有的,Flink提供了PyFlink Shell和Scala Shell,能够执行Python和Scala代码。
这里简单演示下Flink Scala Shell的使用,执行以下命令打开Flink Scala Shell:
[root@hadoop01 /usr/local/flink]# ./bin/start-scala-shell.sh local
local
表示在本地运行,除此以外还能够选择remote
和yarn
,具体可使用--help
参数进行查看shell里调用API的方式仍是同样的,只是环境变成了内置的变量,例如这里使用的benv
就表示批处理的env:
scala> val dataSet = benv.readTextFile("file:///root/LICENSE-2.0.txt") dataSet: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@3110bb19 scala> dataSet.print