做者:张俊
整理:张友亮(Apache Flink 社区志愿者)网络
本文共 4745字,预计阅读时间 15min。架构
本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、OPPO 大数据平台研发负责人张俊老师分享。主要内容以下:大数据
首先咱们能够看下这张最精简的网络流控的图,Producer 的吞吐率是 2MB/s,Consumer 是 1MB/s,这个时候咱们就会发如今网络通讯的时候咱们的 Producer 的速度是比 Consumer 要快的,有 1MB/s 的这样的速度差,假定咱们两端都有一个 Buffer,Producer 端有一个发送用的 Send Buffer,Consumer 端有一个接收用的 Receive Buffer,在网络端的吞吐率是 2MB/s,过了 5s 后咱们的 Receive Buffer 可能就撑不住了,这时候会面临两种状况:优化
为了解决这个问题,咱们就须要网络流控来解决上下游速度差的问题,传统的作法能够在 Producer 端实现一个相似 Rate Limiter 这样的静态限流,Producer 的发送速率是 2MB/s,可是通过限流这一层后,往 Send Buffer 去传数据的时候就会降到 1MB/s 了,这样的话 Producer 端的发送速率跟 Consumer 端的处理速率就能够匹配起来了,就不会致使上述问题。可是这个解决方案有两点限制:spa
针对静态限速的问题咱们就演进到了动态反馈(自动反压)的机制,咱们须要 Consumer 可以及时的给 Producer 作一个 feedback,即告知 Producer 可以承受的速率是多少。动态反馈分为两种:计算机网络
让咱们来看几个经典案例线程
上图就是 Storm 里实现的反压机制,能够看到 Storm 在每个 Bolt 都会有一个监测反压的线程(Backpressure Thread),这个线程一但检测到 Bolt 里的接收队列(recv queue)出现了严重阻塞就会把这个状况写到 ZooKeeper 里,ZooKeeper 会一直被 Spout 监听,监听到有反压的状况就会中止发送,经过这样的方式匹配上下游的发送接收速率。3d
Spark Streaming 里也有作相似这样的 feedback 机制,上图 Fecher 会实时的从 Buffer、Processing 这样的节点收集一些指标而后经过 Controller 把速度接收的状况再反馈到 Receiver,实现速率的匹配。orm
首先在解决这个疑问以前咱们须要先了解一下 Flink 的网络传输是一个什么样的架构。中间件
这张图就体现了 Flink 在作网络传输的时候基本的数据的流向,发送端在发送网络数据前要经历本身内部的一个流程,会有一个本身的 Network Buffer,在底层用 Netty 去作通讯,Netty 这一层又有属于本身的 ChannelOutbound Buffer,由于最终是要经过 Socket 作网络请求的发送,因此在 Socket 也有本身的 Send Buffer,一样在接收端也有对应的三级 Buffer。学过计算机网络的时候咱们应该了解到,TCP 是自带流量控制的。实际上 Flink (before V1.5)就是经过 TCP 的流控机制来实现 feedback 的。
根据下图咱们来简单的回顾一下 TCP 包的格式结构。首先,他有 Sequence number 这样一个机制给每一个数据包作一个编号,还有 ACK number 这样一个机制来确保 TCP 的数据传输是可靠的,除此以外还有一个很重要的部分就是 Window Size,接收端在回复消息的时候会经过 Window Size 告诉发送端还能够发送多少数据。
接下来咱们来简单看一下这个过程。
TCP 的流控就是基于滑动窗口的机制,如今咱们有一个 Socket 的发送端和一个 Socket 的接收端,目前咱们的发送端的速率是咱们接收端的 3 倍,这样会发生什么样的一个状况呢?假定初始的时候咱们发送的 window 大小是 3,而后咱们接收端的 window 大小是固定的,就是接收端的 Buffer 大小为 5。
首先,发送端会一次性发 3 个 packets,将 1,2,3 发送给接收端,接收端接收到后会将这 3 个 packets 放到 Buffer 里去。
接收端一次消费 1 个 packet,这时候 1 就已经被消费了,而后咱们看到接收端的滑动窗口会往前滑动一格,这时候 2,3 还在 Buffer 当中 而 4,5,6 是空出来的,因此接收端会给发送端发送 ACK = 4 ,表明发送端能够从 4 开始发送,同时会将 window 设置为 3 (Buffer 的大小 5 减去已经存下的 2 和 3),发送端接收到回应后也会将他的滑动窗口向前移动到 4,5,6。
这时候发送端将 4,5,6 发送,接收端也能成功的接收到 Buffer 中去。
到这一阶段后,接收端就消费到 2 了,一样他的窗口也会向前滑动一个,这时候他的 Buffer 就只剩一个了,因而向发送端发送 ACK = 七、window = 1。发送端收到以后滑动窗口也向前移,可是这个时候就不能移动 3 格了,虽然发送端的速度容许发 3 个 packets 可是 window 传值已经告知只能接收一个,因此他的滑动窗口就只能往前移一格到 7 ,这样就达到了限流的效果,发送端的发送速度从 3 降到 1。
咱们再看一下这种状况,这时候发送端将 7 发送后,接收端接收到,可是因为接收端的消费出现问题,一直没有从 Buffer 中去取,这时候接收端向发送端发送 ACK = 八、window = 0 ,因为这个时候 window = 0,发送端是不能发送任何数据,也就会使发送端的发送速度降为 0。这个时候发送端不发送任何数据了,接收端也不进行任何的反馈了,那么如何知道消费端又开始消费了呢?
TCP 当中有一个 ZeroWindowProbe 的机制,发送端会按期的发送 1 个字节的探测消息,这时候接收端就会把 window 的大小进行反馈。当接收端的消费恢复了以后,接收到探测消息就能够将 window 反馈给发送端端了从而恢复整个流程。TCP 就是经过这样一个滑动窗口的机制实现 feedback。
大致的逻辑就是从 Socket 里去接收数据,每 5s 去进行一次 WordCount,将这个代码提交后就进入到了编译阶段。
这时候尚未向集群去提交任务,在 Client 端会将 StreamGraph 生成 JobGraph,JobGraph 就是作为向集群提交的最基本的单元。在生成 JobGrap 的时候会作一些优化,将一些没有 Shuffle 机制的节点进行合并。有了 JobGraph 后就会向集群进行提交,进入运行阶段。
JobGraph 提交到集群后会生成 ExecutionGraph ,这时候就已经具有基本的执行任务的雏形了,把每一个任务拆解成了不一样的 SubTask,上图 ExecutionGraph 中的 Intermediate Result Partition 就是用于发送数据的模块,最终会将 ExecutionGraph 交给 JobManager 的调度器,将整个 ExecutionGraph 调度起来。而后咱们概念化这样一张物理执行图,能够看到每一个 Task 在接收数据时都会经过这样一个 InputGate 能够认为是负责接收数据的,再往前有这样一个 ResultPartition 负责发送数据,在 ResultPartition 又会去作分区跟下游的 Task 保持一致,就造成了 ResultSubPartition 和 InputChannel 的对应关系。这就是从逻辑层上来看的网络传输的通道,基于这么一个概念咱们能够将反压的问题进行拆解。
反压的传播其实是分为两个阶段的,对应着上面的执行图,咱们一共涉及 3 个 TaskManager,在每一个 TaskManager 里面都有相应的 Task 在执行,还有负责接收数据的 InputGate,发送数据的 ResultPartition,这就是一个最基本的数据传输的通道。在这时候假设最下游的 Task (Sink)出现了问题,处理速度降了下来这时候是如何将这个压力反向传播回去呢?这时候就分为两种状况:
前面提到,发送数据须要 ResultPartition,在每一个 ResultPartition 里面会有分区 ResultSubPartition,中间还会有一些关于内存管理的 Buffer。
对于一个 TaskManager 来讲会有一个统一的 Network BufferPool 被全部的 Task 共享,在初始化时会从 Off-heap Memory 中申请内存,申请到内存的后续内存管理就是同步 Network BufferPool 来进行的,不须要依赖 JVM GC 的机制去释放。有了 Network BufferPool 以后能够为每个 ResultSubPartition 建立 Local BufferPool 。
如上图左边的 TaskManager 的 Record Writer 写了 <1,2> 这个两个数据进来,由于 ResultSubPartition 初始化的时候为空,没有 Buffer 用来接收,就会向 Local BufferPool 申请内存,这时 Local BufferPool 也没有足够的内存因而将请求转到 Network BufferPool,最终将申请到的 Buffer 按原链路返还给 ResultSubPartition,<1,2> 这个两个数据就能够被写入了。以后会将 ResultSubPartition 的 Buffer 拷贝到 Netty 的 Buffer 当中最终拷贝到 Socket 的 Buffer 将消息发送出去。而后接收端按照相似的机制去处理将消息消费掉。
接下来咱们来模拟上下游处理速度不匹配的场景,发送端的速率为 2,接收端的速率为 1,看一下反压的过程是怎样的。
由于速度不匹配就会致使一段时间后 InputChannel 的 Buffer 被用尽,因而他会向 Local BufferPool 申请新的 Buffer ,这时候能够看到 Local BufferPool 中的一个 Buffer 就会被标记为 Used。
发送端还在持续以不匹配的速度发送数据,而后就会致使 InputChannel 向 Local BufferPool 申请 Buffer 的时候发现没有可用的 Buffer 了,这时候就只能向 Network BufferPool 去申请,固然每一个 Local BufferPool 都有最大的可用的 Buffer,防止一个 Local BufferPool 把 Network BufferPool 耗尽。这时候看到 Network BufferPool 仍是有可用的 Buffer 能够向其申请。
一段时间后,发现 Network BufferPool 没有可用的 Buffer,或是 Local BufferPool 的最大可用 Buffer 到了上限没法向 Network BufferPool 申请,没有办法去读取新的数据,这时 Netty AutoRead 就会被禁掉,Netty 就不会从 Socket 的 Buffer 中读取数据了。
显然,再过不久 Socket 的 Buffer 也被用尽,这时就会将 Window = 0 发送给发送端(前文提到的 TCP 滑动窗口的机制)。这时发送端的 Socket 就会中止发送。
很快发送端的 Socket 的 Buffer 也被用尽,Netty 检测到 Socket 没法写了以后就会中止向 Socket 写数据。
Netty 中止写了以后,全部的数据就会阻塞在 Netty 的 Buffer 当中了,可是 Netty 的 Buffer 是无界的,能够经过 Netty 的水位机制中的 high watermark 控制他的上界。当超过了 high watermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写以前都会检测 Netty 是否可写,发现不可写就会中止向 Netty 写数据。
这时候全部的压力都来到了 ResultSubPartition,和接收端同样他会不断的向 Local BufferPool 和 Network BufferPool 申请内存。
Local BufferPool 和 Network BufferPool 都用尽后整个 Operator 就会中止写数据,达到跨 TaskManager 的反压。
了解了跨 TaskManager 反压过程后再来看 TaskManager 内反压过程就更好理解了,下游的 TaskManager 反压致使本 TaskManager 的 ResultSubPartition 没法继续写入数据,因而 Record Writer 的写也被阻塞住了,由于 Operator 须要有输入才能有计算后的输出,输入跟输出都是在同一线程执行, Record Writer 阻塞了,Record Reader 也中止从 InputChannel 读数据,这时上游的 TaskManager 还在不断地发送数据,最终将这个 TaskManager 的 Buffer 耗尽。具体流程能够参考下图,这就是 TaskManager 内的反压过程。
在介绍 Credit-based 反压机制以前,先分析下 TCP 反压有哪些弊端。
这个机制简单的理解起来就是在 Flink 层面实现相似 TCP 流控的反压机制来解决上述的弊端,Credit 能够类比为 TCP 的 Window 机制。
如图所示在 Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完以后若是有充足的 Buffer 就会返还给上游一个 Credit 告知他能够发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是由于最终仍是要经过 Netty 和 Socket 去通讯),下面咱们看一个具体示例。
假设咱们上下游的速度不匹配,上游发送速率为 2,下游接收速率为 1,能够看到图上在 ResultSubPartition 中累积了两条消息,10 和 11, backlog 就为 2,这时就会将发送的数据 <8,9> 和 backlog = 2 一同发送给下游。下游收到了以后就会去计算是否有 2 个 Buffer 去接收,能够看到 InputChannel 中已经不足了这时就会从 Local BufferPool 和 Network BufferPool 申请,好在这个时候 Buffer 仍是能够申请到的。
过了一段时间后因为上游的发送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已经到达了申请上限,这时候下游就会向上游返回 Credit = 0,ResultSubPartition 接收到以后就不会向 Netty 去传输数据,上游 TaskManager 的 Buffer 也很快耗尽,达到反压的效果,这样在 ResultSubPartition 层就能感知到反压,不用经过 Socket 和 Netty 一层层地向上反馈,下降了反压生效的延迟。同时也不会将 Socket 去阻塞,解决了因为一个 Task 反压致使 TaskManager 和 TaskManager 之间的 Socket 阻塞的问题。
有了动态反压,静态限速是否是彻底没有做用了?
实际上动态反压不是万能的,咱们流计算的结果最终是要输出到一个外部的存储(Storage),外部数据存储到 Sink 端的反压是不必定会触发的,这要取决于外部存储的实现,像 Kafka 这样是实现了限流限速的消息中间件能够经过协议将反压反馈给 Sink 端,可是像 ES 没法将反压进行传播反馈给 Sink 端,这种状况下为了防止外部存储在大的数据量下被打爆,咱们就能够经过静态限速的方式在 Source 端去作限流。因此说动态反压并不能彻底替代静态限速的,须要根据合适的场景去选择处理方案。
本文为云栖社区原创内容,未经容许不得转载。