Flink中发送端反压以及Credit机制(源码分析)

上一篇《Flink接收端反压机制》说到由于Flink每一个Task的接收端和发送端是共享一个bufferPool的,造成了自然的反压机制,当Task接收数据的时候,接收端会根据积压的数据量以及可用的buffer数量(可用的memorySegment数)来决定是否向上游发送Credit(简而言之就是当我还有空间的时候,我向上游也就是上一个Task的发送端发送一个ack消息,代表我还有空间你能够发送数据过来,若是下游没有给你Credit就证实下游已经堵了,没有空间了也就不能继续往下游发送了)html

如今从源码来看一下Task的数据发送端,也就是Netty的Server端的实现java

先看Task初始化的时候TaskManagerRunner.java中startTaskManager()方法中bootstrap

 这个connectionManager其实分为两种,Netty,local一看就知道netty这种确定是对应须要经过网络传输,本地模式这里就不讲了网络

 

这个地方看到Flink的client和server都初始化了,须要注意的是其实这个地方client端只是初始化了一些配置,并无调用bind()方法启动起来,这里看过上一篇文章的同窗就会知道,client只有当第一次须要拉取上游subpatition数据的时候才会启动起来也就是bind(),tcp

而server端在这里也就是task启动的时候就启动起来了,继续看server端如何启动的server.init()方法spa

 

 init方法中,这里能够看到,这是Flink1.6之前只有基于netty的tcp网络层反压,这里是经过bootstrap的两个参数3d

    ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK  大小为两倍的memorySegment大小netty

    ChannelOption.WRITE_BUFFER_LOW_WATER_MARK   大小为memorySegment + 1server

接着htm

  

       

1处2处常规的Netty定长编解码器,没有什么好说的

看看3处,4这里先不讲后面会提到

 看到3是一个inboundHandler,反压机制时他的用处是用来接收来自下游响应的Credit,来看他的ChannelRead0方法

 当接收到的消息是一个Credit信任的时候

 

 

 先是

 

 

 增长了这个reader的可用的Credit可用数

而后

 

 

 其实了解了接收端的反压,发送端接收到了下游的credit,那发送数据的时候确定有一个地方会先判断是否有可用的Credit才决定是否往下发数据

其实就是这个带星号的地方判断,而后下面就是常规的从queue中拉取reader往netty下游writeAndFlash()数据了,没什么好讲的

来看一下他判断Credit是否知足的地方

 

 

能够看到只有当

    有数据且可用的Credit数量大于0

    或者有数据且数据是一个事件而不是record的时候,才返回true往下游发送

 

能够看到这个 enqueueAvailableReader()方法比较重要,里面包含了判断credit以及日后下游发送数据的逻辑

那这个enqueueAvailableReader()方法除了会在接收到下游的Credit的时候触发一次,还有哪会被触发呢

既然是往下游发送数据那咱们task处理完数据之后应该也会调用这个方法

因而来看一下Task发送数据,之前的文章讲过,这里就不赘述了,直接看到RecordWriterOutput的emit()

 

 

 

 

 

会先将record写入到这个Serializer里面去

而后copyFromSerializerToTargetChannel()方法中

 

 

 先去localBufferPool中请求buffer,这里就是反压了

请求到buffer了之后

 

 

这个调用链有点长不全列出来了

最后

 

 

 这个requestQueue实际上是前面Netty初始化时具体逻辑中的4,是一个ChannelInboundHandlerAdapter

 

 

 

 这个Inbound一开始我也很疑惑,这个Inbound没有重写他的channelRead()方法,那这个不就直接转发数据了吗,那他的做用是干吗的呢

继续往下看

 

 

原来发送数据的时候会触发这个inbound的eventTrigger

看下userEventTriggered()具体触发了什么

 

这个方法就很眼熟了,就是前面到接收到下游发送过来的Credit时会触发一次的方法,用来判断是否有Credit以及经过netty往下游发送数据

这里在发送数据的时候果真又触发了,后面就是判断是否有Credit知足往下游发送数据的条件,而后往下游发送数据

也就是说

    当接收到下游返回的Credit的时候会触发一次,是否能往下游写数据的判断并拉queue数据写数据

    每次Task处理完数据之后emit,也会触发一次判断并拉queue数据写数据

相关文章
相关标签/搜索