做者:Nico Kruberhtml
翻译:曹英杰apache
Flink 的网络协议栈是组成 flink-runtime 模块的核心组件之一,是每一个 Flink 做业的核心。它链接全部 TaskManager 的各个子任务(Subtask),所以,对于 Flink 做业的性能包括吞吐与延迟都相当重要。与 TaskManager 和 JobManager 之间经过基于 Akka 的 RPC 通讯的控制通道不一样,TaskManager 之间的网络协议栈依赖于更加底层的 Netty API。数组
本文将首先介绍 Flink 暴露给流算子(Stream operator)的高层抽象,而后详细介绍 Flink 网络协议栈的物理实现和各类优化、优化的效果以及 Flink 在吞吐量和延迟之间的权衡。缓存
Flink 的网络协议栈为彼此通讯的子任务提供如下逻辑视图,例如在 A 经过 keyBy() 操做进行数据 Shuffle :网络
这一过程创建在如下三种基本概念的基础上:并发
▼ 子任务输出类型(ResultPartitionType):
Pipelined(有限的或无限的):一旦产生数据就能够持续向下游发送有限数据流或无限数据流。
Blocking:仅在生成完整结果后向下游发送数据。运维
▼ 调度策略:
同时调度全部任务(Eager):同时部署做业的全部子任务(用于流做业)。
上游产生第一条记录部署下游(Lazy):一旦任何生产者生成任何输出,就当即部署下游任务。
上游产生完整数据部署下游:当任何或全部生产者生成完整数据后,部署下游任务。性能
▼ 数据传输:
高吞吐:Flink 不是一个一个地发送每条记录,而是将若干记录缓冲到其网络缓冲区中并一次性发送它们。这下降了每条记录的发送成本所以提升了吞吐量。
低延迟:当网络缓冲区超过必定的时间未被填满时会触发超时发送,经过减少超时时间,能够经过牺牲必定的吞吐来获取更低的延迟。测试
咱们将在下面深刻 Flink 网络协议栈的物理实现时看到关于吞吐延迟的优化。对于这一部分,让咱们详细说明输出类型与调度策略。首先,须要知道的是子任务的输出类型和调度策略是紧密关联的,只有二者的一些特定组合才是有效的。优化
Pipelined 结果是流式输出,须要目标 Subtask 正在运行以便接收数据。所以须要在上游 Task 产生数据以前或者产生第一条数据的时候调度下游目标 Task 运行。批处理做业生成有界结果数据,而流式处理做业产生无限结果数据。
批处理做业也可能以阻塞方式产生结果,具体取决于所使用的算子和链接模式。在这种状况下,必须等待上游 Task 先生成完整的结果,而后才能调度下游的接收 Task 运行。这可以提升批处理做业的效率而且占用更少的资源。
下表总结了 Task 输出类型以及调度策略的有效组合:
注释:
[1]目前 Flink 未使用
[2]批处理 / 流计算统一完成后,可能适用于流式做业
此外,对于具备多个输入的子任务,调度以两种方式启动:当全部或者任何上游任务产生第一条数据或者产生完整数据时调度任务运行。要调整批处理做业中的输出类型和调度策略,能够参考 ExecutionConfig#setExecutionMode()——尤为是 ExecutionMode,以及 ExecutionConfig#setDefaultInputDependencyConstraint()。
为了理解物理数据链接,请回想一下,在 Flink 中,不一样的任务能够经过 Slotsharing group 共享相同 Slot。TaskManager 还能够提供多个 Slot,以容许将同一任务的多个子任务调度到同一个 TaskManager 上。
对于下图所示的示例,咱们假设 2 个并发为 4 的任务部署在 2 个 TaskManager 上,每一个 TaskManager 有两个 Slot。TaskManager 1 执行子任务 A.1,A.2,B.1 和 B.2,TaskManager 2 执行子任务 A.3,A.4,B.3 和 B.4。在 A 和 B 之间是 Shuffle 链接类型,好比来自于 A 的 keyBy() 操做,在每一个 TaskManager 上会有 2x4 个逻辑链接,其中一些是本地的,另外一些是远程的:
不一样任务(远程)之间的每一个网络链接将在 Flink 的网络堆栈中得到本身的 TCP 通道。可是,若是同一任务的不一样子任务被调度到同一个 TaskManager,则它们与同一个 TaskManager 的网络链接将多路复用并共享同一个 TCP 信道以减小资源使用。在咱们的例子中,这适用于 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,以下图所示:
每一个子任务的输出结果称为 ResultPartition,每一个 ResultPartition 被分红多个单独的 ResultSubpartition- 每一个逻辑通道一个。Flink 的网络协议栈在这一点的处理上,再也不处理单个记录,而是将一组序列化的记录填充到网络缓冲区中进行处理。每一个子任务本地缓冲区中最多可用 Buffer 数目为(每一个发送方和接收方各一个):
#channels * buffers-per-channel + floating-buffers-per-gate
单个 TaskManager 上的网络层 Buffer 总数一般不须要配置。有关如何在须要时进行配置的详细信息,请参阅配置网络缓冲区的文档。
▼ 形成反压(1)
每当子任务的数据发送缓冲区耗尽时——数据驻留在 Subpartition 的缓冲区队列中或位于更底层的基于 Netty 的网络堆栈内,生产者就会被阻塞,没法继续发送数据,而受到反压。接收端以相似的方式工做:Netty 收到任何数据都须要经过网络 Buffer 传递给 Flink。若是相应子任务的网络缓冲区中没有足够可用的网络 Buffer,Flink 将中止从该通道读取,直到 Buffer 可用。这将反压该多路复用上的全部发送子任务,所以也限制了其余接收子任务。下图说明了过载的子任务 B.4,它会致使多路复用的反压,也会致使子任务 B.3 没法接受和处理数据,即便是 B.3 还有足够的处理能力。
为了防止这种状况发生,Flink 1.5 引入了本身的流量控制机制。
Credit-based 流量控制可确保发送端已经发送的任何数据,接收端都具备足够的能力(Buffer)来接收。新的流量控制机制基于网络缓冲区的可用性,做为 Flink 以前机制的天然延伸。每一个远程输入通道(RemoteInputChannel)如今都有本身的一组独占缓冲区(Exclusive buffer),而不是只有一个共享的本地缓冲池(LocalBufferPool)。与以前不一样,本地缓冲池中的缓冲区称为流动缓冲区(Floating buffer),由于它们会在输出通道间流动而且可用于每一个输入通道。
数据接收方会将自身的可用 Buffer 做为 Credit 告知数据发送方(1 buffer = 1 credit)。每一个 Subpartition 会跟踪下游接收端的 Credit(也就是可用于接收数据的 Buffer 数目)。只有在相应的通道(Channel)有 Credit 的时候 Flink 才会向更底层的网络协议栈发送数据(以 Buffer 为粒度),而且每发送一个 Buffer 的数据,相应的通道上的 Credit 会减 1。除了发送数据自己外,数据发送端还会发送相应 Subpartition 中有多少正在排队发送的 Buffer 数(称之为 Backlog)给下游。数据接收端会利用这一信息(Backlog)去申请合适数量的 Floating buffer 用于接收发送端的数据,这能够加快发送端堆积数据的处理。接收端会首先申请和 Backlog 数量相等的 Buffer,但可能没法申请到所有,甚至一个都申请不到,这时接收端会利用已经申请到的 Buffer 进行数据接收,并监听是否有新的 Buffer 可用。
Credit-based 的流控使用 Buffers-per-channel 来指定每一个 Channel 有多少独占的 Buffer,使用 Floating-buffers-per-gate 来指定共享的本地缓冲池(Local buffer pool)大小(可选3),经过共享本地缓冲池,Credit-based 流控可使用的 Buffer 数目能够达到与原来非 Credit-based 流控一样的大小。这两个参数的默认值是被精心选取的,以保证新的 Credit-based 流控在网络健康延迟正常的状况下至少能够达到与原策略相同的吞吐。能够根据实际的网络 RRT (round-trip-time)和带宽对这两个参数进行调整。
注释3:若是没有足够的 Buffer 可用,则每一个缓冲池将得到全局可用 Buffer 的相同份额(±1)。
▼ 形成反压(2)
与没有流量控制的接收端反压机制不一样,Credit 提供了更直接的控制:若是接收端的处理速度跟不上,最终它的 Credit 会减小成 0,此时发送端就不会在向网络中发送数据(数据会被序列化到 Buffer 中并缓存在发送端)。因为反压只发生在逻辑链路上,所以不必阻断从多路复用的 TCP 链接中读取数据,也就不会影响其余的接收者接收和处理数据。
▼ Credit-based 的优点与问题
因为经过 Credit-based 流控机制,多路复用中的一个信道不会因为反压阻塞其余逻辑信道,所以总体资源利用率会增长。此外,经过彻底控制正在发送的数据量,咱们还可以加快 Checkpoint alignment:若是没有流量控制,通道须要一段时间才能填满网络协议栈的内部缓冲区并代表接收端再也不读取数据了。在这段时间里,大量的 Buffer 不会被处理。任何 Checkpoint barrier(触发 Checkpoint 的消息)都必须在这些数据 Buffer 后排队,所以必须等到全部这些数据都被处理后才可以触发 Checkpoint(“Barrier 不会在数据以前被处理!”)。
可是,来自接收方的附加通告消息(向发送端通知 Credit)可能会产生一些额外的开销,尤为是在使用 SSL 加密信道的场景中。此外,单个输入通道( Input channel)不能使用缓冲池中的全部 Buffer,由于存在没法共享的 Exclusive buffer。新的流控协议也有可能没法作到当即发送尽量多的数据(若是生成数据的速度快于接收端反馈 Credit 的速度),这时则可能增加发送数据的时间。虽然这可能会影响做业的性能,但因为其全部优势,一般新的流量控制会表现得更好。可能会经过增长单个通道的独占 Buffer 数量,这会增大内存开销。然而,与先前实现相比,整体内存使用可能仍然会下降,由于底层的网络协议栈再也不须要缓存大量数据,由于咱们老是能够当即将其传输到 Flink(必定会有相应的 Buffer 接收数据)。
在使用新的 Credit-based 流量控制时,可能还会注意到另外一件事:因为咱们在发送方和接收方之间缓冲较少的数据,反压可能会更早的到来。然而,这是咱们所指望的,由于缓存更多数据并无真正得到任何好处。若是要缓存更多的数据而且保留 Credit-based 流量控制,能够考虑经过增长单个输入共享 Buffer 的数量。
注意:若是须要关闭 Credit-based 流量控制,能够将这个配置添加到 flink-conf.yaml 中:taskmanager.network.credit-model:false。可是,此参数已过期,最终将与非 Credit-based 流控制代码一块儿删除。
下图从上面的扩展了更高级别的视图,其中包含网络协议栈及其周围组件的更多详细信息,从发送算子发送记录(Record)到接收算子获取它:
在生成 Record 并将其传递出去以后,例如经过 Collector#collect(),它被传递给 RecordWriter,RecordWriter 会将 Java 对象序列化为字节序列,最终存储在 Buffer 中按照上面所描述的在网络协议栈中进行处理。RecordWriter 首先使用 SpanningRecordSerializer 将 Record 序列化为灵活的堆上字节数组。而后,它尝试将这些字节写入目标网络 Channel 的 Buffer 中。咱们将在下面的章节回到这一部分。
在接收方,底层网络协议栈(Netty)将接收到的 Buffer 写入相应的输入通道(Channel)。流任务的线程最终从这些队列中读取并尝试在 RecordReader 的帮助下经过 SpillingAdaptiveSpanningRecordDeserializer 将累积的字节反序列化为 Java 对象。与序列化器相似,这个反序列化器还必须处理特殊状况,例如跨越多个网络 Buffer 的 Record,或者由于记录自己比网络缓冲区大(默认状况下为32KB,经过 taskmanager.memory.segment-size 设置)或者由于序列化 Record 时,目标 Buffer 中已经没有足够的剩余空间保存序列化后的字节数据,在这种状况下,Flink 将使用这些字节空间并继续将其他字节写入新的网络 Buffer 中。
在上图中,Credit-based 流控制机制实际上位于“Netty Server”(和“Netty Client”)组件内部,RecordWriter 写入的 Buffer 始终以空状态(无数据)添加到 Subpartition 中,而后逐渐向其中填写序列化后的记录。可是 Netty 在何时真正的获取并发送这些 Buffer 呢?显然,不能是 Buffer 中只要有数据就发送,由于跨线程(写线程与发送线程)的数据交换与同步会形成大量的额外开销,而且会形成缓存自己失去意义(若是是这样的话,不如直接将将序列化后的字节发到网络上而没必要引入中间的 Buffer)。
在 Flink 中,有三种状况可使 Netty 服务端使用(发送)网络 Buffer:
▼ 在 Buffer 满后发送
RecordWriter 将 Record 序列化到本地的序列化缓冲区中,并将这些序列化后的字节逐渐写入位于相应 Result subpartition 队列中的一个或多个网络 Buffer中。虽然单个 RecordWriter 能够处理多个 Subpartition,但每一个 Subpartition 只会有一个 RecordWriter 向其写入数据。另外一方面,Netty 服务端线程会从多个 Result subpartition 中读取并像上面所说的那样将数据写入适当的多路复用信道。这是一个典型的生产者 - 消费者模式,网络缓冲区位于生产者与消费者之间,以下图所示。在(1)序列化和(2)将数据写入 Buffer 以后,RecordWriter 会相应地更新缓冲区的写入索引。一旦 Buffer 彻底填满,RecordWriter 会(3)为当前 Record 剩余的字节或者下一个 Record 从其本地缓冲池中获取新的 Buffer,并将新的 Buffer 添加到相应 Subpartition 的队列中。这将(4)通知 Netty服务端线程有新的数据可发送(若是 Netty 还不知道有可用的数据的话4)。每当 Netty 有能力处理这些通知时,它将(5)从队列中获取可用 Buffer 并经过适当的 TCP 通道发送它。
注释4:若是队列中有更多已完成的 Buffer,咱们能够假设 Netty 已经收到通知。
▼ 在 Buffer 超时后发送
为了支持低延迟应用,咱们不能只等到 Buffer 满了才向下游发送数据。由于可能存在这种状况,某种通讯信道没有太多数据,等到 Buffer 满了在发送会没必要要地增长这些少许 Record 的处理延迟。所以,Flink 提供了一个按期 Flush 线程(the output flusher)每隔一段时间会将任何缓存的数据所有写出。能够经过 StreamExecutionEnvironment#setBufferTimeout 配置 Flush 的间隔,并做为延迟5的上限(对于低吞吐量通道)。下图显示了它与其余组件的交互方式:RecordWriter 如前所述序列化数据并写入网络 Buffer,但同时,若是 Netty 还不知道有数据能够发送,Output flusher 会(3,4)通知 Netty 服务端线程数据可读(相似与上面的“buffer已满”的场景)。当 Netty 处理此通知(5)时,它将消费(获取并发送)Buffer 中的可用数据并更新 Buffer 的读取索引。Buffer 会保留在队列中——从 Netty 服务端对此 Buffer 的任何进一步操做将在下次从读取索引继续读取。
注释5:严格来讲,Output flusher 不提供任何保证——它只向 Netty 发送通知,而 Netty 线程会按照能力与意愿进行处理。这也意味着若是存在反压,则 Output flusher 是无效的。
▼ 特殊消息后发送
一些特殊的消息若是经过 RecordWriter 发送,也会触发当即 Flush 缓存的数据。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,这些事件应该尽快被发送,而不该该等待 Buffer 被填满或者 Output flusher 的下一次 Flush。
▼ 进一步的讨论
与小于 1.5 版本的 Flink 不一样,请注意(a)网络 Buffer 如今会被直接放在 Subpartition 的队列中,(b)网络 Buffer 不会在 Flush 以后被关闭。这给咱们带来了一些好处:
可是,在低负载状况下,可能会出现 CPU 使用率和 TCP 数据包速率的增长。这是由于,Flink 将使用任何可用的 CPU 计算能力来尝试维持所需的延迟。一旦负载增长,Flink 将经过填充更多的 Buffer 进行自我调整。因为同步开销减小,高负载场景不会受到影响,甚至能够实现更高的吞吐。
更深刻地了解 Flink 中是如何实现生产者 - 消费者机制,须要仔细查看 Flink 1.5 中引入的 BufferBuilder 和 BufferConsumer 类。虽然读取是以 Buffer 为粒度,但写入它是按 Record 进行的,所以是 Flink 中全部网络通讯的核心路径。所以,咱们须要在任务线程(Task thread)和 Netty 线程之间实现轻量级链接,这意味着尽可能小的同步开销。你能够经过查看源代码获取更加详细的信息。
引入网络 Buffer 的目是得到更高的资源利用率和更高的吞吐,代价是让 Record 在 Buffer 中等待一段时间。虽然能够经过 Buffer 超时给出此等待时间的上限,但可能很想知道有关这两个维度(延迟和吞吐)之间权衡的更多信息,显然,没法二者同时兼得。下图显示了不一样的 Buffer 超时时间下的吞吐,超时时间从 0 开始(每一个 Record 直接 Flush)到 100 毫秒(默认值),测试在具备 100 个节点每一个节点 8 个 Slot 的群集上运行,每一个节点运行没有业务逻辑的 Task 所以只用于测试网络协议栈的能力。为了进行比较,咱们还测试了低延迟改进(如上所述)以前的 Flink 1.4 版本。
如图,使用 Flink 1.5+,即便是很是低的 Buffer 超时(例如1ms)(对于低延迟场景)也提供高达超时默认参数(100ms)75% 的最大吞吐,但会缓存更少的数据。
了解 Result partition,批处理和流式计算的不一样网络链接以及调度类型,Credit-Based 流量控制以及 Flink 网络协议栈内部的工做机理,有助于更好的理解网络协议栈相关的参数以及做业的行为。后续咱们会推出更多 Flink 网络栈的相关内容,并深刻更多细节,包括运维相关的监控指标(Metrics),进一步的网络调优策略以及须要避免的常见错误等。
via:
https://flink.apache.org/2019...