Flink中的数据交换基于如下设计原则apache
1.用于数据交换的控制流(即:为了启动交换而传递的消息)是接收者启动的,就像原始MapReduce同样网络
2.用于数据交换的数据流,即经过线路的实际数据传输由IntermediateResult的概念抽象,而且是可插入的。 这意味着系统能够使用相同的实现支持流数据传输和批量数据传输。数据结构
数据交换涉及许多实例,例如:oop
JobManager是主节点,负责调度任务,恢复和协调,并经过ExecutionGraph数据结构保存工做的全貌。线程
TaskManagers,工做节点。 TaskManager(TM)在线程中同时执行许多任务。 每一个TM还包含一个CommunicationManager(CM - 在任务之间共享)和一个MemoryManager(MM - 也在任务之间共享)。 TM能够经过复用的TCP链接相互交换数据,这些链接是在须要时建立的。设计
请注意,在Flink中,经过网络交换数据的是TaskManagers,而不是任务,即,经过一个网络链接复用生活在同一TM中的任务之间的数据交换。netty
ExecutionGraph:执行图是一种数据结构,包含有关做业计算的“基本事实”。 它由表示计算任务的顶点(ExecutionVertex)和表示任务生成的数据的中间结果(IntermediateResultPartition)组成。 顶点连接到它们经过ExecutionEdges(EE)消耗的中间结果:对象
这些是JobManager中的逻辑数据结构。 它们具备运行时等效结构,负责TaskManagers中的实际数据处理。 IntermediateResultPartition的运行时等价物称为ResultPartition。blog
ResultPartition(RP)表示BufferWriter写入的一大块数据,即由单个任务生成的一大块数据。 RP是结果子分区(RS)的集合。 这是为了区分指向不一样接收器的数据,例如,在用于reduce或join的分区shuffle的状况下。生命周期
ResultSubpartition(RS)表示由operator建立的数据的一个分区,以及将此数据转发给接收operator的逻辑。 RS的具体实现肯定了实际的数据传输逻辑,这是可插拔的机制,容许系统支持各类数据传输。 例如,PipelinedSubpartition是一个支持流数据交换的流水线实现。 SpillableSubpartition是一种支持批量数据交换的阻塞实现。
InputGate:接收端RP的逻辑等效项。 它负责收集数据缓冲区并将其上传到上游。
InputChannel:接收端RS的逻辑等价物。 它负责收集特定分区的数据缓冲区。
Buffer: See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525
序列化器和反序列化器可靠地将类型化记录转换为原始字节缓冲区,反之亦然,处理跨越多个缓冲区的记录等。
Control flow for data exchange
图片表明一个简单的map-reduce做业,具备两个并行任务。咱们有两个TaskManagers,每一个都有两个任务(一个map任务和一个reduce任务)在两个不一样的节点中运行,一个JobManager在第三个节点中运行。咱们专一于启动任务M1和R2之间的转移。使用粗箭头表示数据传输,使用细箭头表示消息。首先,M1生成ResultPartition(RP1)(箭头1)。当RP可供使用时(咱们将在稍后讨论),它会通知JobManager(箭头2)。 JobManager通知该分区的预期接收者(任务R1和R2)分区已准备就绪。若是还没有安排接收器,这实际上将触发任务的部署(箭头3a,3b)。而后,接收器将从RP请求数据(箭头4a和4b)。这将在本地(状况5a)或经过TaskManagers(5b)的网络堆栈启动任务(箭头5a和5b)之间的数据传输。当RP决定通知JobManager其可用性时,该过程留下必定程度的自由度。例如,若是RP1在通知JM以前彻底自行生成(而且可能写入文件),则数据交换大体对应于Hadoop中实现的批处理交换。若是RP1在产生第一条记录后当即通知JM,咱们就会进行流数据交换。
Transfer of a byte buffer between two tasks
这张图片更详细地展现了数据记录从生产者发送到消费者的生命周期。最初,MapDriver生成传递给RecordWriter对象的记录(由收集器收集)。 RecordWriters包含许多序列化程序(RecordSerializer对象),每一个消费者任务可能会使用这些记录。例如,在shuffle或broadcast中,将有与消费者任务数量同样多的序列化器。 ChannelSelector选择一个或多个序列化程序来放置记录。例如,若是广播记录,它们将被放置在每一个序列化器中。若是记录是散列分区的,则ChannelSelector将评估记录上的哈希值并选择适当的序列化程序。
序列化程序将记录序列化为二进制表示形式,并将它们放在固定大小的缓冲区中(记录能够跨越多个缓冲区)。这些缓冲区移交给BufferWriter并写入ResultPartition(RP)。 RP由几个子分区(ResultSubpartitions-RSs)组成,为特定的消费者收集缓冲区。在图片中,缓冲区的目的地是第二个reducer(在TaskManager 2中),它被放置在RS2中。因为这是第一个缓冲区,RS2可供使用(请注意,此行为实现了流式shuffle),并通知JobManager事实。
JobManager查找RS2的使用者,并通知TaskManager 2有可用的数据块。到TM2的消息向下传播到应该接收此缓冲区的InputChannel,后者又通知RS2能够启动网络传输。而后,RS2将缓冲区移交给TM1的网络堆栈,而后TM1将其交给netty进行运输。网络链接长时间运行并存在于TaskManagers之间,而不是单个任务。
一旦TM2接收到缓冲区,它就会经过一个相似的对象层次结构,从InputChannel(接收方等效于IRPQ)开始,进入InputGate(包含几个IC),最后进入RecordDeserializer,从缓冲区生成类型化记录并将它们交给接收任务,在本例中为ReduceDriver。
原文连接:https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks