MQTT Paho Qos为0时,并发publish会出现阻塞的状况,同时后续发布一直提示Too many publishes in progress (32202)

如下为使用的paho jar包:java

<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>git

在项目测试的过程当中测试同窗发现了Too many publishes in progress (32202)的问题,发布消息时对actualInFlight 与 maxInflight二者进行判断,若actualInFlight  >= maxInflight就会抛出这个异常,这个值能够设置,可是不能解决根本问题。github

同时发现快速的重复发布同一个Qos为0 的topic会出先阻塞的状况,进一步跟踪发现发布的过程当中会调用Token的awitUtilSent的方法,里面调用了this.sentLock.wait()方法等待消息发完,发完以后会调用ClientState的notifySent方法,这里调用了sentLock.notifyAll(),唤醒wait(),发布完成。并发

Qos为0的消息存储的token的Key所有为0(后面的token会将前面的覆盖),在并发较高场景下会出现如下状况,上一条消息发送完后removeToken将后面消息的token 移除了(remove的时候是根据key remove),token被remove掉以后致使后面的消息拿不到token,没法调用notifyAll的方法,另外一边的awit方法就一直阻塞。eclipse

同时发送消息时actualInFlight  会自增,发完以后notifySent方法会自减,可是这里token被remove掉致使不会调用到到这个方法,actualInFlight  只增不减,因此在后面每次发布都报这个错误。测试

解决方法:在新版本的jar包中修复了这个问题,将token放在了message中,且Qos为0时不会将token 存到tokenStore中,解决了token丢失的问题this

新的paho依赖:spa

<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.1</version>
</dependency>token

修改的地方:https://github.com/eclipse/paho.mqtt.java/pull/563/commits/cd00fa364a0d0475cb3c748de3ca8bc0ab4b7e10ip

 

新人不知道咋贴代码,后续有空贴出代码更直观

相关文章
相关标签/搜索