The previous chapter covered Flink's application mode. Today we will use that mode to submit the WordCount job, deploying it on native Kubernetes.
The figure below shows how Flink integrates with Kubernetes:
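When a Flink Kubernetes session cluster is created, the Flink client first connects to the Kubernetes ApiServer and submits the cluster description, which includes the ConfigMap spec, the JobManager Service spec, the JobManager Deployment spec and the Owner Reference. Kubernetes then creates the JobManager Deployment; during this step the kubelet pulls the image, prepares and mounts the volumes, and runs the start command. Once the JobManager pod is up, the Dispatcher and the KubernetesResourceManager become available and the cluster is ready to accept one or more jobs.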
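The Owner Reference is what ties these resources together: Flink sets an ownerReference pointing at the JobManager Deployment (named after the cluster ID) on the resources it creates, so deleting that Deployment lets Kubernetes garbage-collect the rest. The sketch below builds a ConfigMap manifest carrying such a reference. It is illustrative only: the function names are hypothetical, the ConfigMap name `flink-config-<cluster-id>` is an assumption based on Flink 1.11 naming, and the UID is a placeholder (the real one is assigned by the API server).

```python
import json


def owner_reference(deployment_name: str, deployment_uid: str) -> dict:
    """Hypothetical helper: an ownerReference pointing at the JobManager Deployment."""
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "name": deployment_name,
        "uid": deployment_uid,  # placeholder; the API server assigns the real UID
        "controller": True,
        "blockOwnerDeletion": True,
    }


def configmap_owned_by(cluster_id: str, deployment_uid: str, flink_conf: dict) -> dict:
    """ConfigMap holding flink-conf.yaml, owned by the Deployment named after cluster_id.

    The name "flink-config-<cluster-id>" is an assumption (Flink 1.11 naming).
    """
    conf_text = "\n".join(f"{key}: {value}" for key, value in flink_conf.items())
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {
            "name": f"flink-config-{cluster_id}",
            "ownerReferences": [owner_reference(cluster_id, deployment_uid)],
        },
        "data": {"flink-conf.yaml": conf_text},
    }


if __name__ == "__main__":
    manifest = configmap_owned_by(
        "my-first-cluster",
        "00000000-0000-0000-0000-000000000000",  # placeholder UID
        {"taskmanager.numberOfTaskSlots": 4},
    )
    print(json.dumps(manifest, indent=2))
```

In practice this means a whole native Kubernetes cluster can be torn down with `kubectl delete deployment/my-first-cluster -n stream`.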
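When a user submits a job through the Flink client, the client generates the JobGraph and uploads it, together with the user jar, to the Dispatcher.
The JobManager requests slot resources from the KubernetesResourceManager. If no slots are available, the ResourceManager creates TaskManager pods and registers them with the cluster.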
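How many TaskManager pods end up being created can be estimated from the parallelism and the slots per TaskManager. The following is a simplified sketch, not Flink's actual ResourceManager logic: it assumes all operators share one slot sharing group (so the job needs `parallelism` slots) and ignores slot release and retries. With the values used in the submit command later (`-p 8`, `taskmanager.numberOfTaskSlots=4`) it gives 2, matching the two TaskManager pods in the `kubectl` output.

```python
import math


def taskmanagers_needed(parallelism: int, slots_per_tm: int, free_slots: int = 0) -> int:
    """Simplified estimate of how many new TaskManager pods get requested.

    Assumes a single slot sharing group, so the job needs `parallelism` slots.
    """
    if parallelism <= 0 or slots_per_tm <= 0:
        raise ValueError("parallelism and slots_per_tm must be positive")
    missing = max(0, parallelism - free_slots)  # slots not covered by existing TMs
    return math.ceil(missing / slots_per_tm)


if __name__ == "__main__":
    print(taskmanagers_needed(8, 4))  # 2
```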
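Application mode allows users to build an image that contains their job together with the Flink runtime; the cluster components are then created and destroyed automatically as needed. The Flink community provides base Docker images that can be customized for any use case.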
Download
First, download Flink 1.11 from the official website. The package contains the following:
bin conf Dockerfile examples lib LICENSE licenses log NOTICE opt plugins README.txt
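Among these, the examples directory holds the bundled example jobs; examples/streaming/WordCount.jar is the one we will package into the image.
Build the image
Next, we create a Dockerfile to customize the image. Its contents are as follows: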
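```dockerfile
# Official Flink image; without a tag this pulls flink:latest.
# Consider pinning a tag that matches your Flink client version (1.11 here).
FROM flink
# Create the usrlib directory for user jars
RUN mkdir -p $FLINK_HOME/usrlib
# Package the WordCount example from the Flink distribution as the job jar
COPY ./examples/streaming/WordCount.jar $FLINK_HOME/usrlib/my-flink-job.jar
```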
Build the image:
```shell
docker build -t iyacontrol/flink-world-count:v0.0.1 .
Sending build context to Docker daemon  362.7MB
Step 1/3 : FROM flink
latest: Pulling from library/flink
e9afc4f90ab0: Already exists
989e6b19a265: Already exists
af14b6c2f878: Already exists
68a79816c3e1: Pull complete
037cc5cb1b83: Pull complete
d3efdb331614: Pull complete
bf82d2b871ad: Pull complete
4ff2e8c5d83f: Pull complete
f15a0d59303a: Pull complete
81130e2e9fdd: Pull complete
40bdeebc27c6: Pull complete
8fe3a85e5402: Pull complete
Digest: sha256:665db47d0a2bcc297e9eb4df7640d3e4c1d398d25849252a726c8ada112722cf
Status: Downloaded newer image for flink:latest
 ---> 43f070a908e6
Step 2/3 : RUN mkdir -p $FLINK_HOME/usrlib
 ---> Running in c44a726b85a9
Removing intermediate container c44a726b85a9
 ---> 67ab6686e049
Step 3/3 : COPY ./examples/streaming/WordCount.jar $FLINK_HOME/usrlib/my-flink-job.jar
 ---> ab3686ebc7e5
Successfully built ab3686ebc7e5
```
Push the image to the registry:
```shell
docker push iyacontrol/flink-world-count:v0.0.1
The push refers to repository [docker.io/iyacontrol/flink-world-count]
b3b3d0402b8d: Pushed
b1757ffb6e42: Pushed
3af0e2838f53: Mounted from library/flink
cf0f92755ad7: Mounted from library/flink
1f8a2f4bd423: Mounted from library/flink
eedc301c6f3f: Mounted from library/flink
d23c0e026b3e: Mounted from library/flink
37f26e989a45: Mounted from library/flink
e658c78cae16: Mounted from library/flink
d8859f270d7a: Mounted from library/flink
7ab97ad88178: Mounted from library/flink
527ade4639e0: Mounted from library/flink
c2c789d2d3c5: Mounted from library/flink
8803ef42039d: Mounted from library/flink
v0.0.1: digest: sha256:fcd99fedbba2734796226a725789bf7db109131b04f2a13c1cd1bc773ff3b8c0 size: 3253
```
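Note: replace this with your own image registry. Alternatively, you can skip the build step and use the image I have already built.
Configure Kubernetes RBAC permissions
Flink needs certain RBAC permissions so that the JobManager can create the TaskManager pods. The service account holding these permissions is passed at submission time with -Dkubernetes.jobmanager.service-account=flink.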
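```shell
# The stream namespace must exist before a service account can be created in it
kubectl create namespace stream
kubectl create serviceaccount flink -n stream
# A ClusterRoleBinding is cluster-scoped, so -n has no effect on it
kubectl create clusterrolebinding flink-role-binding-flink -n stream --clusterrole=edit --serviceaccount=stream:flink
```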
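The same permissions can also be declared as manifests and applied with `kubectl apply`, which is easier to keep in version control. The sketch below emits a Kubernetes `List` with the ServiceAccount and ClusterRoleBinding equivalent to the two `create` commands above; the function name and the file name `rbac.py` are hypothetical.

```python
import json


def rbac_manifests(namespace: str = "stream", service_account: str = "flink") -> dict:
    """ServiceAccount + ClusterRoleBinding equivalent to the kubectl create commands."""
    sa = {
        "apiVersion": "v1",
        "kind": "ServiceAccount",
        "metadata": {"name": service_account, "namespace": namespace},
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "ClusterRoleBinding",
        # Cluster-scoped object: no namespace in its metadata
        "metadata": {"name": f"flink-role-binding-{service_account}"},
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": "edit",  # built-in ClusterRole used in the commands above
        },
        "subjects": [
            {"kind": "ServiceAccount", "name": service_account, "namespace": namespace}
        ],
    }
    return {"apiVersion": "v1", "kind": "List", "items": [sa, binding]}


if __name__ == "__main__":
    print(json.dumps(rbac_manifests(), indent=2))
```

Usage: `python3 rbac.py > rbac.json && kubectl apply -f rbac.json`. If you prefer narrower permissions, binding the `edit` ClusterRole through a namespaced RoleBinding in `stream` would limit the service account to that namespace.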
Rather than using the default namespace, we created a dedicated namespace called stream.
Submit the job
Run the following command to submit the WordCount job:
```shell
./bin/flink run-application -p 8 -t kubernetes-application \
    -Dkubernetes.cluster-id=my-first-cluster \
    -Dtaskmanager.memory.process.size=4096m \
    -Dkubernetes.taskmanager.cpu=2 \
    -Dtaskmanager.numberOfTaskSlots=4 \
    -Dkubernetes.container.image=iyacontrol/flink-world-count:v0.0.1 \
    -Dkubernetes.namespace=stream \
    -Dkubernetes.jobmanager.service-account=flink \
    -Dkubernetes.rest-service.exposed.type=LoadBalancer \
    -Dkubernetes.rest-service.annotations=service.beta.kubernetes.io/aws-load-balancer-type:nlb \
    local:///opt/flink/usrlib/my-flink-job.jar
```
The local:// scheme is supported only in application mode; it assumes the jar is located inside the image, not on the Flink client.
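The `local://` path has to match where the Dockerfile copied the jar. The sketch below checks that mapping; it assumes `FLINK_HOME=/opt/flink` as in the official image, and the function name is hypothetical. It also rejects `local://opt/...` (two slashes), because URL parsing then treats `opt` as a host name rather than part of the path.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse

# FLINK_HOME as set in the official flink image (assumption for this sketch);
# the Dockerfile copies the jar to $FLINK_HOME/usrlib/my-flink-job.jar.
FLINK_HOME = "/opt/flink"


def jar_path_in_image(uri: str) -> str:
    """Return the in-image file path referenced by a local:// URI."""
    parsed = urlparse(uri)
    if parsed.scheme != "local":
        raise ValueError(f"expected a local:// URI, got {uri!r}")
    if parsed.netloc:
        # local://opt/... would make "opt" the host; an absolute path needs three slashes
        raise ValueError(f"local:// URI must use an absolute path: {uri!r}")
    return parsed.path


if __name__ == "__main__":
    copied_to = str(PurePosixPath(FLINK_HOME) / "usrlib" / "my-flink-job.jar")
    print(jar_path_in_image("local:///opt/flink/usrlib/my-flink-job.jar") == copied_to)  # True
```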
The output looks similar to the following:
```text
2020-07-17 17:57:55,455 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration blob.server.port will be set to 6124
2020-07-17 17:57:55,455 INFO  org.apache.flink.kubernetes.utils.KubernetesUtils [] - Kubernetes deployment requires a fixed port. Configuration taskmanager.rpc.port will be set to 6122
2020-07-17 17:57:55,511 WARN  org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator [] - Found 0 files in directory null/etc/hadoop, skip to mount the Hadoop Configuration ConfigMap.
2020-07-17 17:57:55,511 WARN  org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator [] - Found 0 files in directory null/etc/hadoop, skip to create the Hadoop Configuration ConfigMap.
2020-07-17 17:57:56,348 INFO  org.apache.flink.kubernetes.KubernetesClusterDescriptor [] - Create flink application cluster my-first-cluster successfully, JobManager Web Interface: http://F1DD312BB1102AC0AE558F66FA.gr7.ap-southeast-1.eks.amazonaws.com:8081
```
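Next, let's go over the submission parameters:
- -p 8: the job parallelism.
- -t kubernetes-application: deploy in application mode on native Kubernetes.
- kubernetes.cluster-id: the cluster ID, also used as the name prefix of the Kubernetes resources (my-first-cluster).
- taskmanager.memory.process.size: total process memory of each TaskManager (4096m).
- kubernetes.taskmanager.cpu: CPUs requested for each TaskManager pod (2).
- taskmanager.numberOfTaskSlots: slots per TaskManager (4); with parallelism 8 the job needs 2 TaskManagers.
- kubernetes.container.image: the custom image built above.
- kubernetes.namespace: the namespace to deploy into (stream).
- kubernetes.jobmanager.service-account: the service account the JobManager uses to create TaskManager pods (flink).
- kubernetes.rest-service.exposed.type: how the REST service is exposed; here LoadBalancer.
- kubernetes.rest-service.annotations: annotations added to the REST service; here the AWS annotation that requests an NLB.
- local:///opt/flink/usrlib/my-flink-job.jar: the job jar's path inside the image.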
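When submissions are scripted, it can help to keep these options in one place and build the argument list from them. The sketch below is a hypothetical helper, not part of Flink: it only assembles the same `flink run-application` arguments used above, which could then be passed to `subprocess.run(cmd, check=True)` from the Flink distribution directory.

```python
import shlex

# The -D options used in the submit command above
CONF = {
    "kubernetes.cluster-id": "my-first-cluster",
    "taskmanager.memory.process.size": "4096m",
    "kubernetes.taskmanager.cpu": "2",
    "taskmanager.numberOfTaskSlots": "4",
    "kubernetes.container.image": "iyacontrol/flink-world-count:v0.0.1",
    "kubernetes.namespace": "stream",
    "kubernetes.jobmanager.service-account": "flink",
    "kubernetes.rest-service.exposed.type": "LoadBalancer",
    "kubernetes.rest-service.annotations": "service.beta.kubernetes.io/aws-load-balancer-type:nlb",
}


def build_run_application_cmd(jar_uri: str, parallelism: int, conf: dict) -> list:
    """Hypothetical helper: assemble a `flink run-application` argument list."""
    cmd = ["./bin/flink", "run-application", "-p", str(parallelism),
           "-t", "kubernetes-application"]
    cmd += [f"-D{key}={value}" for key, value in conf.items()]  # one -D per option
    cmd.append(jar_uri)  # the job jar always comes last
    return cmd


if __name__ == "__main__":
    cmd = build_run_application_cmd("local:///opt/flink/usrlib/my-flink-job.jar", 8, CONF)
    print(shlex.join(cmd))  # shell-quoted form, for copy and paste (Python 3.8+)
```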
Now let's check which resources were created in Kubernetes:
```shell
kubectl get all -n stream
NAME                                    READY   STATUS    RESTARTS   AGE
pod/my-first-cluster-64ff98cd96-sprk7   1/1     Running   0          24s
pod/my-first-cluster-taskmanager-1-1    1/1     Running   0          16s
pod/my-first-cluster-taskmanager-1-2    0/1     Pending   0          12s

NAME                            TYPE           CLUSTER-IP      EXTERNAL-IP                                                                     PORT(S)             AGE
service/my-first-cluster        ClusterIP      None            <none>                                                                          6123/TCP,6124/TCP   24s
service/my-first-cluster-rest   LoadBalancer   10.100.xx.179   a4fd46cfa3985f99582310dbfd-0ce036fe28648b82.elb.ap-southeast-1.amazonaws.com   8081:32756/TCP      24s

NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-first-cluster   1/1     1            1           24s

NAME                                          DESIRED   CURRENT   READY   AGE
replicaset.apps/my-first-cluster-64ff98cd96   1         1         1       24s
```
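One TaskManager pod is still Pending in this output; a Pending pod is usually waiting for the scheduler to find a node with enough CPU and memory, and `kubectl describe pod <name> -n stream` shows the reason. To watch TaskManager pods from a script, a minimal sketch is to parse `kubectl get pods -o json` and group pods by phase. Selecting pods by the name prefix `<cluster-id>-taskmanager-` is an assumption taken from the output above; the function names are hypothetical.

```python
import json
import subprocess
from collections import Counter


def taskmanager_phases(pods_json: dict, cluster_id: str) -> Counter:
    """Count TaskManager pods per phase, matching on the observed name prefix."""
    prefix = f"{cluster_id}-taskmanager-"
    return Counter(
        pod["status"]["phase"]
        for pod in pods_json.get("items", [])
        if pod["metadata"]["name"].startswith(prefix)
    )


def fetch_pods(namespace: str) -> dict:
    """Run `kubectl get pods -o json` in the given namespace and parse the result."""
    result = subprocess.run(
        ["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
        check=True, capture_output=True, text=True,
    )
    return json.loads(result.stdout)
```

Usage: `taskmanager_phases(fetch_pods("stream"), "my-first-cluster")`, which for the state shown above would count one Running and one Pending TaskManager.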
Access the UI
Open the JobManager UI using the address above (the EXTERNAL-IP of the my-first-cluster-rest service, on port 8081):
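The same address also serves Flink's REST API, which the UI itself is built on. As a minimal sketch, the standard-library client below reads the `/jobs/overview` endpoint and lists each job's name and state; the function names are hypothetical, and the base URL is whatever load balancer address your cluster reports.

```python
import json
from urllib.request import urlopen


def fetch_jobs_overview(base_url: str, timeout: float = 10.0) -> dict:
    """GET <base_url>/jobs/overview from the JobManager REST endpoint."""
    with urlopen(f"{base_url.rstrip('/')}/jobs/overview", timeout=timeout) as resp:
        return json.load(resp)


def summarize(overview: dict) -> list:
    """Reduce the overview payload to (job name, state) pairs."""
    return [(job["name"], job["state"]) for job in overview.get("jobs", [])]
```

Usage: `summarize(fetch_jobs_overview("http://<EXTERNAL-IP>:8081"))`, where `<EXTERNAL-IP>` is the load balancer hostname from the service listing.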
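Native Kubernetes support was introduced in Flink 1.10 and is still under active development, so some parameters may change in later versions. That said, version 1.11 is already fairly stable.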