先上一张图总体了解Flink中的反压java
能够看到每一个task都会有本身对应的IG(inputgate)对接上游发送过来的数据和RS(resultPatation)对接往下游发送数据, 整个反压机制经过inputgate,resultPatation公用一个必定大小的memorySegmentPool来实现(Flink中memorySegment做为内存使用的抽象,类比bytebuffer), 公用一个pool当接收上游数据时Decoder,往下游发送数据时Encoder,都会向pool中请求内存memorySegment 。由于是公共pool,也就是说运行时,当接受的数据占用的内存多了,往下游发送的数据就少了,这样是个什么样的状况呢?缓存
好比说你sink端堵塞了,背压了写不进去,那这个task的resultPatation没法发送数据了,也就没法释放memorySegment了,相应的用于接收数据的memorySegment就会愈来愈少,直到接收数据端拿不到memorySegment了,也就没法接收上游数据了,既然这个task没法接收数据了,天然引发这个task的上一个task数据发送端没法发送,那上一个task又反压了,因此这个反压从发生反压的地方,依次的往上游扩散直到source,这个就是flink的自然反压。tcp
从源码来看一下flink是如何实现的编码
来到数据接收的地方StreamInputProcessor.java中processInput()方法中spa
这里经过经过handler的getNextNonBlocked()方法获取到了bufferOrEvent后面就会将这个bufferOrEvent解析成record数据而后使用用户的代码处理了3d
其实这里的handler分为两种netty
区别主要是barrierbuffer实现了barrier对齐的数据缓存,用于实现一次语义,这里之后随缘更新到容错机制的时候讲code
来看一下getNextNonBlocked()方法对象
这个看到了经过会经过上游inputGate获取数据,具体看一下getNextBufferOrEvent()其中有两个比较重要的调用blog
先看requestPartitions()
先遍历了全部的inputchannel而后调用了requestSubpartition()在其中
先看一下1处,这里返回了一个Netty的Client来看一下createPartitionRequestClient是怎么建立的
能够看到源码的描述,这里其实就是建立与上游发送数据端的tcp链接的client端,用来接收上游数据的
接着
这里若是已经创建TCP链接就直接拿,与上游尚未创建tcp链接的话就会先初始化Client端,经过这个connect()方法
来看一下第一次是如何初始化链接的
看到这个应该熟悉Netty的同窗一眼就了解了,在1处就是Client的具体逻辑了,而后与上游端口创建链接
来看一下具体的Client端具体的逻辑,这里最好对netty有必定的认识
2处是用于Decoder的ChannelinboundHandler常规的解码器没有什么好说的
PartitionRequestClientHandler: 不带信任机制的
CreditBasedPartitionRequestClientHandler:带credit信任机制的
这里取出了全部的带有信任的上游inputChannel而且向其响应发送了一个Credit对象
那带Credit机制的handler什么时候触发userEventTriggered()来触发向上游发送Credit呢?
先不慌,先来看下client接收到数据后作了什么,看下Nettyclient端的channelRead()方法(这里只看credit机制的)
decodeMsg()方法中
decodeBufferOrEvent()方法
在没有Credit机制的PartitionRequestClientHandler中
requestBuffer()方法就是请求memorySegmentPool中的memorySegment
这里不能确保能获取到,因此会用一个while(true)一直挂着
在Credit机制的CreditBasedPartitionRequestClientHandler中
请求requestBuffer()方法就是请求memorySegmentPool中的memorySegment由于信任机制在请求前就已经保证有足够的memorySegment因此不会请求不到,这里请求不到直接就抛异常了
而后OnBuffer( )方法
1处将将这个buffer加入到了这个receivedBuffers的ArrayDeque中,这里要注意receivedBuffers,这个queue后面会用到(后面处理数据就是循环的从这个queue中poll拉数据出来)
这里还要注意onBuffer方法还传入了backlog参数,这里是一个积压的数据量
接着会根据积压的数据量
当可用的buffer数 <(挤压的数据量 + 已经分配给信任Credit的buffer量) 时,就会向Pool中继续请求buffer,这里请求不到也会一直while造成柱塞反压
而后经过notifyCreditAvailable()方法发送Credit,具体来看一下
可用看到这里就触发了前面说到的向上游发送Credit的方法了
到这里,Nettyclient端的初始化以及Netty的处理逻辑就讲完了
如今回到最最开始的地方
requestPartition()那里建立nettyclient后
currentChannel.getNextBuffer()方法中
前面咱们说到的NettyClient端channelRead读取数据后会把数据放到一个recivedBuffers的queue中,这里就是去那个queue中取数据而后返回到咱们的 数据接收的地方StreamInputProcessor.java中processInput()方法中的获得上游数据之后,就是开始执行咱们用户的代码了调用processElement方法了。
而后while(true)开始了下一轮拉取数据而后处理的过程