Flink
是目前最热门的分布式流/批处理框架,而Kubernetes
是目前最热门的资源管理和调度平台。Flink
支持在Kubernetes
上采用Session模式或Application模式部署做业。基于实践经验,本文主要讨论在Kubernetes上部署Flink做业须要注意的地方。html
环境:java
测试虽基于flink-1.11.2,参考1.12的文档也无妨:node
k8s上运行flink任务有两种模式,session模式和application模式(早期还有一种per-job模式,但已废弃)github
session模式下,在k8s上会部署一个运行jobmanager的pod(以及包括deployment/rs/service/configmap等资源)。后续经过提交命令提交的任务,都由这个jobmanager的pod负责向k8s申请taskmanager的pod。这种模式与standalone有些相似,惟一不一样的是,standalone模式须要事先部署好master和worker。docker
application模式下,每一个做业会在k8s上部署一套jm和tm,这跟yarn模式是相似的。apache
基于上述原理,不论是哪一种模式都须要pod有权限建立和管理k8s资源,所以须要考虑RBAC。api
正如文档所说,须要事先为default这个servicename设置对应的role,实践中咱们部署以下配置,参考kubernetes-log-user-systemserviceaccountdefaultdefault-cannot-get-services:网络
apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRoleBinding metadata: name: fabric8-rbac subjects: - kind: ServiceAccount # Reference to upper's `metadata.name` name: default # Reference to upper's `metadata.namespace` namespace: default roleRef: kind: ClusterRole name: cluster-admin apiGroup: rbac.authorization.k8s.io
最开始遗漏上述步骤浪费了大量的调试时间。session
另外,提交任务最好在k8s的节点上进行,由于以下缘由
KubeConfig, which has access to list, create, delete pods and services, configurable via ~/.kube/config. You can verify permissions by running kubectl auth can-i <list|create|edit|delete> pods.
k8s上须要事先安装好CoreDNS
flink1.11的客户端对log配置处理并很差,这形成调试和排错困难,因此建议上来先处理一下客户端的配置:
在flink-conf.yaml
增长以下配置:
kubernetes.container-start-command-template: %java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args% kubernetes.container.image.pull-policy: Always
kubernetes.container-start-command-template
的做用是生成jobmanager pod时的启动命令。这里去掉提交命令中最后的的%redirect%
。默认%redirect%
会将标准输出和标准错误重定向到文件。重定向会致使,若是pod出错挂掉的话,没法经过kubectl logs命令查看日志将logback-console.xml
和log4j-console.properties
重命名为logback.xml
和log4j.properties
。这将使得日志打印到stdout和stderr,不然日志将打印到文件。
1.12修复了这个两个问题,没必要修改命令行模板,也没必要重命名日志文件 FLINK-15792
首先经过以下命令提交jobmanager:
$ ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
若是不成功的的话,建议经过kubectl logs命令看下问题。
接下来提交做业:
$ ./bin/flink run --target kubernetes-session -Dkubernetes.cluster-id=my-first-flink-cluster ./examples/streaming/TopSpeedWindowing.jar
可能会报错:
Caused by: java.util.concurrent.CompletionException: [org.apache.flink.shaded.netty4.io](http://org.apache.flink.shaded.netty4.io/).netty.channel.AbstractChannel$AnnotatedConnectException: 拒绝链接: /192.168.21.73:8081 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ... 19 more
显然是客户端尝试将jobgraph
提交给容器中的jobmanager
时没法链接到jobmanager。这里想到首先要检查一下jobmanager是否正常,进入jm容器,日志没有什么报错,应该没问题。容器启动的是8081的端口,在外面是没法访问的。这是k8s环境的“通病”:复杂网络环境。
有几种方案:
经过kubectl port-forward进行本地端口转发,例如:
kubectl port-forward my-first-flink-cluster-6446bbb6f6-4nnm5 8081:8081 --address 0.0.0.0
jobmanager会启动一个rest service资源,默认采用LoadBalancer类型,咱们能够在集群中安装一个LoadBalancer,下面介绍了如何安装metallb
metallb自己也是pod运行的,按照官网安装没什么问题:
kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.9.5/manifests/namespace.yaml kubectl apply -f https://raw.githubusercontent.com/metallb/metallb/v0.9.5/manifests/metallb.yaml首次安装需运行
kubectl create secret generic -n metallb-system memberlist --from-literal=secretkey="$(openssl rand -base64 128)"采用layer2的方式配置,并且地址池的地址段与node是同一个地址段,这样不须要配置其余东西:
config.yaml apiVersion: v1 kind: ConfigMap metadata: namespace: metallb-system name: config data: config: | address-pools: - name: default protocol: layer2 addresses: - 192.168.21.210-192.168.21.215
配置loadbalancer之后:
能够看到xxx-rest的svc,其中的EXTERNAL-IP
原先一直是<pending>,如今从地址段中分配了一个地址了。这个地址没法ping,可是能够访问:
解决了服务外部访问的问题,就能正常运行测试做业了。
session模式总结:
Session mode虽然看似简单,可是对于扫清环境障碍起到相当重要的做用。上面提到Application mode与yarn实际上是比较相似的,是一种更接近生产的部署模式。
首先须要将打包好的应用程序jar包打入镜像:
FROM flink:1.11.2-scala_2.11 RUN mkdir -p $FLINK_HOME/usrlib COPY jax-flink-entry-2.0-SNAPSHOT.jar $FLINK_HOME/usrlib/jax-flink-entry-2.0-SNAPSHOT.jar COPY kafka-clients-2.2.0.jar $FLINK_HOME/lib/kafka-clients-2.2.0.jar
以上面的Dockerfile为例,把咱们的应用程序包放到$FLINK_HOME/usrlib
(这是个特殊的目录,默认Flink在运行的时候会从这个目录加载用户的jar包)。同时,咱们把依赖包放到$FLINK_HOME/lib
下。
构建镜像并推送到内部的镜像仓库:
docker build -t xxxxx:5000/jax-flink:lastest . docker push xxxx:5000/jax-flink:lastest
以Application mode提交做业
./bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=jax-application-cluster \ -Dkubernetes.container.image=xxxx:5000/jax-flink:lastest \ -c com.eoi.jax.flink_entry.FlinkMainEntry \ local:///opt/flink/usrlib/jax-flink-entry-2.0-SNAPSHOT.jar ...
做业会启动独立的jobmanager和taskmanager。Applicatoin mode的特色是做业的构建(生成jobgraph的过程)不在客户端完成,而是在jobmanager上完成,这一点与spark的driver是相似的。
一些提交命令参数的做用:
$internal.application.program-args
。这将最终最为用户main函数的参数[]String$internal.application.main
pipeline.classpaths
(必须是合法的URL)。可是,pipeline.classpaths中的URL不会被加到运行用户main函数的类加载器中,这意味着-C指定的依赖包没法被用户代码使用。笔者已经向Flink提交了相关的issue和PR,已经被肯定为BUG。FLINK-21289containerized.taskmanager.env
和containerized.master.env
测试下来是生效的,能够生成容器的env