大数据云原生系列| 微信 Flink on Kubernetes 实战总结

前言

架构转型,拥抱云原生服务生态java

当前微信内部的大数据计算平台是基于自研的 Yard 资源调度系统来建设,Yard 的设计初衷除了提供在线服务资源隔离外,另外一方面是为了提升在线服务机器的总体资源利用率,其核心策略是在机器空闲时能在上面跑一些大数据离线任务。可是对接业界各类大数据计算框架(例如 Hadoop MapReduce、Spark、Flink 等)都须要专门定制化开发,迭代维护很是不灵活,难以跟上开源社区发展的步伐。为此,咱们开始转向使用Kubernetes,并基于腾讯云的 TKE 容器平台逐步搭建咱们的大数据计算平台。mysql

考虑到咱们 Yard 平台上 Flink 做业还不是特别多,历史包袱相对较少,因此咱们首先开始 Flink on Kubernetes实战之路。git

微信 Flink 实时计算平台总体概况

微信 Flink 做业数据流转图

下图是咱们大多数业务的 Flink 做业实时计算数据流转图,数据经采集上报到消息队列 Pulsar,用户的 Flink 做业消费 Pulsar 计算(必要时也会访问其余外部存储,如Redis、FeatureKV等),计算结果能够落地到多种存储系统,例如对于报表类业务,计算结果写入 mysql/pg;对于实时样本特征拼接做业,计算结果写入 hdfs,为下游模型训练不断提供样本;对于一些中间结果,则写入Pulsar,以便对接下游 Flink 做业。github

下面详细阐述上图中 Flink 做业是如何提交部署的。web

集群及 Flink 做业部署

Flink on TKE 半托管服务,极致的Flink云原生使用体验sql

Flink on TKE 半托管服务提供了Flink集群部署、日志、监控、存储等一站式的服务,用户能够将其余在线业务与Flink运行在同一个集群中,从而最大程度提升资源资源使用率,达到统一资源、统一技术栈、统一运维等能力。docker

咱们基于腾讯云的 TKE 容器平台构建 Flink Kubernetes 计算集群。根据已有的 Flink 做业运营行状况,咱们发现绝大多数 Flink 做业主要是耗费内存,而CPU利用率广泛较低,在机型选择上咱们推荐选择内存型机器。apache

对于 Flink 做业的提交部署,Flink on Kubernetes 有多种部署模式(详细介绍请参考TKE团队出品的文章:Flink on kubernetes 部署模式分析),Flink 开源社区前后推出了基于 Standalone 的 Kubernetes 声明式部署以及 Kubernetes Native 部署方式,基于 Standalone 的 Kubernetes 声明式部署步骤繁琐且不易管理,因此不考虑,另外社区的 Flink on Kubernetes Native 部署方式是从1.12起正式推出,功能还不够完善,而且还没有被大规模生产验证,咱们在这以前其实已经开始调研部署,通过一番比较后,咱们使用的是TKE容器团队提供的Flink on TKE半托管服务(基于Kubernetes Operator),其提交部署流程大体以下图所示。json

img

经过 Flink Operator,客户端就能够经过一个简单的声明式 API 提交部署 Flink 做业,各组件的生命周期统一由 Operator 控制,例如:api

apiVersion: flinkoperator.Kubernetes.io/v1beta1
kind: FlinkCluster
metadata:
  name: flink-hello-world
spec:
  image:
    name: flink:1.11.3
  jobManager:
    resources:
      limits:
        memory: "1024Mi"
        cpu: "200m"
  taskManager:
    replicas: 2
    resources:
      limits:
        memory: "2024Mi"
        cpu: "200m"
  job:
    jarFile: /opt/flink/examples/streaming/helloword.jar
    className: org.apache.flink.streaming.examples.wordcount.WordCount
    args: ["--input", "/opt/flink/README.txt"]
    parallelism: 2
  flinkProperties:
    taskmanager.numberOfTaskSlots: "2"

Flink Operator 提交流程大体以下图所示,首先会启动一个 Flink Standalone Session Cluster,而后拉起一个 Job Pod 运行用户代码,向 Standalone Session Cluster 提交 Job,提交完成后会不断去跟踪 Job 的运行状态。因此运行过程当中会有三类 Pod,即 JobManager、TaskManager、Job Pod。

来源: https://github.com/lyft/flink...

使用 Flink Operator 部署 Flink 做业的好处不言而喻,客户端不须要像 Flink on Kubernetes Native 部署方式那样须要 kubeconfig,能够直接经过 http 接口访问 API Server。虽然 Flink on Kubernetes Native 部署能够作到按需自动申请 TM,可是实际上咱们的应用场景基本都是单 Job 的流计算,用户事先规划好资源也可接受,并且基于 Flink Operator,咱们能够作批调度,即 Gang Schedule,能够避免资源有限的状况下做业之间互相等待资源 hold 住的状况(例如大做业先提交,部分 TaskManager 长时间处于资源等待状态,小做业后提交,小做业申请不到资源也 hold 在那里傻等)。

自动下载用户上传资源

做业与 Flink 内核动态分离,提升灵活性

经过上述的声明式 API 方式提交部署,咱们能够看到用户 jar 包须要事先打到 image 里,做为平台提供方,固然不可能让每一个用户本身去打 docker image,有些用户甚至都不知道怎么用 docker,因此咱们应该对用户屏蔽 docker image,用户只须要上传 jar 包等资源便可。Flink Operator 提供了 initContainer 选项,借助它咱们能够实现自动下载用户上传资源,可是为了简单,咱们直接修改 docker entrypoint 启动脚本,先下载用户上传的资源,再启动 Flink 相关进程,用户上传的资源经过环境变量声明。例如:

apiVersion: flinkoperator.Kubernetes.io/v1beta1
kind: FlinkCluster
metadata:
  name: flink-hello-world
spec:
  image:
    name: flink:1.11.3
  envVars:
    - name: FLINK_USER_JAR
      value: hdfs://xxx/path/to/helloword.jar
    - name: FLINK_USER_DEPENDENCIES
      value: hdfs://xxx/path/to/config.json,hdfs://xxx/path/to/vocab.txt
  ...

用户上传的依赖能够是任意文件,跟 Flink on Yarn 的方式不一样,咱们不用经过 submit 来分发依赖,而是在容器 docker entrypoint 启动脚本中直接下载到工做目录,以便用户能够在代码里以相对路径的方式(例如 ./config.json)访问到,若是依赖文件是 jar,则须要将其附加到 classpath 中,为了避免修改 flink 的脚本,咱们将 jar 附加到环境变量 HADOOP_CLASSPATH上,最后 Flink 相关进程启动的时候会被加到 Java 的 classpath 中。

对于用户主类所在的 jar(即环境变量FLINK_USER_JAR),只须要在 Job Pod 的 Container 中下载,若是一样下载到当前目录,那么它也会被附加到classpath中,在提交的时候可能会出现以下类加载连接错误,这是由于 Java 启动的时候加载了一遍,在执行用户main函数的时候 Flink 又会去加载一遍,因此咱们将主 jar 包下载到一个专门固定目录,例如/opt/workspace/main/,那么提交时经过spec.job.jarFile

参数指定到 /opt/workspace/main/xxx.jar 便可。

java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/pulsar/client/api/Authentication"
    at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_152]
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[?:1.8.0_152]
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_152]
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) ~[?:1.8.0_152]
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) ~[?:1.8.0_152]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) ~[?:1.8.0_152]
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) ~[?:1.8.0_152]

总的来讲,每类 pod 的启动流程以下图所示:

img

与微信后台服务打通

云原生架构下的资源类型 Demonsets,简化架构转型复杂度

用户的 Flink 做业常常须要在运行过程当中与微信的后台服务进行交互,在传统的裸机上访问微信的后台服务须要机器部署 Agent 及路由配置,对于 Kubernetes 集群,在咱们基础架构中心的同事支持下,微信后台基础 Agent 以 DeamonSet 方式打包到部署到每一个节点上,咱们在起 Flink 相关 Container 的时候,带上 HostIPC 选项并挂载路由配置路径,就能够像使用裸机同样访问微信的后台服务。

此外,由于部分 Agent 的 unix sock 文件在母机 /tmp 下,咱们须要在容器里挂载目录 /tmp,然而 Flink 运行过程当中 shuffle、web 以及一些临时文件(例如解压出来的so等)默认都是放到 /tmp 目录下,这就会致使做业即便失败也会残留一些垃圾到母机上,久而久之,/tmp 目录势必会被撑爆,因此咱们在启动 Java 进程时设置参数 -Djava.io.tmpdir=/opt/workspace/tmp,将 Java 的默认临时目录改到容器内的路径,这样做业失败,容器销毁不至于残留垃圾。

属性配置、日志及监控

日志与监控,提高可观测性

从上面的声明式 yaml 配置能够看到,提交 Flink 做业时是经过flinkProperties 选项来指定 Flink 属性参数,事实上 Flink Operator 会将flinkProperties指定的属性参数以 ConfigMap 形式部署,会覆盖 image 中的 flink/conf 目录,因此咱们不能将系统默认属性配置放到 flink image 中,为此,咱们在客户端维护一份 Flink 系统默认配置,在提交的时候会合并用户填的属性配置,填充到 flinkProperties 选项中,能够方便咱们灵活调整 Flink 系统默认配置。

默认状况下,Flink on Kubernetes部署的做业,其在 Docker Container 中运行的进程都是前台运行的,使用 log4j-console.properties配置,日志会直接打到控制台,这样就会致使 Flink UI 没法展现 log,只能去查看 Pod 日志,此外用户经过 System.out.println 打的日志也会混在 log4j 的日志中,不易区分查看。因此咱们从新定义了 log4j-console.properties,将 log4j 日志打到FLINK_LOG_DIR 目录下的文件中,并按大小滚动,为了能在 Flink UI 上也能看到用户 stdout 的输出,在进程启动命令flink-console.sh 最后加上 2>\&1 | tee ${FLINK_LOG_PREFIX}.out,能够把控制台输出的日志旁路一份到日志目录的文件中。最后 Flink UI 展现的日志以下图所示:

对于历史失败做业,咱们在Kubernetes上也部署了一个 Flink History Server,能够灵活地扩缩容,今后不再用担忧半夜做业挂了自动重启没法追溯缘由了。

对于资源及做业的监控,TKE 提供了免费的云原生 Prometheus 服务 TPS,能够一键部署并关联咱们的 TKE 集群,然而咱们在早期已经采用主流的 Prometheus + Grafana 组合部署了监控平台,这里就没有使用TPS。当前咱们有集群资源、应用组(Namespace)资源、做业资源利用状况的监控,大体以下图所示。后面咱们会再将每一个做业 Flink Metric 推到 Prometheus,便于监控做业级别的反压、gc、operator 流量等信息。

数据应用平台对接

基于上述基础的 Flink-on-Kubernetes 能力,就能够将 Flink 对接到咱们的各类数据应用平台上。以下图所示,咱们已经支持用户使用多样化的方式使用 Flink,用户能够在机器学习平台拖拽节点或者注册定制化节点以 Jar 包或 PyFlink 的方式使用,另外也能够在SQL分析平台上写 Flink SQL。

对于 Jar、PyFlink 的方式使用就不详细展开,对于 Flink SQL 的支持,咱们目前是结合咱们自身的元数据体系,利用 Flink 已有的 SQL 功能。当前实时数仓被业界普遍提起,咱们知道传统的离线数仓,如 Hive,无外乎是在 HDFS 上套了一层 Schema,那么实时数仓也相似,数据源一般是 Kafka、Pulsar 这类消息队列系统,在这之上套一层 Schema 将实时数据管理起来,就能够称之为实时数仓了。咱们基于SQL分析平台的元数据管理体系,构建 Flink SQL 能力,用户能够在SQL分析平台上注册/管理库表元数据,为了架构简单,咱们并无去实现本身的 Flink Catalog(元数据操做直接在 SQL分析平台上完成,无需实现 create、drop 等 API),而是采用以下图所示的流程来提交 SQL。

用户在SQL分析平台上注册库表元数据(能够精细受权管控),而后编辑 SQL 提交,首先SQL分析平台会作语法校验、权限及合法性校验,没问题后,将 SQL 涉及到的元数据加密打包,连同声明式配置 Yaml 提交给统一调度平台,在统一调度平台上咱们开发了一个 FlinkSQL 类型的做业,本质上就是一个常规的 Flink Jar 做业,即 FlinkSQLDriver ,用于接受 SQL 及其附属的参数,FlinkSQLDriver 被提交后,解析传过来的配置,组装完整的 SQL 语句(包括 DDL、DML),而后调用 tableEnvironment.executeSql逐条执行,因此本质上是将库表临时注册到 default catalog 中。

小结

本文从总体上介绍了微信 Flink-on-Kubernetes实战经验以及 Flink 数据应用平台的概况,一方面咱们提供最基础的 Flink 计算平台能力,借助Kubernetes有效管控集群,另外一方面咱们在已有的数据通道及元数据平台上构建实时数仓,提供 Flink SQL 能力,进一步下降用户使用门槛,对于 Flink SQL 的支持目前还比较初级和原始,后面咱们将结合业务使用状况探索更多深层次的优化。

【腾讯云原生】云说新品、云研新术、云游新活、云赏资讯,扫码关注同名公众号,及时获取更多干货!!
相关文章
相关标签/搜索