flink集群的搭建与部署

运行环境

  • linux-CentOS6.8
  • hadoop-2.7.5
  • Scala-2.11.6
  • jdk-1.8
  • flink-1.7.1-bin-hadoop27-scala_2.11.tgz

flink搭建

1.下载

下载地址:http://flink.apache.org/downloads.html
根据本身集群环境的状况,下载相应的flink版本。
上面描述个人集群环境是hadoop2.7.5Scala2.11,因此下载:flink-1.7.1-bin-hadoop27-scala_2.11.tgzhtml

2.下载方式

2.1 直接从网页上下载,上传至集群上。
2.2wget下载: wget flink-1.7.1-bin-hadoop27-scala_2.11.tgz(推荐使用)
在这里插入图片描述linux

3.解压

tar -zxvf  flink-1.7.1-bin-hadoop27-scala_2.11.tgz

4.设置环境变量

vi /etc/profileweb

#flink
export FLINK_HOME=/usr/local/flink-1.7.1
export PATH=$FLINK_HOME/bin:$PATH

刷新使之生效 source /etc/profileshell

5.配置

cd /usr/local/flink-1.7.1/conf
在这里插入图片描述
5.1配置文件说明
这里面须要咱们配置的有:slavesflink-conf.yaml文件,这里面masters文件是用来配置HA的,只要咱们不配置HA的话,就不须要配置masters文件(flink也是master/slave结构,可是对于此时master的选择是执行启动脚本的机器为master)。可是slave须要咱们配置,配置对应的主机名便可(伪分布式和分布式的区别也就是实际上slave节点的个数,以及分布式在多个节点上而已)。接下来须要咱们配置的就是flink-conf.yaml,flink和spark仍是有区别的,spark配置文件分spark-env.shspark-default.conf文件,而flink的配置都在flink-conf.yaml中完成配置。apache

5.2修改flink-conf.yaml配置文件,先配置一个简单版本,standalone的模式session

# JobManager runs.
jobmanager.rpc.address: cdh1
# The RPC port where the JobManager is reachable.
jobmanager.rpc.port: 6123
# The heap size for the JobManager JVM
jobmanager.heap.size: 1024m
# The heap size for the TaskManager JVM
taskmanager.heap.size: 1024m
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1
#配置是否在Flink集群启动时候给TaskManager分配内存,默认不进行预分配,这样在咱们不适用flink集群时候不会占用集群资源
taskmanager.memory.preallocate: false
# 用于未指定的程序的并行性和其余并行性,默认并行度 
parallelism.default: 2 
 #指定JobManger的可视化端口,尽可能配置一个不容易冲突的端口
 jobmanager.web.port: 5566
 #配置checkpoint目录
state.backend.fs.checkpointdir: hdfs://cdh1:9000/flink-checkpoints 
#配置hadoop的配置文件
fs.hdfs.hadoopconf: /usr/local/hadoop/etc/hadoop/ 
#访问hdfs系统使用的
fs.hdfs.hdfssite: /usr/local/hadoop/etc/hadoop/hdfs-site.xml

5.3修改slavesmasters2个文件,用来配置taskManager和JobManager信息app

[hadoop@cdh1 conf]$ cat slaves 
cdh2
cdh3
cdh4
cdh5
[hadoop@cdh1 conf]$ cat masters 
cdh1:8081

5.4配置内容注意
flink-conf.yaml中配置key/value时候在“:”后面须要有一个空格,不然配置不会生效。
#sloves文件配置,填写从节点的ip地址便可分布式

同步信息

flink安装全部信息已经环境信息同步到其余机器上面,这里有几台机器就要执行几回svg

scp /etc/profile root@cdh3:etc/profile
scp -r ./flink-1.7.1 root@cdh3:/usr/local

source /etc/proflie

flink集群的开启

1.启动flink集群

start-cluster.shoop

[root@cdh1 bin]# start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host cdh1 .

2.而后jps查看一下进程:

分别能够看到JobManagerTaskManager的2个进程

[root@cdh1 bin]$ jps
3876 StandaloneSessionClusterEntrypoint
[root@cdh2 ~]$ jps
3544 TaskManagerRunner

3.登陆JobManager的地址查看web界面 http://192.168.10.3:8081
在这里插入图片描述
已经表示搭建完成了,如今咱们开始验证一下集群

运行模式介绍

使用start-scala-shell.sh来验证
${FLINK_HOME}/bin/start-scala-shell.sh是flink提供的交互式clinet,能够用于代码片断的测试,方便开发工做,它有两种启动方式,一种是工做在本地,另外一种是工做到集群。本例中由于机器链接很是方便,就直接使用集群进行测试,在开发中,若是集群链接不是很是方便,能够链接到本地,在本地开发测试经过后,再链接到集群进行部署工做。若是程序有依赖的jar包,则能够使用 -a <path/to/jar.jar> 或 --addclasspath <path/to/jar.jar>参数来添加依赖。

1.本地链接

${FLINK_HOME}/bin/start-scala-shell.sh local

2.集群链接

${FLINK_HOME}/bin/start-scala-shell.sh remote <hostname> <portnumber>

3.带有依赖包的格式

${FLINK_HOME}/bin/start-scala-shell.sh [local|remote<host><port>] --addclasspath<path/to/jar.jar>

4.查看帮助

${FLINK_HOME}/bin/start-scala-shell.sh --help

[root@cdh2 bin]$ ./start-scala-shell.sh --help
Flink Scala Shell
Usage: start-scala-shell.sh [local|remote|yarn] [options] <args>...

Command: local [options]
Starts Flink scala shell with a local Flink cluster
  -a, --addclasspath <path/to/jar>
                           Specifies additional jars to be used in Flink
Command: remote [options] <host> <port>
Starts Flink scala shell connecting to a remote cluster
  <host>                   Remote host name as string
  <port>                   Remote port as integer

  -a, --addclasspath <path/to/jar>
                           Specifies additional jars to be used in Flink
Command: yarn [options]
Starts Flink scala shell connecting to a yarn cluster
  -n, --container arg      Number of YARN container to allocate (= Number of TaskManagers)
  -jm, --jobManagerMemory arg
                           Memory for JobManager container
  -nm, --name <value>      Set a custom name for the application on YARN
  -qu, --queue <arg>       Specifies YARN queue
  -s, --slots <arg>        Number of slots per TaskManager
  -tm, --taskManagerMemory <arg>
                           Memory per TaskManager container
  -a, --addclasspath <path/to/jar>
                           Specifies additional jars to be used in Flink
  --configDir <value>      The configuration directory.
  -h, --help               Prints this usage text

测试

咱们 使用集群模式去验证

[root@cdh1 bin]$ ./start-scala-shell.sh remote 192.168.10.3 8081

运行以下案例代码

Scala> val text = benv.fromElements(
  "To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer",
  "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
Scala> val counts = text
    .flatMap { _.toLowerCase.split("\\W+") }
    .map { (_, 1) }.groupBy(0).sum(1)
Scala> counts.print()

运行结果
在这里插入图片描述

web url也能够看到详细的信息
在这里插入图片描述

遇到异常状况:

咱们这边是由于安装了Scala致使通讯失败,将Scala的环境信息去掉就能够了。

中止flink集群

stop-cluster.sh
  
[root@cdh1 conf]# stop-cluster.sh

参考连接

(1)https://blog.csdn.net/paicMis/article/details/84642263
(2)http://www.javashuo.com/article/p-kttwhpgo-bp.html