简介: 在 Flink 现有的架构设计中,多流 Join 拓扑下单个 Task 失败会致使全部 Task 从新部署,耗时可能会持续几分钟,致使做业的输出断流,这对于线上业务来讲是不可接受的。针对这一痛点,字节提出单点恢复的方案。缓存
在字节跳动的实时计算场景中,咱们有不少任务(数量 2k+)会直接服务于线上,其输出时延和稳定性会直接影响线上产品的用户体验,这类任务一般具备以下特色:网络
在 Flink 现有的架构设计中,多流 Join 拓扑下单个 Task 失败会致使全部 Task 从新部署,耗时可能会持续几分钟,致使做业的输出断流,这对于线上业务来讲是不可接受的。多线程
针对这一痛点,咱们提出单点恢复的方案,经过对 network 层的加强,使得在机器下线或者 Task 失败的状况下,以短期内故障 Task 的部分数据丢失为代价,达成如下目标:架构
当初遇到这些问题的时候,咱们提出的想法是说能不能在机器故障下线的时候,只让在这台机器上的 Tasks 进行 Failover,而这些 Tasks 的上下游 Tasks 能刚好感知到这些失败的 Tasks,并做出对应的措施:并发
根据这些想法咱们思考得出几个比较关键点在于:ide
基于以上考虑咱们决定基于已有的 Network 层线程模型,修改上下游对于 Task Failed 后的处理逻辑,让非故障的 Tasks 短期内完成对失败 Task 的感知操做,从而使得做业持续稳定地输出。性能
注:咱们的实现基于 Flink-1.9,1.11 后的网络模型加入了 Unaligned Checkpoint 的特性,可能会有所变化。测试
咱们先将 Flink 的上下游 Task 通讯模型简单抽象一下:优化
上下游 Task 感知彼此状态的逻辑,分三种状况考虑:spa
能够看到,在大部分状况下,Task 是能够直接感知到上下游 Task 的状态变化。了解了基础的通讯模型以后,咱们能够按照以前的解决思路继续深刻一下,分别在上游发送端和下游接收端能够作什么样改进来实现单点恢复。
根据咱们的解决思路,咱们来绘制一下单个 Task 挂了以后,整个 Job 的通讯流程:
Map(1) 失败以后:
从这个流程,咱们能够将优化分为三个模块,分别为上游发送端、下游接收端和 JobManager。
咱们再细化一下上游发送端的相关细节,
(1) Netty Server 收到 Client 发送的 Partition Request 后,在对应的 Subpartition 注册读取数据的 SubpartitionView 和 Reader。
(2) RecordWriter 发送数据到不一样的 Subpartitions,每一个 Subpartition 内部维护一个 buffer 队列,并将读取数据的 Reader 放入到 Readers Queue 中。(Task 线程)
(3) Netty 线程读取 Readers Queue,取出对应的 Reader 并读取对应 Subpartition 中的 buffer 数据,发送给下游。(Netty 线程)
咱们的指望是上游发送端在感知到下游 Task 失败以后,直接将发送到对应 Task 的数据丢弃。那么咱们的改动逻辑,在这个示意图中,就是 Subpartition 经过 Netty Server 收到下游 Task Fail 的消息后,将本身设置为 Unavailable,而后 RecordWriter 在发送数据到指定 Subpartition 时,判断是否可用,若是不可用则直接将数据丢弃。而当 Task Failover 完成后从新与上游创建链接后,再将该 Subpartition 置为 Available,则数据能够从新被消费。
发送端的改动比较简单,得益于 Flink 内部对 Subpartition 的逻辑作了很好的抽象,而且能够很容易的经过参数来切换 Subpartition 初始化的类型,咱们在这里参考 PipelinedSubpartition 的实现,根据上述的逻辑,实现了咱们本身的 Subpartition 和对应的 View。
一样,咱们来细化一下下游接收端的细节:
仔细来看,其实和上游的线程模型很有相似之处:
(1) InputGate 初始化全部的 Channel 并经过 Netty Client 和上游 Server 创建链接。
(2) InputChannel 接收到数据后,缓存到 buffer 队列中并将本身的引用放入到 Channels Queue 里。(Netty 线程)
(3) InputGate 经过 InputProcessor 的调用,从 Queue 里拉取 Channel 并读取 Channel 中缓存的 buffer 数据,若是 buffer 不完整(好比只有半条 record),那么则会将不完整的 buffer 暂存到 InputProcessor 中。(Task 线程)
这里咱们指望下游接收端感知到上游 Task 失败以后,能将对应 InputChannel 的接收到的不完整的 buffer 直接清除。不完整的 buffer 存储在 InputProcessor 中,那么咱们如何让 InputProcessor 知道哪一个 Channel 出现了问题?
简单的方案是说,咱们在 InputChannel 中直接调用 InputGate 或者 InputProcessor,作 buffer 清空的操做,可是这样引入一个问题,因为 InputChannel 收到 Error 是在 Netty 线程,而 InputProcessor 的操做是在 Task 线程,这样跨线程的调用打破了已有的线程模型,必然会引入锁和调用时间的不肯定性,增长架构设计的复杂度,而且由于 InputProcessor 会对每一条 record 都有调用,稍有不慎就会带来性能的降低。
咱们沿用已有的线程模型,Client 感知到上游 Task 失败的消息以后告知对应的 Channel,Channel 向本身维护的 receivedBuffers 的末尾插入一个 UnavailableEvent,并等待 InputProcessor 拉取并清空对应 Channel 的 buffer 数据。示意图以下所示,红色的模块是咱们新增的部分:
JobManager 重启策略能够参考社区已有的 RestartIndividualStrategy,比较重要的区别是,在从新 deploy 这个失败的 Task 后,咱们须要经过 ExecutionGraph 中的拓扑信息,找到该 Task 的下游 Tasks,并经过 Rpc 调用让下游 Tasks 和这个新的上游 Tasks 从新创建链接。
这里实现有一个难点是若是 JobManager 去 update 下游的 Channel 信息时,旧的 Channel 对应的 buffer 数据尚未被清除怎么办?咱们这里经过新增 CachedChannelProvider 来处理这一逻辑:
如图所示,以 Channel - 1 为例,若是 JobManager 更新 Channel 的 Rpc 请求到来时 Channel 处于不可用状态,那么咱们直接利用 Rpc 请求中携带的 Channel 信息来从新初始化 Channel。以 Channel - 3 为例,若是 Rpc 请求到来时 Channel 仍然可用,那么咱们将 Channel 信息暂时缓存起来,等 Channel - 3 中全部数据消费完毕后,通知 CachedChannelProvider,而后再经过 CachedChannelProvider 去更新 Channel。
这里还须要特别提到一点,在字节跳动内部咱们实现了预留 TaskManager 的功能,当 Task 出现 Failover 时,可以直接使用 TaskManager 的资源,大大节约了 Failover 过程数据丢失的损耗。
整个解决的思路实际上是比较清晰的,相信你们也比较容易理解,可是在实现中仍然有不少须要注意的地方,举例以下:
目前在字节跳动内部,单点恢复功能已经上线了 1000+ 做业,在机器下线、网络抖动的状况下,下游在上游做业作 Failover 的过程几乎没有感知。
接下来咱们如下面这个做业拓扑为例,在做业正常运行时咱们手动 Kill 一个 Container,来看看不一样并行度做业开启单点恢复的效果:
咱们在 1000 和 4000 并行度的做业上进行测试,每一个 slot 中有 2 个 Source 和 1 个 Joiner 共 3 个 Task,手动 Kill 一个 Container 后,从故障恢复时间和断流影响两个维度进行收益计算:
结论:能够看到,在 4000 个 Slot 的做业里,若是不开启单点恢复,做业总体的 Failover 时间为 81s,同时对于下游服务来讲,上游服务断流 81s,这在实时服务线上的场景中明显是不可接受的。而开启了单点恢复和预留资源后,Kill 1 个 Container 只会影响 4 个 Slot,且 Failover 的时间只有 5s,同时对于下游服务来讲,上游服务产生的数据减小 4/4000=千分之一,持续 5s,效果是很是显而易见的。