Apache Flink 的迁移之路,2 年处理效果提高 5 倍

做者:肖强(TalkingData 资深工程师)html

1、背景与痛点

在 2017 年上半年之前,TalkingData 的 App Analytics 和 Game Analytics 两个产品,流式框架使用的是自研的 td-etl-framework。该框架下降了开发流式任务的复杂度,对于不一样的任务只须要实现一个 changer 链便可,而且支持水平扩展,性能尚可,曾经能够知足业务需求。java

可是到了 2016 年末和 2017 年上半年,发现这个框架存在如下重要局限:spring

  1. 性能隐患:App Analytics-etl-adaptor 和 Game Analytics-etl-adaptor 这两个模块相继在节假日出现了严重的性能问题(Full-GC),致使指标计算延迟。
  2. 框架的容错机制不足:依赖于保存在 Kafka 或 ZK 上的 offset,最多只能达到 at-least-once,而须要依赖其余服务与存储才能实现 exactly-once,而且会产生异常致使重启丢数。
  3. 框架的表达能力不足: 不能完整的表达 DAG 图,对于复杂的流式处理问题须要若干依赖该框架的若干个服务组合在一块儿才能解决问题。

TalkingData 这两款产品主要为各种移动端 App 和游戏提供数据分析服务,随着近几年业务量不断扩大,须要选择一个性能更强、功能更完善的流式引擎来逐步升级咱们的流式服务。调研从 2016 年末开始,主要是从 Flink、Heron、Spark streaming 中做选择。apache

最终,咱们选择了 Flink,主要基于如下几点考虑:api

  1. Flink 的容错机制完善,支持 Exactly-once。
  2. Flink 已经集成了较丰富的 streaming operator,自定义 operator 也较为方便,而且能够直接调用 API 完成 stream 的 split 和 join,能够完整的表达 DAG 图。
  3. Flink 自主实现内存管理而不彻底依赖于 JVM,能够在必定程度上避免当前的 etl-framework 的部分服务的 Full-GC 问题。
  4. Flink 的 window 机制能够解决GA中相似于单日游戏时长游戏次数分布等时间段内某个指标的分布类问题。
  5. Flink 的理念在当时的流式框架中最为超前: 将批看成流的特例,最终实现批流统一。

2、演进路线

1. standalone-cluster (1.1.3->1.1.5->1.3.2)

咱们最开始是以 standalone cluster 的模式部署。从 2017 年上半年开始,咱们逐步把 Game Analytics 中一些小流量的 etl-job 迁移到 Flink,到 4 月份时,已经将产品接收各版本 SDK 数据的 etl-job 彻底迁移至 Flink,并整合成了一个 job。造成了以下的数据流和 stream graph:网络

1

图1. Game Analytics-etl-adaptor 迁移至 Flink 后的数据流图框架

2

图2. Game Analytics-etl 的 stream graphide

在上面的数据流图中,flink-job 经过 Dubbo 来调用 etl-service,从而将访问外部存储的逻辑都抽象到了 etl-service 中,flink-job 则不需考虑复杂的访存逻辑以及在 job 中自建 Cache,这样既完成了服务的共用,又减轻了 job 自身的 GC 压力。性能

此外咱们自构建了一个 monitor 服务,由于当时的 1.1.3 版本的 Flink 可提供的监控 metric 少,并且因为其 Kafka-connector 使用的是 Kafka08 的低阶 API,Kafka 的消费 offset 并无提交的 ZK 上,所以咱们须要构建一个 monitor 来监控 Flink 的 job 的活性、瞬时速度、消费淤积等 metric,并接入公司 owl 完成监控告警。测试

这时候,Flink 的 standalone cluster 已经承接了来自 Game Analytics 的全部流量,日均处理消息约 10 亿条,总吞吐量达到 12 TB 每日。到了暑假的时候,日均日志量上升到了 18 亿条天天,吞吐量达到了约 20 TB 每日,TPS 峰值为 3 万。

在这个过程当中,咱们又遇到了 Flink 的 job 消费不均衡、在 standalone cluster 上 job 的 deploy 不均衡等问题,而形成线上消费淤积,以及集群无端自动重启而自动重启后 job 没法成功重启。(咱们将在第三章中详细介绍这些问题中的典型表现及当时的解决方案。)

通过一个暑假后,咱们认为 Flink 经受了考验,所以开始将 App Analytics 的 etl-job 也迁移到 Flink 上。造成了以下的数据流图:

3

图3. App Analytics-etl-adaptor 的标准 SDK 处理工做迁移到 Flink 后的数据流图

4

图4. App Analytics-etl-flink job 的 stream graph

2017 年 3 月开始有大量用户开始迁移至统一的 JSON SDK,新版 SDK 的 Kafka topic 的峰值流量从年中的 8 K/s 上涨至了年末的 3 W/s。此时,整个 Flink standalone cluster 上一共部署了两款产品的 4 个 job,日均吞吐量达到了 35 TB。

这时遇到了两个很是严重的问题:

  • 同一个 standalone cluster 中的 job 相互抢占资源,而 standalone cluster 的模式仅仅只能经过 task slot 在 task manager 的堆内内存上作到资源隔离。同时因为前文提到过的 Flink 在 standalone cluster 中 deploy job 的方式原本就会形成资源分配不均衡,从而会致使 App Analytics 线流量大时而引发Game Analytics 线淤积的问题。
  • 咱们的 source operator 的并行度等同于所消费 Kafka topic 的 partition 数量,而中间作 etl 的 operator 的并行度每每会远大于 Kafka 的 partition 数量。所以最后的 job graph 不可能彻底被链成一条 operator chain,operator 之间的数据传输必须经过 Flink 的 network buffer 的申请和释放,而 1.1.x 版本的 network buffer 在数据量大的时候很容易在其申请和释放时形成死锁,而致使 Flink 明明有许多消息要处理,可是大部分线程处于 waiting 的状态致使业务的大量延迟。

这些问题逼迫着咱们不得不将两款产品的 job 拆分到两个 standalone cluster 中,并对 Flink 作一次较大的版本升级,从 1.1.3(中间过分到 1.1.5)升级成 1.3.2。最终升级至 1.3.2 在 18 年的 Q1 完成,1.3.2 版本引入了增量式的 checkpoint 提交而且在性能和稳定性上比 1.1.x 版本作了巨大的改进。升级以后,Flink 集群基本稳定,尽管还有消费不均匀等问题,可是基本能够在业务量增长时经过扩容机器来解决。

2. Flink on yarn (1.7.1)

由于 standalone cluster 的资源隔离作的并不优秀,并且还有 deploy job 不均衡等问题,加上社区上使用 Flink on yarn 已经很是成熟,所以咱们在 18 年的 Q4 就开始计划将 Flink 的 standalone cluster 迁移至 Flink on yarn 上,而且 Flink 在最近的版本中对于 batch 的提高较多,咱们还规划逐步使用 Flink 来逐步替换如今的批处理引擎。

5

图5. Flink on yarn cluster 规划

如图 5,将来的 Flink on yarn cluster 将能够完成流式计算和批处理计算,集群的使用者能够经过一个构建 service 来完成 stream/batch job 的构建、优化和提交,job 提交后,根据使用者所在的业务团队及服务客户的业务量分发到不一样的 yarn 队列中,此外,集群须要一个完善的监控系统,采集用户的提交记录、各个队列的流量及负载、各个 job 的运行时指标等等,并接入公司的 OWL。

从 19 年的 Q1 开始,咱们将 App Analytics 的部分 stream job 迁移到了 Flink on yarn 1.7 中,又在 19 年 Q2 前完成了 App Analytics 全部处理统一 JSON SDK 的流任务迁移。当前的 Flink on yarn 集群的峰值处理的消息量达到 30 W/s,日均日志吞吐量达约到 50 亿条,约 60 TB。在 Flink 迁移到 on yarn 以后,由于版本的升级性能有所提高,且 job 之间的资源隔离确实优于 standalone cluster。迁移后咱们使用 Prometheus+Grafana 的监控方案,监控更方便和直观。

咱们将在后续将 Game Analytics 的 Flink job 和日志导出的 job 也迁移至该 on yarn 集群,预计能够节约 1/4 的机器资源。

3、重点问题的描述与解决

在 Flink 实践的过程当中,咱们一路上遇到了很多坑,咱们挑出其中几个重点坑作简要讲解。

1. 少用静态变量及 job cancel 时合理释放资源

在咱们实现 Flink 的 operator 的 function 时,通常均可以继承 AbstractRichFunction,其已提供生命周期方法 open()/close(),因此 operator 依赖的资源的初始化和释放应该经过重写这些方法执行。当咱们初始化一些资源,如 spring context、dubbo config 时,应该尽量使用单例对象持有这些资源且(在一个 TaskManager 中)只初始化 1 次,一样的,咱们在 close 方法中应当(在一个 TaskManager 中)只释放一次。

static 的变量应该慎重使用,不然很容易引发 job cancel 而相应的资源没有释放进而致使 job 重启遇到问题。规避 static 变量来初始化可使用 org.apache.flink.configuration.Configuration(1.3)或者 org.apache.flink.api.java.utils.ParameterTool(1.7)来保存咱们的资源配置,而后经过 ExecutionEnvironment 来存放(Job提交时)和获取这些配置(Job运行时)。

示例代码:

Flink 1.3 设置及注册配置:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration parameters = new Configuration();
parameters.setString("zkConnects", zkConnects);
parameters.setBoolean("debug", debug);
env.getConfig().setGlobalJobParameters(parameters);

获取配置(在 operator 的 open 方法中)。

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    Configuration globConf = (Configuration) globalParams;
    debug = globConf.getBoolean("debug", false);
    String zks = globConf.getString("zkConnects", "");
    //.. do more ..
}

Flink 1.7 设置及注册配置:

ParameterTool parameters = ParameterTool.fromArgs(args);
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

获取配置:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    ParameterTool parameters = (ParameterTool)
        getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
    parameters.getRequired("input");
    // .. do more ..

2. NetworkBuffer 及 operator chain

如前文所述,当 Flink 的 job 的上下游 Task(的 subTask)分布在不一样的 TaskManager 节点上时(也就是上下游 operator 没有 chained 在一块儿,且相对应的 subTask 分布在了不一样的 TaskManager 节点上),就须要在 operator 的数据传递时申请和释放 network buffer 并经过网络 I/O 传递数据。

其过程简述以下:上游的 operator 产生的结果会经过 RecordWriter 序列化,而后申请 BufferPool 中的 Buffer 并将序列化后的结果写入 Buffer,此后 Buffer 会被加入 ResultPartition 的 ResultSubPartition 中。ResultSubPartition 中的 Buffer 会经过 Netty 传输至下一级的 operator 的 InputGate 的 InputChannel 中,一样的,Buffer 进入 InputChannel 前一样须要到下一级 operator 所在的 TaskManager 的 BufferPool 申请,RecordReader 读取 Buffer 并将其中的数据反序列化。BufferPool 是有限的,在 BufferPool 为空时 RecordWriter / RecordReader 所在的线程会在申请 Buffer 的过程当中等待一段时间,具体原理能够参考:[1], [2]。

简要截图以下:

7

图6. Flink 的网络栈, 其中 RP 为 ResultPartition、RS 为 ResultSubPartition、IG 为 InputGate、IC 为 inputChannel

在使用 Flink 1.1.x 和 1.3.x 版本时,若是咱们的 network buffer 的数量配置的不充足且数据的吞吐量变大的时候,就会遇到以下现象:

8

图7. 上游 operator 阻塞在获取 network buffer 的 requestBuffer() 方法中

9

图8. 下游的 operator 阻塞在等待新数据输入

10

图9. 下游的 operator 阻塞在等待新数据输入

咱们的工做线程(RecordWriter 和 RecordReader 所在的线程)的大部分时间都花在了向 BufferPool 申请 Buffer 上,这时候 CPU 的使用率会剧烈的抖动,使得 Job 的消费速度降低,在 1.1.x 版本中甚至会阻塞很长的一段时间,触发整个 job 的背压,从而形成较严重的业务延迟。

这时候,咱们就须要经过上下游 operator 的并行度来计算 ResultPartition 和 InputGate 中所须要的 buffer 的个数,以配置充足的 taskmanager.network.numberOfBuffers。

11

图10. 不一样的 network buffer 对 CPU 使用率的影响

当配置了充足的 network buffer 数时,CPU 抖动能够减小,Job 消费速度有所提升。

在 Flink 1.5 以后,在其 network stack 中引入了基于信用度的流量传输控制(credit-based flow control)机制[2],该机制大限度的避免了在向 BufferPool 申请 Buffer 的阻塞现象,咱们初步测试 1.7 的 network stack 的性能确实比 1.3 要高。

但这毕竟还不是最优的状况,由于若是借助 network buffer 来完成上下游的 operator 的数据传递不能够避免的要通过序列化/反序列化的过程,并且信用度的信息传递有必定的延迟性和开销,而这个过程能够经过将上下游的 operator 链成一条 operator chain 而避免。

所以咱们在构建咱们流任务的执行图时,应该尽量多的让 operator 都 chain 在一块儿,在 Kafka 资源容许的状况下能够扩大 Kafka 的 partition 而使得 source operator 和后继的 operator 链在一块儿,但也不能一味扩大 Kafka topic 的 partition,应根据业务量和机器资源作好取舍。更详细的关于 operator 的 training 和 task slot 的调优能够参考: [4]。

3. Flink 中所选用序列化器的建议

在上一节中咱们知道,Flink 的分布在不一样节点上的 Task 的数据传输必须通过序列化/反序列化,所以序列化/反序列化也是影响 Flink 性能的一个重要因素。Flink 自有一套类型体系,即 Flink 有本身的类型描述类(TypeInformation)。Flink 但愿可以掌握尽量多的进出 operator 的数据类型信息,并使用 TypeInformation 来描述,这样作主要有如下 2 个缘由:

  • 类型信息知道的越多,Flink 能够选取更好的序列化方式,并使得 Flink 对内存的使用更加高效;
  • TypeInformation 内部封装了本身的序列化器,可经过 createSerializer() 获取,这样可让用户再也不操心序列化框架的使用(例如如何将他们自定义的类型注册到序列化框架中,尽管用户的定制化和注册能够提升性能)。

整体上来讲,Flink 推荐咱们在 operator 间传递的数据是 POJOs 类型,对于 POJOs 类型,Flink 默认会使用 Flink 自身的 PojoSerializer 进行序列化,而对于 Flink 没法本身描述或推断的数据类型,Flink 会将其识别为 GenericType,并使用 Kryo 进行序列化。Flink 在处理 POJOs 时更高效,此外 POJOs 类型会使得 stream 的 grouping/joining/aggregating 等操做变得简单,由于可使用如: dataSet.keyBy("username") 这样的方式直接操做数据流中的数据字段。

除此以外,咱们还能够作进一步的优化:

  • 显示调用 returns 方法,从而触发 Flink 的 Type Hint:
dataStream.flatMap(new MyOperator()).returns(MyClass.class)

returns 方法最终会调用 TypeExtractor.createTypeInfo(typeClass) ,用以构建咱们自定义的类型的 TypeInformation。createTypeInfo 方法在构建 TypeInformation 时,若是咱们的类型知足 POJOs 的规则或 Flink 中其余的基本类型的规则,会尽量的将咱们的类型“翻译”成 Flink 熟知的类型如 POJOs 类型或其余基本类型,便于 Flink 自行使用更高效的序列化方式。

//org.apache.flink.api.java.typeutils.PojoTypeInfo
@Override
@PublicEvolving
@SuppressWarnings("unchecked")
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
   if (config.isForceKryoEnabled()) {
      return new KryoSerializer<>(getTypeClass(), config);
   }

   if (config.isForceAvroEnabled()) {
      return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
   }

   return createPojoSerializer(config);
}

对于 Flink 没法“翻译”的类型,则返回 GenericTypeInfo,并使用 Kryo 序列化:

//org.apache.flink.api.java.typeutils.TypeExtractor

@SuppressWarnings({ "unchecked", "rawtypes" })
private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
      ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
   checkNotNull(clazz);
   // 尝试将 clazz转换为 PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo
   // BasicTypeInfo, PojoTypeInfo 等,具体源码已省略
  //...

   //若是上述尝试不成功 , 则return a generic type
   return new GenericTypeInfo<OUT>(clazz);
}
  • 注册 subtypes: 经过 StreamExecutionEnvironment 或 ExecutionEnvironment 的实例的 registerType(clazz) 方法注册咱们的数据类及其子类、其字段的类型。若是 Flink 对类型知道的越多,性能会更好
  • 若是还想作进一步的优化,Flink 还容许用户注册本身定制的序列化器,手动建立本身类型的 TypeInformation,具体能够参考 Flink 官网:[3];

在咱们的实践中,最初为了扩展性,在 operator 之间传递的数据为 JsonNode,可是咱们发现性能达不到预期,所以将 JsonNode 改为了符合 POJOs 规范的类型,在 1.1.x 的 Flink 版本上直接得到了超过 30% 的性能提高。在咱们调用了 Flink 的 Type Hint 和 env.getConfig().enableForceAvro() 后,性能获得进一步提高。这些方法一直沿用到了 1.3.x 版本。

在升级至 1.7.x 时,若是使用 env.getConfig().enableForceAvro() 这个配置,咱们的代码会引发校验空字段的异常。所以咱们取消了这个配置,并尝试使用 Kyro 进行序列化,而且注册咱们的类型的全部子类到 Flink 的 ExecutionEnvironment 中,目前看性能尚可,并优于旧版本使用 Avro 的性能。可是最佳实践还须要通过比较和压测 KryoSerializerAvroUtils.getAvroUtils().createAvroSerializerPojoSerializer 才能总结出来,你们仍是应该根据本身的业务场景和数据类型来合理挑选适合本身的 serializer。

4. Standalone 模式下 job 的 deploy 与资源隔离共享

结合咱们以前的使用经验,Flink 的 standalone cluster 在发布具体的 job 时,会有必定的随机性。举个例子,若是当前集群总共有 2 台 8 核的机器用以部署 TaskManager,每台机器上一个 TaskManager 实例,每一个 TaskManager 的 TaskSlot 为 8,而咱们的 job 的并行度为 12,那么就有可能会出现下图的现象:

12

第一个 TaskManager 的 slot 全被占满,而第二个 TaskManager 只使用了一半的资源!资源严重不平衡,随着 job 处理的流量加大,必定会形成 TM1 上的 task 消费速度慢,而 TM2 上的 task 消费速度远高于 TM1 的 task 的状况。假设业务量的增加迫使咱们不得不扩大 job 的并行度为 24,而且扩容2台性能更高的机器(12核),在新的机器上,咱们分别部署 slot 数为 12 的 TaskManager。通过扩容后,集群的 TaskSlot 的占用可能会造成下图:

14

新扩容的配置高的机器并无去承担更多的 Task,老机器的负担仍然比较严重,资源本质上仍是不均匀!

除了 standalone cluster 模式下 job 的发布策略形成不均衡的状况外,还有资源隔离差的问题。由于咱们在一个 cluster 中每每会部署不止一个 job,而这些 job 在每台机器上都共用 JVM,天然会形成资源的竞争。起初,咱们为了解决这些问题,采用了以下的解决方法:

  1. 将 TaskManager 的粒度变小,即一台机器部署多个实例,每一个实例持有的 slot 数较少;
  2. 将大的业务 job 隔离到不一样的集群上。

这些解决方法增长了实例数和集群数,进而增长了维护成本。所以咱们决定要迁移到 on yarn 上,目前看 Flink on yarn 的资源分配和资源隔离确实比 standalone 模式要优秀一些。

4、总结与展望

Flink 在 2016 年时仅为星星之火,而只用短短两年的时间就成长为了当前最为煊赫一时的流处理平台,并且大有统一批与流之势。通过两年的实践,Flink 已经证实了它可以承接 TalkingData 的 App Analytics 和 Game Analytics 两个产品的流处理需求。接下来咱们会将更复杂的业务和批处理迁移到 Flink 上,完成集群部署和技术栈的统一,最终实现图 5 中 Flink on yarn cluster 的规划,以更少的成原本支撑更大的业务量。

参考资料:

[1]https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
[2]https://flink.apache.org/2019/06/05/flink-network-stack.html
[3]https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/types_serialization.html#type-hints-in-the-java-api
[4]Flink Slot 详解与 Job Execution Graph 优化

了解更多 Flink 企业级应用案例可关注 Flink Forward Asia 企业实践专场,届时来自字节跳动、滴滴出行、快手、Bilibili、网易、爱奇艺、中国农业银行、奇虎360、贝壳找房、奇安信等众多一线大厂技术专家现场分享 Flink 在各行业的应用效果与探索思路。

大会倒计时 25 天!11 月 28-30 日,北京国家会议中心,扫描下方二维码马上报名,来参与一场思惟升级的技术盛宴~

点击了解更多大会议程:https://developer.aliyun.com/special/ffa2019-conference?spm=a2c6h.13239638.0.0.21f27955RWcLlb

11月28日下午,企业实践专题分享

_1

11月29日上午,企业实践专题分享

_2

 

阅读原文

本文为云栖社区原创内容,未经容许不得转载。

相关文章
相关标签/搜索