moquette改造笔记(五):设备链接频繁上下线或者相互顶替出现的设备上下线状态错乱问题

发现问题

在moquette使用中发如今设备频繁上下线和两个设备ClientId同样相互顶替链接的状况下,InterceptHandler的onConnect和onConnectionLost的方法调用没有前后顺序,若是在这两个方法里面来记录设备上下线状态,会形成状态不对。异步

io.moquette.spi.impl.ProtocolProcessor中的processConnect(Channel channel, MqttConnectMessage msg)部分代码以下ide

ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
        final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
        if (existing != null) {
            LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
            existing.abort();
            this.connectionDescriptors.removeConnection(existing);
            this.connectionDescriptors.addConnection(descriptor);
        }

        initializeKeepAliveTimeout(channel, msg, clientId);
        storeWillMessage(msg, clientId);
        if (!sendAck(descriptor, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        m_interceptor.notifyClientConnected(msg);

能够看到existing.abort();后会m_interceptor.notifyClientConnected(msg); 先断开原来的链接,而后接着通知上线。因为Netty自己就是异步的,再加上InterceptHandler相关方法的调用都是在线程池中进行的,所以nterceptHandler的onConnect和onConnectionLost的方法调用前后顺序是没法保证的性能

解决方法

在ChannelHandler链中添加一个handler,专门处理设备上线事件,对于相同ClientId的链接已经存在时,链接断开和链接事件强制加上时序。this


@Sharable
public class AbrotExistConnectionMqttHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(AbrotExistConnectionMqttHandler.class);
    private final ProtocolProcessor m_processor;

    private static final ReentrantLock[] locks = new ReentrantLock[8];

    static {
        for (int i = 0; i < locks.length; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    public AbrotExistConnectionMqttHandler(ProtocolProcessor m_processor) {
        this.m_processor = m_processor;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
        MqttMessage msg = (MqttMessage) message;
        MqttMessageType messageType = msg.fixedHeader().messageType();
        LOG.debug("Processing MQTT message, type: {}", messageType);

        if (messageType != MqttMessageType.CONNECT) {
            super.channelRead(ctx, message);
            return;
        }

        MqttConnectMessage connectMessage = (MqttConnectMessage) msg;
        String clientId = connectMessage.payload().clientIdentifier();

        /**
         * 经过锁和sleep来解决设备互顶出现的设备上线和下线回调时序错乱的问题
         * 目前解决的方式经过sleep不是太好
         * 解决了多个链接互相顶替出现的问题(有一个链接先链接的状况)
         * */
        ReentrantLock lock = locks[Math.abs(clientId.hashCode()) % locks.length];
        lock.lock();
        try {
            if (!m_processor.isConnected(clientId)) {
                super.channelRead(ctx, message);
                return;
            }
            m_processor.abortIfExist(clientId);
            Thread.sleep(50);
            super.channelRead(ctx, message);
            Thread.sleep(30);
        } catch (Exception ex) {
            ex.printStackTrace();
            super.channelRead(ctx, message);
        } finally {
            lock.unlock();
        }

    }
}

解释:
1.经过ReentrantLock lock = locks[Math.abs(clientId.hashCode()) % locks.length];来保证相同的ClientId的链接都会得到同一个锁
2.经过两次Thread.sleep(50);将断开链接和处理设备上线变成前后顺序关系。
3.由于相互顶替的状况并很少见,所以两个Thread.sleep()也能够接受,在性能上并不会形成多大影响。线程

相关文章
相关标签/搜索