两年Flink迁移之路:从standalone到on yarn,处理能力提高五倍

1、背景与痛点

在2017年上半年之前,TalkingData的App Analytics和Game Analytics两个产品,流式框架使用的是自研的td-etl-framework。该框架下降了开发流式任务的复杂度,对于不一样的任务只须要实现一个changer链便可,而且支持水平扩展,性能尚可,曾经能够知足业务需求。html

可是到了2016年末和2017年上半年,发现这个框架存在如下重要局限:java

  1. 性能隐患:App Analytics-etl-adaptor和Game Analytics-etl-adaptor这两个模块相继在节假日出现了严重的性能问题(Full-GC),致使指标计算延迟;
  2. 框架的容错机制不足:依赖于保存在Kafka或ZK上的offset,最多只能达到at-least-once,而须要依赖其余服务与存储才能实现exactly-once,而且会产生异常致使重启丢数;
  3. 框架的表达能力不足: 不能完整的表达DAG图,对于复杂的流式处理问题须要若干依赖该框架的若干个服务组合在一块儿才能解决问题;

TalkingData这两款产品主要为各种移动端App和游戏提供数据分析服务,随着近几年业务量不断扩大,须要选择一个性能更强、功能更完善的流式引擎来逐步升级咱们的流式服务。调研从2016年末开始,主要是从Flink、Heron、Spark streaming中做选择。spring

最终,咱们选择了Flink,主要基于如下几点考虑:apache

  1. Flink的容错机制完善,支持Exactly-once;
  2. Flink已经集成了较丰富的streaming operator,自定义operator也较为方便,而且能够直接调用API完成stream的split和join,能够完整的表达DAG图;
  3. Flink自主实现内存管理而不彻底依赖于JVM,能够在必定程度上避免当前的etl-framework的部分服务的Full-GC问题;
  4. Flink的window机制能够解决GA中相似于单日游戏时长游戏次数分布等时间段内某个指标的分布类问题;
  5. Flink的理念在当时的流式框架中最为超前: 将批看成流的特例,最终实现批流统一;

2、演进路线

2.1 standalone-cluster (1.1.3->1.1.5->1.3.2)api

咱们最开始是以standalone cluster的模式部署。从2017年上半年开始,咱们逐步把Game Analytics中一些小流量的etl-job迁移到Flink,到4月份时,已经将产品接收各版本SDK数据的etl-job彻底迁移至Flink,并整合成了一个job。造成了以下的数据流和stream graph:网络

clipboard.png

图1. Game Analytics-etl-adaptor迁移至Flink后的数据流图框架

clipboard.png

图2. Game Analytics-etl的stream graph分布式

在上面的数据流图中,flink-job经过Dubbo来调用etl-service,从而将访问外部存储的逻辑都抽象到了etl-service中,flink-job则不需考虑复杂的访存逻辑以及在job中自建Cache,这样既完成了服务的共用,又减轻了job自身的GC压力。ide

此外咱们自构建了一个monitor服务,由于当时的1.1.3版本的Flink可提供的监控metric少,并且因为其Kafka-connector使用的是Kafka08的低阶API,Kafka的消费offset并无提交的ZK上,所以咱们须要构建一个monitor来监控Flink的job的活性、瞬时速度、消费淤积等metric,并接入公司owl完成监控告警。性能

这时候,Flink的standalone cluster已经承接了来自Game Analytics的全部流量,日均处理消息约10亿条,总吞吐量达到12TB每日。到了暑假的时候,日均日志量上升到了18亿条天天,吞吐量达到了约20TB每日,TPS峰值为3万。

在这个过程当中,咱们又遇到了Flink的job消费不均衡、在standalone cluster上job的deploy不均衡等问题,而形成线上消费淤积,以及集群无端自动重启而自动重启后job没法成功重启。(咱们将在第三章中详细介绍这些问题中的典型表现及当时的解决方案。)

通过一个暑假后,咱们认为Flink经受了考验,所以开始将App Analytics的etl-job也迁移到Flink上。造成了以下的数据流图:

clipboard.png

图3. App Analytics-etl-adaptor的标准SDK处理工做迁移到Flink后的数据流图

clipboard.png

图4. App Analytics-etl-flink job的stream graph

2017年3月开始有大量用户开始迁移至统一的JSON SDK,新版SDK的Kafka topic的峰值流量从年中的8K/s 上涨至了年末的 3W/s。此时,整个Flink standalone cluster上一共部署了两款产品的4个job,日均吞吐量达到了35TB。

这时遇到了两个很是严重的问题:

1) 同一个standalone cluster中的job相互抢占资源,而standalone cluster的模式仅仅只能经过task slot在task manager的堆内内存上作到资源隔离。同时因为前文提到过的Flink在standalone cluster中deploy job的方式原本就会形成资源分配不均衡,从而会致使App Analytics线流量大时而引发Game Analytics线淤积的问题;

2) 咱们的source operator的并行度等同于所消费Kafka topic的partition数量,而中间作etl的operator的并行度每每会远大于Kafka的partition数量。所以最后的job graph不可能彻底被链成一条operator chain,operator之间的数据传输必须经过Flink的network buffer的申请和释放,而1.1.x 版本的network buffer在数据量大的时候很容易在其申请和释放时形成死锁,而致使Flink明明有许多消息要处理,可是大部分线程处于waiting的状态致使业务的大量延迟。

这些问题逼迫着咱们不得不将两款产品的job拆分到两个standalone cluster中,并对Flink作一次较大的版本升级,从1.1.3(中间过分到1.1.5)升级成1.3.2。最终升级至1.3.2在18年的Q1完成,1.3.2版本引入了增量式的checkpoint提交而且在性能和稳定性上比1.1.x版本作了巨大的改进。升级以后,Flink集群基本稳定,尽管还有消费不均匀等问题,可是基本能够在业务量增长时经过扩容机器来解决。

2.2 Flink on yarn (1.7.1)

由于standalone cluster的资源隔离作的并不优秀,并且还有deploy job不均衡等问题,加上社区上使用Flink on yarn已经很是成熟,所以咱们在18年的Q4就开始计划将Flink的standalone cluster迁移至Flink on yarn上,而且Flink在最近的版本中对于batch的提高较多,咱们还规划逐步使用Flink来逐步替换如今的批处理引擎。

clipboard.png

图5. Flink on yarn cluster规划

如图5,将来的Flink on yarn cluster将能够完成流式计算和批处理计算,集群的使用者能够经过一个构建service来完成stream/batch job的构建、优化和提交,job提交后,根据使用者所在的业务团队及服务客户的业务量分发到不一样的yarn队列中,此外,集群须要一个完善的监控系统,采集用户的提交记录、各个队列的流量及负载、各个job的运行时指标等等,并接入公司的OWL。

从19年的Q1开始,咱们将App Analytics的部分stream job迁移到了Flink on yarn 1.7中,又在19年Q2前完成了App Analytics全部处理统一JSON SDK的流任务迁移。当前的Flink on yarn集群的峰值处理的消息量达到30W/s,日均日志吞吐量达约到50亿条,约60TB。在Flink迁移到on yarn以后,由于版本的升级性能有所提高,且job之间的资源隔离确实优于standalone cluster。迁移后咱们使用Prometheus+Grafana的监控方案,监控更方便和直观。

咱们将在后续将Game Analytics的Flink job和日志导出的job也迁移至该on yarn集群,预计能够节约1/4的机器资源。

3、重点问题的描述与解决

在Flink实践的过程当中,咱们一路上遇到了很多坑,咱们挑出其中几个重点坑作简要讲解。

1.少用静态变量及job cancel时合理释放资源

在咱们实现Flink的operator的function时,通常均可以继承AbstractRichFunction,其已提供生命周期方法open()/close(),因此operator依赖的资源的初始化和释放应该经过重写这些方法执行。当咱们初始化一些资源,如spring context、dubbo config时,应该尽量使用单例对象持有这些资源且(在一个TaskManager中)只初始化1次,一样的,咱们在close方法中应当(在一个TaskManager中)只释放一次。

static的变量应该慎重使用,不然很容易引发job cancel而相应的资源没有释放进而致使job重启遇到问题。规避static变量来初始化可使用org.apache.flink.configuration.Configuration(1.3)或者org.apache.flink.api.java.utils.ParameterTool(1.7)来保存咱们的资源配置,而后经过ExecutionEnvironment来存放(Job提交时)和获取这些配置(Job运行时)。

示例代码:

Flink 1.3
设置及注册配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration parameters = new Configuration();
parameters.setString("zkConnects", zkConnects);
parameters.setBoolean("debug", debug);
env.getConfig().setGlobalJobParameters(parameters);

获取配置(在operator的open方法中)

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    Configuration globConf = (Configuration) globalParams;
    debug = globConf.getBoolean("debug", false);
    String zks = globConf.getString("zkConnects", "");
    //.. do more ..
}

Flink 1.7
设置及注册配置

ParameterTool parameters = ParameterTool.fromArgs(args);

// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

获取配置

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    ParameterTool parameters = (ParameterTool)
        getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    parameters.getRequired("input");
    // .. do more ..

2.NetworkBuffer及operator chain

如前文所述,当Flink的job 的上下游Task(的subTask)分布在不一样的TaskManager节点上时(也就是上下游operator没有chained在一块儿,且相对应的subTask分布在了不一样的TaskManager节点上),就须要在operator的数据传递时申请和释放network buffer并经过网络I/O传递数据。

其过程简述以下:上游的operator产生的结果会经过RecordWriter序列化,而后申请BufferPool中的Buffer并将序列化后的结果写入Buffer,此后Buffer会被加入ResultPartition的ResultSubPartition中。ResultSubPartition中的Buffer会经过Netty传输至下一级的operator的InputGate的InputChannel中,一样的,Buffer进入InputChannel前一样须要到下一级operator所在的TaskManager的BufferPool申请,RecordReader读取Buffer并将其中的数据反序列化。BufferPool是有限的,在BufferPool为空时RecordWriter/RecordReader所在的线程会在申请Buffer的过程当中wait一段时间,具体原理能够参考:[1], [2]。

简要截图以下:

clipboard.png

图6. Flink的网络栈, 其中RP为ResultPartition、RS为ResultSubPartition、IG为InputGate、IC为inputChannel。

在使用Flink 1.1.x和1.3.x版本时,若是咱们的network buffer的数量配置的不充足且数据的吞吐量变大的时候,就会遇到以下现象:

clipboard.png

图7. 上游operator阻塞在获取network buffer的requestBuffer()方法中

clipboard.png

图8. 下游的operator阻塞在等待新数据输入

clipboard.png

图9. 下游的operator阻塞在等待新数据输入

咱们的工做线程(RecordWriter和RecordReader所在的线程)的大部分时间都花在了向BufferPool申请Buffer上,这时候CPU的使用率会剧烈的抖动,使得Job的消费速度降低,在1.1.x版本中甚至会阻塞很长的一段时间,触发整个job的背压,从而形成较严重的业务延迟。

这时候,咱们就须要经过上下游operator的并行度来计算ResultPartition和InputGate中所须要的buffer的个数,以配置充足的taskmanager.network.numberOfBuffers。

clipboard.png

图10. 不一样的network buffer对CPU使用率的影响

当配置了充足的network buffer数时,CPU抖动能够减小,Job消费速度有所提升。

在Flink 1.5以后,在其network stack中引入了基于信用度的流量传输控制(credit-based flow control)机制[2],该机制大限度的避免了在向BufferPool申请Buffer的阻塞现象,咱们初步测试1.7的network stack的性能确实比1.3要高。

但这毕竟还不是最优的状况,由于若是借助network buffer来完成上下游的operator的数据传递不能够避免的要通过序列化/反序列化的过程,并且信用度的信息传递有必定的延迟性和开销,而这个过程能够经过将上下游的operator链成一条operator chain而避免。

所以咱们在构建咱们流任务的执行图时,应该尽量多的让operator都chain在一块儿,在Kafka资源容许的状况下能够扩大Kafka的partition而使得source operator和后继的operator 链在一块儿,但也不能一味扩大Kafka topic的partition,应根据业务量和机器资源作好取舍。更详细的关于operator的training和task slot的调优能够参考: [4]。

3.Flink中所选用序列化器的建议

在上一节中咱们知道,Flink的分布在不一样节点上的Task的数据传输必须通过序列化/反序列化,所以序列化/反序列化也是影响Flink性能的一个重要因素。Flink自有一套类型体系,即Flink有本身的类型描述类(TypeInformation)。Flink但愿可以掌握尽量多的进出operator的数据类型信息,并使用TypeInformation来描述,这样作主要有如下2个缘由:

  1. 类型信息知道的越多,Flink能够选取更好的序列化方式,并使得Flink对内存的使用更加高效;
  2. TypeInformation内部封装了本身的序列化器,可经过createSerializer()获取,这样可让用户再也不操心序列化框架的使用(例如如何将他们自定义的类型注册到序列化框架中,尽管用户的定制化和注册能够提升性能)。

整体上来讲,Flink推荐咱们在operator间传递的数据是POJOs类型,对于POJOs类型,Flink默认会使用Flink自身的PojoSerializer进行序列化,而对于Flink没法本身描述或推断的数据类型,Flink会将其识别为GenericType,并使用Kryo进行序列化。Flink在处理POJOs时更高效,此外POJOs类型会使得stream的grouping/joining/aggregating等操做变得简单,由于可使用如:dataSet.keyBy("username") 这样的方式直接操做数据流中的数据字段。

除此以外,咱们还能够作进一步的优化:
1) 显示调用returns方法,从而触发Flink的Type Hint:
dataStream.flatMap(new MyOperator()).returns(MyClass.class)
returns方法最终会调用TypeExtractor.createTypeInfo(typeClass) ,用以构建咱们自定义的类型的TypeInformation。createTypeInfo方法在构建TypeInformation时,若是咱们的类型知足POJOs的规则或Flink中其余的基本类型的规则,会尽量的将咱们的类型“翻译”成Flink熟知的类型如POJOs类型或其余基本类型,便于Flink自行使用更高效的序列化方式。

//org.apache.flink.api.java.typeutils.PojoTypeInfo

@Override
@PublicEvolving
@SuppressWarnings("unchecked")
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
   if (config.isForceKryoEnabled()) {
      return new KryoSerializer<>(getTypeClass(), config);
   }

   if (config.isForceAvroEnabled()) {
      return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
   }

   return createPojoSerializer(config);
}

对于Flink没法“翻译”的类型,则返回GenericTypeInfo,并使用Kryo序列化:

//org.apache.flink.api.java.typeutils.TypeExtractor

@SuppressWarnings({ "unchecked", "rawtypes" })
private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
      ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
   checkNotNull(clazz);

   // 尝试将 clazz转换为 PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo
   // BasicTypeInfo, PojoTypeInfo 等,具体源码已省略
  //...
   
   //若是上述尝试不成功 , 则return a generic type
   return new GenericTypeInfo<OUT>(clazz);
}

2) 注册subtypes: 经过StreamExecutionEnvironment或ExecutionEnvironment的实例的registerType(clazz)方法注册咱们的数据类及其子类、其字段的类型。若是Flink对类型知道的越多,性能会更好;

3) 若是还想作进一步的优化,Flink还容许用户注册本身定制的序列化器,手动建立本身类型的TypeInformation,具体能够参考Flink官网:[3];

在咱们的实践中,最初为了扩展性,在operator之间传递的数据为JsonNode,可是咱们发现性能达不到预期,所以将JsonNode改为了符合POJOs规范的类型,在1.1.x的Flink版本上直接得到了超过30%的性能提高。在咱们调用了Flink的Type Hint和env.getConfig().enableForceAvro()后,性能获得进一步提高。这些方法一直沿用到了1.3.x版本。

在升级至1.7.x时,若是使用env.getConfig().enableForceAvro()这个配置,咱们的代码会引发校验空字段的异常。所以咱们取消了这个配置,并尝试使用Kyro进行序列化,而且注册咱们的类型的全部子类到Flink的ExecutionEnvironment中,目前看性能尚可,并优于旧版本使用Avro的性能。可是最佳实践还须要通过比较和压测KryoSerializerAvroUtils.getAvroUtils().createAvroSerializerPojoSerializer才能总结出来,你们仍是应该根据本身的业务场景和数据类型来合理挑选适合本身的serializer。

4.Standalone模式下job的deploy与资源隔离共享

结合咱们以前的使用经验,Flink的standalone cluster在发布具体的job时,会有必定的随机性。举个例子,若是当前集群总共有2台8核的机器用以部署TaskManager,每台机器上一个TaskManager实例,每一个TaskManager的TaskSlot为8,而咱们的job的并行度为12,那么就有可能会出现下图的现象:

第一个TaskManager的slot全被占满,而第二个TaskManager只使用了一半的资源!资源严重不平衡,随着job处理的流量加大,必定会形成TM1上的task消费速度慢,而TM2上的task消费速度远高于TM1的task的状况。假设业务量的增加迫使咱们不得不扩大job的并行度为24,而且扩容2台性能更高的机器(12核),在新的机器上,咱们分别部署slot数为12的TaskManager。通过扩容后,集群的TaskSlot的占用可能会造成下图:

新扩容的配置高的机器并无去承担更多的Task,老机器的负担仍然比较严重,资源本质上仍是不均匀!

除了standalone cluster模式下job的发布策略形成不均衡的状况外,还有资源隔离差的问题。由于咱们在一个cluster中每每会部署不止一个job,而这些job在每台机器上都共用JVM,天然会形成资源的竞争。起初,咱们为了解决这些问题,采用了以下的解决方法:

  1. 将TaskManager的粒度变小,即一台机器部署多个实例,每一个实例持有的slot数较少;
  2. 将大的业务job隔离到不一样的集群上。

这些解决方法增长了实例数和集群数,进而增长了维护成本。所以咱们决定要迁移到on yarn上,目前看Flink on yarn的资源分配和资源隔离确实比standalone模式要优秀一些。

4、总结与展望

Flink在2016年时仅为星星之火,而只用短短两年的时间就成长为了当前最为煊赫一时的流处理平台,并且大有统一批与流之势。通过两年的实践,Flink已经证实了它可以承接TalkingData的App Analytics和Game Analytics两个产品的流处理需求。接下来咱们会将更复杂的业务和批处理迁移到Flink上,完成集群部署和技术栈的统一,最终实现图5 中Flink on yarn cluster 的规划,以更少的成原本支撑更大的业务量。

参考资料:
[1] https://cwiki.apache.org/conf...
[2] https://flink.apache.org/2019...
[3] https://ci.apache.org/project...
[4] https://mp.weixin.qq.com/s/XR...

做者简介:肖强:TalkingData资深工程师,TalkingData统计分析产品App Analytics和Game Analytics技术负责人。硕士毕业于北京航空航天大学,主要从事大数据平台开发,对流式计算和分布式存储有必定研究。

相关文章
相关标签/搜索