近期将Flink Job从Standalone迁移至了OnYarn,随后发现Job性能较以前有所下降:迁移前有8.3W+/S的数据消费速度,迁移到Yarn后分配一样的资源但消费速度降为7.8W+/S,且较以前的消费速度有轻微的抖动。通过缘由分析和测试验证,最终采用了在保持分配给Job的资源不变的状况下将总Container数量减半、每一个Container持有的资源从1C2G 1Slot变动为2C4G 2Slot的方式,使该问题得以解决。html
经历该问题后,发现深刻理解Slot和Flink Runtime Graph是十分必要的,因而撰写了这篇文章。本文内容分为两大部分,第一部分详细的分析Flink Slot与Job运行的关系,第二部详细的介绍遇到的问题和解决方案。java
Flink集群是由JobManager(JM)、TaskManager(TM)两大组件组成的,每一个JM/TM都是运行在一个独立的JVM进程中。JM至关于Master,是集群的管理节点,TM至关于Worker,是集群的工做节点,每一个TM最少持有1个Slot,Slot是Flink执行Job时的最小资源分配单位,在Slot中运行着具体的Task任务。apache
对TM而言:它占用着必定数量的CPU和Memory资源,具体可经过taskmanager . number Of Task Slots , taskmanager . heap . size来配置,实际上taskmanager.numberOfTaskSlots只是指定TM的Slot数量,并不能隔离指定数量的CPU给TM使用。在不考虑Slot Sharing(下文详述)的状况下,一个Slot内运行着一个SubTask(Task实现Runable,SubTask是一个执行Task的具体实例),因此官方建议taskmanager.numberOfTaskSlots配置的Slot数量和CPU相等或成比例。api
固然,咱们能够借助Yarn等调度系统,用Flink On Yarn的模式来为Yarn Container分配指定数量的CPU资源,以达到较严格的CPU隔离(Yarn采用Cgroup作基于时间片的资源调度,每一个Container内运行着一个JM/TM实例)。而taskmanager.heap.size用来配置TM的Memory,若是一个TM有N个Slot,则每一个Slot分配到的Memory大小为整个TM Memory的1/N,同一个TM内的Slots只有Memory隔离,CPU是共享的。网络
对Job而言:一个Job所需的Slot数量大于等于Operator配置的最大Parallelism数,在保持全部Operator的slotSharingGroup一致的前提下Job所需的Slot数量与Job中Operator配置的最大Parallelism相等。app
关于TM/Slot之间的关系能够参考以下从官方文档截取到的三张图:函数
图一:Flink On Yarn的Job提交过程,从图中咱们能够了解到每一个JM/TM实例都分属于不一样的Yarn Container,且每一个Container内只会有一个JM或TM实例;经过对Yarn的学习咱们能够了解到,每一个Container都是一个独立的进程,一台物理机能够有多个Container存在(多个进程),每一个Container都持有必定数量的CPU和Memory资源,并且是资源隔离的,进程间不共享,这就能够保证同一台机器上的多个TM之间是资源隔离的(Standalone模式下,同一台机器下如有多个TM,是作不到TM之间的CPU资源隔离的)。 性能
图一学习
图二:Flink Job运行图,图中有两个TM,各自有3个Slot,2个Slot内有Task在执行,1个Slot空闲。若这两个TM在不一样Container或容器上,则其占用的资源是互相隔离的。在TM内多个Slot间是各自拥有 1/3 TM的Memory,共享TM的CPU、网络(Tcp:ZK、 Akka、Netty服务等)、心跳信息、Flink结构化的数据集等。测试
图二
图三:Task Slot的内部结构图,Slot内运行着具体的Task,它是在线程中执行的Runable对象(每一个虚线框表明一个线程),这些Task实例在源码中对应的类是org.apache.flink.runtime.taskmanager.Task。每一个Task都是由一组Operators Chaining在一块儿的工做集合,Flink Job的执行过程可看做一张DAG图,Task是DAG图上的顶点(Vertex),顶点之间经过数据传递方式相互连接构成整个Job的Execution Graph。
图三
Operator Chain是指将Job中的Operators按照必定策略(例如:single output operator能够chain在一块儿)连接起来并放置在一个Task线程中执行。Operator Chain默认开启,可经过StreamExecutionEnvironment.disableOperatorChaining()关闭,Flink Operator相似Storm中的Bolt,在Strom中上游Bolt到下游会通过网络上的数据传递,而Flink的Operator Chain将多个Operator连接到一块儿执行,减小了数据传递/线程切换等环节,下降系统开销的同时增长了资源利用率和Job性能。实际开发过程当中须要开发者了解这些原理,并能合理分配Memory和CPU给到每一个Task线程。
注: 【一个须要注意的地方】Chained的Operators之间的数据传递默认须要通过数据的拷贝(例如:kryo.copy(...)),将上游Operator的输出序列化出一个新对象并传递给下游Operator,能够经过ExecutionConfig.enableObjectReuse()开启对象重用,这样就关闭了这层copy操做,能够减小对象序列化开销和GC压力等,具体源码可阅读org.apache.flink.streaming.runtime.tasks.OperatorChain与org.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput。官方建议开发人员在彻底了解reuse内部机制后才使用该功能,冒然使用可能会给程序带来bug。
Operator Chain效果可参考以下官方文档截图:
图四:图的上半部分是StreamGraph视角,有Task类别无并行度,如图:Job Runtime时有三种类型的Task,分别是Source->Map、keyBy/window/apply、Sink,其中Source->Map是Source()和Map()chaining在一块儿的Task;图的下半部分是一个Job Runtime期的实际状态,Job最大的并行度为2,有5个SubTask(即5个执行线程)。若没有Operator Chain,则Source()和Map()分属不一样的Thread,Task线程数会增长到7,线程切换和数据传递开销等较以前有所增长,处理延迟和性能会较以前差。补充:在slotSharingGroup用默认或相同组名时,当前Job运行需2个Slot(与Job最大Parallelism相等)。
图四
Slot Sharing是指,来自同一个Job且拥有相同slotSharingGroup(默认:default)名称的不一样Task的SubTask之间能够共享一个Slot,这使得一个Slot有机会持有Job的一整条Pipeline,这也是上文提到的在默认slotSharing的条件下Job启动所需的Slot数和Job中Operator的最大parallelism相等的缘由。经过Slot Sharing机制能够更进一步提升Job运行性能,在Slot数不变的状况下增长了Operator可设置的最大的并行度,让相似window这种消耗资源的Task以最大的并行度分布在不一样TM上,同时像map、filter这种较简单的操做也不会独占Slot资源,下降资源浪费的可能性。
具体Slot Sharing效果可参考以下官方文档截图:
图五:图的左下角是一个soure-map-reduce模型的Job,source和map是4 parallelism,reduce是3 parallelism,总计11个SubTask;这个Job最大Parallelism是4,因此将这个Job发布到左侧上面的两个TM上时获得图右侧的运行图,一共占用四个Slot,有三个Slot拥有完整的source-map-reduce模型的Pipeline,如右侧图所示;注:map的结果会shuffle到reduce端,右侧图的箭头只是说Slot内数据Pipline,没画出Job的数据shuffle过程。
图五
图六:图中包含source-map[6 parallelism]、keyBy/window/apply[6 parallelism]、sink[1 parallelism]三种Task,总计占用了6个Slot;由左向右开始第一个slot内部运行着3个SubTask[3 Thread],持有Job的一条完整pipeline;剩下5个Slot内分别运行着2个SubTask[2 Thread],数据最终经过网络传递给Sink完成数据处理。
图六
Flink在默认状况下有策略对Job进行Operator Chain 和 Slot Sharing的控制,好比:将并行度相同且连续的SingleOutputStreamOperator操做chain在一块儿(chain的条件较苛刻,不止单一输出这一条,具体可阅读org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable(...)),Job的全部Task都采用名为default的slotSharingGroup作Slot Sharing。但在实际的需求场景中,咱们可能会遇到需人为干预Job的Operator Chain 或 Slot Sharing策略的状况,本段就重点关注下用于改变默认Chain 和 Sharing策略的API。
Operator Chain有三种策略ALWAYS、NEVER、HEAD,详细可查看org.apache.flink.streaming.api.operators.ChainingStrategy。startNewChain()对应的策略是ChainingStrategy.HEAD(StreamOperator的默认策略),disableChaining()对应的策略是ChainingStrategy.NEVER,ALWAYS是尽量的将Operators chaining在一块儿;在一般状况下ALWAYS是效率最高,不少Operator会将默认策略覆盖为ALWAYS,如filter、map、flatMap等函数。
相似StreamETL,100 parallelism,即:一个流式的ETL Job,不包含window等操做,Job的并行度为100;
注:OnYarn下使用了与Standalone一致的GC配置,当前Job在Standalone或OnYarn环境中运行时,YGC、FGC频率基本相同,OnYarn下单个Container的堆内存较小使得单次GC耗时减小。生产环境中你们最好对比下CMS和G1,选择更好的GC策略,当前上下文中暂时认为GC对Job性能影响可忽略不计。
引发Job性能下降的缘由不难定位,从这张Container的线程图(VisualVM中的截图)可见:
图七:在一个1C2G的Container内有126个活跃线程,守护线程78个。首先,在一个1C2G的Container中运行着126个活跃线程,频繁的线程切换是会常常出现的,这让原本就不充裕的CPU显得更加的匮乏。其次,真正与数据处理相关的线程是红色画笔圈出的14条线程(2条Kafka Partition Consumer、Consumers和Operators包含在这个两个线程内;12条Kafka Producer线程,将处理好的数据sink到Kafka Topic),这14条线程以外的大多数线程在相同TM、不一样Slot间能够共用,好比:ZK-Curator、Dubbo-Client、GC-Thread、Flink-Akka、Flink-Netty、Flink-Metrics等线程,彻底能够经过增长TM下Slot数量达到多个SubTask共享的目的。
此时咱们会很天然的得出一个解决办法:在Job使用资源不变的状况下,在减小Container数量的同时增长单个Container持有的CPU、Memory、Slot数量,好比上文环境说明中从方案2调整到方案3,实际调整后的Job运行稳定了许多且消费速度与Standalone基本持平。
图七
注:当前问题是内部迁移相似StreamETL的Job时遇到的,解决方案简单但不具备普适性,对于带有window算子的Job须要更仔细缜密的问题分析。目前Deploy到Yarn集群的Job都配置了JMX/Prometheus两种监控,单个Container下Slot数量越多、每次scrape的数据越多,实际生成环境中需观测是否会影响Job正常运行,在测试时将Container配置为3C6G 3Slot时发现一次java.lang.OutOfMemoryError: Direct buffer memory的异常,初步判断与Prometheus Client相关,可适当调整JVM的MaxDirectMemorySize来解决。
所出现异常如图八:
图八
Operator Chain是将多个Operator连接在一块儿放置在一个Task中,只针对Operator;Slot Sharing是在一个Slot中执行多个Task,针对的是Operator Chain以后的Task。这两种优化都充分利用了计算资源,减小了没必要要的开销,提高了Job的运行性能。此外,Operator Chain的源码在streaming包下,只在流处理任务中有这个机制;Slot Sharing在flink-runtime包下,彷佛应用更普遍一些(具体还有待考究)。
最后,只有充分的了解Slot、Operator Chain、Slot Sharing是什么,以及各自的做用和相互间的关系,才能编写出优秀的代码并高效的运行在集群上。
做者:TalkingData数据工程师 王成龙
封面图来源于网络,若有侵权,请联系删除