flink安装部署

一. Flink的下载
安装包下载地址:http://flink.apache.org/downloads.html ,选择对应Hadoop的Flink版本下载
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
Flink 有三种部署模式,分别是 Local、Standalone Cluster 和 Yarn Cluster。

二. Local模式
对于 Local 模式来说,JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。如果要验证一个简单的应用,Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是将安装包解压启动(./bin/start-local.sh)即可,在这里不在演示。

三. Standalone 模式
快速入门教程地址:https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html

  1. 软件要求
    Java 1.8.x或更高版本,
    ssh(必须运行sshd才能使用管理远程组件的Flink脚本)
  2. 解压
    tar zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz
  3. 修改配置文件
    修改flink/conf/masters,slaves,flink-conf.yaml

复制代码
sudo vi masters
hdp-1:8081
sudo vi slaves
hdp-2
hdp-3
hdp-4
sudo vi flink-conf.yaml
taskmanager.numberOfTaskSlots:2
jobmanager.rpc.address: hdp-1

可选配置:
每个JobManager(jobmanager.heap.mb)的可用内存量,
每个TaskManager(taskmanager.heap.mb)的可用内存量,
每台机器的可用CPU数量(taskmanager.numberOfTaskSlots),
集群中的CPU总数(parallelism.default)和
临时目录(taskmanager.tmp.dirs)
4. 拷贝安装包到各节点
scp
5. 配置环境变量
配置所有节点Flink的环境变量

sudo vi /etc/profile
export FLINK_HOME=
export PATH= P A T H : PATH: FLINK_HOME/bin
source /etc/profile
6. 启动flink
./bin/start-cluster.sh
jps查看进程
StandaloneSessionClusterEntrypoint
TaskManagerRunner
7. WebUI查看
http://node21:8081
在这里插入图片描述
8. Flink 的 HA
首先,我们需要知道 Flink 有两种部署的模式,分别是 Standalone 以及 Yarn Cluster 模式。对于 Standalone 来说,Flink 必须依赖于 Zookeeper 来实现 JobManager 的 HA(Zookeeper 已经成为了大部分开源框架 HA 必不可少的模块)。在 Zookeeper 的帮助下,一个 Standalone 的 Flink 集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选举新的 JobManager 来接管 Flink 集群。

对于 Yarn Cluaster 模式来说,Flink 就要依靠 Yarn 本身来对 JobManager 做 HA 了。其实这里完全是 Yarn 的机制。对于 Yarn Cluster 模式来说,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就完全依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一样)。由于完全依赖了 Yarn,因此不同版本的 Yarn 可能会有细微的差异。这里不再做深究。

1) 修改配置文件
修改flink-conf.yaml,HA模式下,jobmanager不需要指定,在master file中配置,由zookeeper选出leader与standby。
#jobmanager.rpc.address: node21
high-availability:zookeeper #指定高可用模式(必须)
high-availability.zookeeper.quorum:node21:2181,node22:2181,node23:2181 #ZooKeeper仲裁是ZooKeeper服务器的复制组,它提供分布式协调服务(必须)
high-availability.storageDir:hdfs:///flink/ha/ #JobManager元数据保存在文件系统storageDir中,只有指向此状态的指针存储在ZooKeeper中(必须)
high-availability.zookeeper.path.root:/flink #根ZooKeeper节点,在该节点下放置所有集群节点(推荐)
high-availability.cluster-id:/flinkCluster #自定义集群(推荐)
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints

修改conf/zoo.cfg
server.1=hdp-1:2888:3888
server.2=hdp-2:2888:3888
server.3=hdp-3:2888:3888
server.4=hdp-4:2888:3888

修改conf/masters
hdp-1:8081
hdp-2:8081

修改slaves
hdp-2
hdp-3
hdp-4
同步配置文件conf到各节点
scp

2) 启动HA
先启动zookeeper集群各节点(测试环境中也可以用Flink自带的start-zookeeper-quorum.sh),启动dfs ,再启动flink
start-cluster.sh

WebUI查看,这是会自动产生一个主Master,如下
在这里插入图片描述
3) 验证HA
kill掉一个,会自动启动另一个

4)手动将JobManager / TaskManager实例添加到群集
您可以使用bin/jobmanager.sh和bin/taskmanager.sh脚本将JobManager和TaskManager实例添加到正在运行的集群中。

添加JobManager

bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

添加TaskManager

bin/taskmanager.sh start|start-foreground|stop|stop-all

[[email protected] apps]$ jobmanager.sh start hdp-1
新添加的为从master。

  1. 运行测试任务
    $ flink run -m node21:8081 ./examples/batch/WordCount.jar --input hdfs:///wordcount/input/wc.txt --output hdfs:///wordcount/output2

四. Yarn Cluster模式

  1. 引入
    在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink 也支持在 Yarn 上面运行。首先,让我们通过下图了解下 Yarn 和 Flink 的关系。
    在这里插入图片描述 在图中可以看出,Flink 与 Yarn 的关系与 MapReduce 和 Yarn 的关系是一样的