【译】A Deep-Dive into Flink's Network Stack(3)

单个 TaskManager 上的缓冲区总数一般不须要配置。须要配置时请参阅配置网络缓冲区文档html

形成背压(1)

每当子任务的发送缓冲池耗尽时——也就是缓存驻留在结果子分区的缓存队列中或更底层的基于 Netty 的网络栈中时——生产者就被阻塞了,没法继续工做,并承受背压。接收器也是相似:较底层网络栈中传入的 Netty 缓存须要经过网络缓冲区提供给 Flink。若是相应子任务的缓冲池中没有可用的网络缓存,Flink 将在缓存可用前中止从该通道读取。这将对这部分多路传输链路发送的全部子任务形成背压,所以也限制了其余接收子任务。下图中子任务 B.4 过载了,它会对这条多路传输链路形成背压,还会阻止子任务 B.3 接收和处理新的缓存。java

为了防止这种状况发生,Flink 1.5 引入了本身的流量控制机制。git

基于信用的流量控制

基于信用的流量控制可确保“线上”的任何内容都能被接收器处理。它是 Flink 原有机制的天然拓展,基于网络缓冲区的可用性实现。每一个远程输入通道如今都有本身的一组独占缓冲区,而非使用共享的本地缓冲池。而本地缓冲池中的缓存称为浮动缓存,由于它们会浮动并可用于全部输入通道。github

接收器将缓存的可用性声明为发送方的信用(1 缓存 = 1 信用)。每一个结果子分区将跟踪其通道信用值。若是信用可用,则缓存仅转发到较底层的网络栈,而且发送的每一个缓存都会让信用值减去一。除了缓存外,咱们还发送有关当前backlog大小的信息,从而指定在此子分区的队列中等待的缓存数量。接收器将使用它来请求适当数量的浮动缓冲区,以便更快处理 backlog。它将尝试获取与 backlog 大小同样多的浮动缓冲区,但有时并不会如意,可能只获取一点甚至获取不到缓冲。接收器将使用检索到的缓存,并将继续监听可用的缓存。apache

基于信用的流量控制将使用每通道缓冲区来指定本地缓冲池(可选(3))的独占(强制)缓存数和每一个门的浮动缓冲区,从而实现与没有流量控制时相同的缓冲区上限。这两个参数的默认值会使流量控制的最大(理论)吞吐量至少与没有流量控制时同样高,前提是网络的延迟处于通常水平上。你可能须要根据实际的网络延迟和带宽来调整这些参数。api

可选(3):若是没有足够的缓存,每一个缓冲池将从全局可用缓冲池中获取相同份额(±1)。数组

形成背压(2)

相比没有流量控制的接收器的背压机制,信用机制提供了更直接的控制逻辑:若是接收器能力不足,其可用信用将减到 0,并阻止发送方将缓存转发到较底层的网络栈上。这样只在这个逻辑信道上存在背压,而且不须要阻止从多路复用 TCP 信道读取内容。所以,其余接收器在处理可用缓存时就不受影响了。缓存

咱们有什么收获?

经过流量控制,多路复用链路中的信道就不会阻塞链路中的另外一个逻辑信道,提高了总体资源利用率。此外,咱们还能经过彻底控制“在线”数据的数量来改善检查点对齐状况:若是没有流量控制,通道须要一段时间才能填满网络堆栈的内部缓冲区,并广播接收器已经中止读取的消息。这段时间里会多出不少缓存。全部检查点障碍都必须在这些缓存后面排队,所以必须等到全部这些缓存处理完毕后才能启动(“障碍永远不会越过记录!”)。服务器

可是,来自接收器的附加通告消息可能会产生一些额外开销,尤为是在使用 SSL 加密通道的设置中更是如此。此外,单个输入通道不能使用缓冲池中的全部缓存,由于独占缓存不能共享。它也不能当即开始发送尽量多的数据,因此在加速期间(生成数据的速度比计算信用的速度更快时)可能须要更长时间才能发送数据。虽然这可能会影响你的做业性能,但这些代价相比收益来讲仍是值得的。你可能但愿经过每一个通道的缓冲区增长独占缓存的数量,但代价是使用更多内存。但与以前的实现相比整体内存占用可能仍是要少一些,由于较底层的网络栈再也不须要缓存大量数据了,咱们老是能够当即将其传输到 Flink 中。网络

还有一件事要注意:因为咱们在发送方和接收方之间缓存的数据更少了,你可能会更早地遇到背压。但这也在预料之中,并且缓存的数据再多也没什么用。若是你想要缓存更多数据,同时还要有流量控制,能够考虑经过每一个门的浮动缓冲区来提高浮动缓存的数量。

Advantages Disadvantages
• better resource utilisation with data skew in multiplexed connections 经过多路复用链接中的数据倾斜提高资源利用率

• improved checkpoint alignment 改善了检查点对齐

• reduced memory use (less data in lower network layers)减小内存占用(较底层网络层中的数据更少)
• additional credit-announce messages 额外的信用通知消息

• additional backlog-announce messages (piggy-backed with buffer messages, almost no overhead)额外的 backlog 通知消息(缓存消息附带,几乎没有开销)

• potential round-trip latency 潜在的往返延迟
• backpressure appears earlier 背压出现得更早

注意:若是你须要关闭基于信用的流量控制,能够将下列代码添加到 flink-conf.yaml:taskmanager.network.credit-model: false。但此参数已弃用,最终将与不基于信用的流控制代码一块儿被移除。

将记录写入网络缓冲区并再次读取它们

下面的视图比以前的级别更高一些,其中包含网络栈及其周围组件的更多详细信息:

一个记录被建立并传递以后(例如经过 Collector #colle()),它会被递交到RecordWriter,其未来自 Java 对象的记录序列化为一个字节序列,后者最终成为网络缓存,而后像前文提到的那样被处理。RecordWriter 首先使用SpanningRecordSerializer将记录序列化为一个灵活的堆上字节数组。而后,它尝试将这些字节写入目标网络通道的关联网络缓存。

在接收方,较底层的网络栈(netty)将接收到的缓存写入适当的输入通道。最后(流式)任务的线程从这些队列中读取并尝试在RecordReader的帮助下,经过SpillingAdaptiveSpanningRecordDeserializer将积累的数据反序列化为 Java 对象。与序列化器相似,这个反序列化器还必须处理特殊状况,例如跨越多个网络缓冲区的记录——这多是由于记录大于网络缓冲区(默认为 32KiB,经过taskmanager.memory.segment-size设置);或者是由于序列化记录被添加到了没有足够剩余空间的网络缓冲区中。无论怎样,Flink 将使用这些数据,并继续将剩余数据写入新的网络缓冲区。

将缓存刷新到 Netty

在上图中,基于信用的流量控制机制实际上位于“Netty 服务器”(和“Netty 客户端”)组件内部,RecordWriter 写入的缓存始终以空状态添加到结果子分区中,而后逐渐填满(序列化)记录。可是何时 Netty 真的获得了缓存呢?显然,只要它们可用时就不能接收数据了,由于这不只会由于跨线程通讯和同步而增长大量成本,并且还会让整个缓存都过期。

在 Flink 中,有三种状况下 Netty 服务器能够消费缓存:

  • 写入记录时缓冲区变满

  • 缓存超时命中

  • 发送特殊事件,例如检查点障碍

缓冲区满后刷新

RecordWriter 与本地序列化缓冲区一块儿使用当前记录,并将这些数据逐渐写入位于相应结果子分区队列的一个或多个网络缓冲区。虽然 RecordWriter 能够处理多个子分区,但每一个子分区只有一个 RecordWriter 向其写入数据。另外一方面,Netty 服务器正在从多个结果子分区读取并将适当的分区复用到单个信道中,如上所述。这是一个典型的生产者——消费者模式,网络缓冲区位于中间位置,以下图所示。在(1)序列化和(2)将数据写入缓冲区以后,RecordWriter 相应地更新缓冲区的写入器索引。一旦缓冲区被彻底填满,记录写入器将(3)从其本地缓冲池中获取当前记录(或下一个记录)的全部剩余数据生成新的缓存,并将新的缓存添加到子分区队列。这将(4)通知 Netty 服务器还有数据可用(注 4)。每当 Netty 有能力处理此通知时,它将(5)获取缓存并沿适当的 TCP 通道发送它。

注4:若是队列中有更多处理完的缓存,咱们能够假设 Netty 已经收到了通知

缓冲区超时后刷新

为了下降延迟,咱们不能在缓冲区填满以后才向下游发送数据。有些状况下某个通讯信道没有流过那么多记录,这样会带来无心义的延迟。为此,一个名为输出刷新器的按期进程将刷新堆栈中可用的任何数据。能够经过StreamExecutionEnvironment#setBufferTimeout配置周期间隔,这个间隔对于低吞吐量通道来讲就是延迟上限(注 5)。下图显示了它与其余组件的交互方式:RecordWriter 仍是会序列化并写入网络缓冲区,但同时,若是 Netty 服务器还没有知晓,输出刷新器能够(3,4)通知 Netty 服务器有数据可用(相似上面的“缓冲区已满”场景)。当 Netty 处理此通知(5)时,它将使用缓冲区中的可用数据并更新缓冲区的读取器索引。缓存保留在队列中——从 Netty 服务器端对此缓存作进一步操做后,将在下次继续读取读取器索引。

注5:严格来讲,输出刷新器无法给出任何保证——它只会向 Netty 发送通知而已,后者是否响应通知则要取决于其意愿和能力。这也意味着若是通道在经受背压,输出刷新器就没用了。

特殊事件后刷新

某些特殊事件若是经过 RecordWriter 发送,也会触发当即刷新。最重要的特殊事件是检查点障碍或分区结束事件,显然它们应该快速执行,而不是等待输出刷新器启动。

其余要点

相比 Flink 1.5 以前的版本,请注意(a)网络缓冲区如今直接放在子分区队列中,(b)咱们不会在每次刷新时关闭缓冲区。这也带来了一些好处:

  • 同步开销较少(输出刷新和 RecordWriter 是各自独立的)

  • 在高负载场景中,当 Netty 是瓶颈时(由于背压或直接缘由),咱们仍然能够在不完整的缓冲区中积累数据

  • Netty 通知明显减小

但在低负载状况下 CPU 使用率和 TCP 包速率可能会增长。这是由于新版 Flink 将使用全部可用的 CPU 周期来维持所需的延迟。当负载增长时它将经过填充更多的缓冲区来自我调整。因为同步开销减小了,高负载场景不会受到影响,甚至能够得到更大的吞吐量。

缓冲生成器和缓冲消费者

若是你想更深刻地了解如何在 Flink 中实现生产者——消费者机制,请仔细查看 Flink 1.5 中引入的BufferBuilderBufferConsumer类。虽然读取多是按缓存逐个进行的,但写入是按记录进行的这样 Flink 中的全部网络通讯都走热路径。所以,咱们很是清楚咱们须要在任务的线程和 Netty 线程之间创建轻量链接,这不会致使过多的同步开销。详细信息能够参阅源代码

延迟与吞吐量

引入网络缓冲区能得到更高的资源利用率和吞吐量,代价是让一些记录在缓冲区中等待一段时间。虽然能够经过缓冲区超时设置来限制这个延迟,但你极可能想要知道延迟和吞吐量之间的权衡关系——显然它们不可兼得。下图显示了缓冲区超时设置的不一样值——从 0 开始(每一个记录都刷新)到 100 毫秒(默认值)——以及在有 100 个节点,每一个节点 8 个插槽各运行一个做业的集群上对应的吞吐量;做业没有业务逻辑,只用来测试网络栈。为了对比,咱们还加入了 Flink 1.4 版本的状况。

如你所见,使用 Flink 1.5+ 版本时即便是很是低的缓冲区超时(例如 1ms,适合低延迟场景)也设置也只比默认超时设置高出最多 75%的吞吐量。

结论

如今你了解告终果分区、批处理和流式传输的各类网络链接和调度类型。你还了解了基于信用的流量控制以及网络栈的内部工做机制,知道怎样调整网络相关的参数,知道怎样判断某些做业行为。本系列的后续文章将基于这些知识探讨更多操做细节,包括须要查看的相关指标、进一步的网络栈调整以及要避免的常见反模式。敬请期待。

原文连接:

https://flink.apache.org/2019/06/05/flink-network-stack.html

相关文章
相关标签/搜索