从moquette源码看IOT接入协议MQTT的实现

背景

阅读优秀的代码是一种享受,将优秀的代码用本身的世界观优秀地描述出来就十分痛苦了是要死一亿个脑细胞的。html

这篇源码阅读笔记早在一年前就有了当时只是简单的记录一下本身的总结,最近将她从新整理一下但愿能帮助有须要的人。node

随着移动互联网快速进入后半场,愈来愈多的企业将注意力转移到物联网。好比共享单车和小米的智能家居产品等都是典型的物联网应用。算法

企业相信借助于大数据和AI技术能够得到不少额外的价值产生新的商业模式。海量数据须要经过接入服务才能流向后端产生后续价值,在接入服务中MQTT已成为物联网中非明确的标准协议国内外云厂均有其broker实现。后端

特性

MQTT协议是为大量计算能力有限,且工做在低带宽、不可靠的网络的远程传感器和控制设备通信而设计的协议,它具备如下主要的几项特性:bash

  1. 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合
  2. 对负载内容屏蔽的消息传输
  3. 使用 TCP/IP 提供网络链接
  4. 有三种消息发布服务质量
    • “至多一次”,消息发布彻底依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于以下状况,环境传感器数据,丢失一次读记录无所谓,由于不久后还会有第二次发送。
    • “至少一次”,确保消息到达,但消息重复可能会发生。
    • “只有一次”,确保消息到达一次。这一级别可用于以下状况,在计费系统中,消息重复或丢失会致使不正确的结果。
  5. 小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以下降网络流量
  6. 使用 Last Will (遗嘱)和 Testament 特性通知有关各方客户端异常中断的机制

==下文中会对上述特性的实现方式进行讲解==服务器

术语

image

客户端Client

使用MQTT的程序或者设备,如环境监控传感器、共享单车、共享充电宝等。网络

服务端Server

一个程序或设备,做为发送消息的客户端和请求订阅的客户端之间的中介。session

发布、订阅流程

客户端-A 给 客户端-B 发送消息“hello”流程以下:并发

  1. 客户端-B订阅名称为msg的主题
  2. 客户端-A向服务端-Server发送“hello”,并指明发送给名为msg的主题
  3. 服务端-Server向客户端-B转发消息“hello”

有别于HTTP协议的请求响应模式,客户端-A与客户端-B不发生直接链接关系,他们之间的消息传递经过服务端Server进行转发。 服务端Server又称 MQTT Broker 即订阅和发送的中间人app

基于moquette源码的特性实现分析

在上述的客户端-A 给 客户端-B 发送消息“hello”流程中须要有以下动做。

  1. 客户端-A 、 客户端-B 链接到服务端Server
  2. 客户端-B 订阅主题
  3. 客户端-A 发布消息
  4. 服务端Server 转发消息
  5. 客户端-B 收到消息

下面将基于链接、订阅、发布这几个动做进行源码跟踪解读。

链接

image

基本概念:

Session:会话即客户端(由ClientId做为标示)和服务端之间逻辑层面的通讯;生命周期(存在时间):会话 >= 网络链接。

ClientID:客户端惟一标识,服务端用于关联一个Session 只能包含这些 大写字母,小写字母 和 数字(0-9a-zA-Z),23个字符之内 若是 ClientID 在屡次 TCP链接中保持一致,客户端和服务器端会保留会话信息(Session) 同一时间内 Server 和同一个 ClientID 只能保持一个 TCP 链接,再次链接会踢掉前一个。

CleanSession:在Connect时,由客户端设置

  • 0 开启会话重用机制。网络断开重连后,恢复以前的Session信息。须要客户端和服务器有相关Session持久化机制;
  • 1 关闭会话重用机制。每次Connect都是一个新Session,会话仅持续和网络链接一样长的时间。

Keep Alive:目的是保持长链接的可靠性,以及双方对彼此是否在线的确认。 客户端在Connect的时候设置 Keep Alive 时长。若是服务端在 1.5 * KeepAlive 时间内没有收到客户端的报文,它必须断开客户端的网络链接 Keep Alive 的值由具体应用指定,通常是几分钟。容许的最大值是 18 小时 12 分 15 秒。

Will:遗嘱消息(Will Message)存储在服务端,当网络链接关闭时,服务端必须发布这个遗嘱消息,因此被形象地称之为遗嘱,可用于通知异常断线。 客户端发送 DISCONNECT 关闭连接,遗嘱失效并删除 遗嘱消息发布的条件,包括: 服务端检测到了一个 I/O 错误或者网络故障 客户端在保持链接(Keep Alive)的时间内未能通信 客户端没有先发送 DISCONNECT 报文直接关闭了网络链接 因为协议错误服务端关闭了网络链接 相关设置项,须要在Connect时,由客户端指定。

Will Flag :遗嘱的总开关

  • 0 关闭遗嘱功能,Will QoS 和 Will Retain 必须为 0
  • 1 开启遗嘱功能,须要设置 Will Retain 和 Will QoS

Will QoS: 遗嘱消息 QoS可取值 0、一、2,含义与消息QoS相同

Will Retain:遗嘱是否保留

  • 0 遗嘱消息不保留,后面再订阅不会收到消息
  • 1 遗嘱消息保留,持久存储

Will Topic:遗嘱话题

Will Payload:遗嘱消息内容

链接流程

  1. 判断客户端链接时发送的MQTT协议版本号,非3.1和3.1.1版本发送协议不支持响应报文并在发送完成后关闭链接
  2. 在客户端配置了cleanSession=false 或者服务端不容许clientId不存在的状况下客户端若是未上传clientId发送协议不支持响应报文并在发送完成后关闭链接
  3. 判断用户名和密码是否合法
  4. 初始化链接对象并将链接对象引用放入链接管理中,若是发现链接管理中存在相同客户端ID的对象则关闭前一个链接并将新的链接对象放入链接管理中
  5. 根据客户端上传的心跳时间调整服务端当前链接的心跳判断时间(keepAlive * 1.5f)
  6. 遗嘱消息存储(当链接意外断开时向存储的主题发布消息)
  7. 发送链接成功响应
  8. 建立当前链接session
  9. 当cleanSession=false 发送当前session已经存储的消息
public void processConnect(Channel channel, MqttConnectMessage msg) {
        MqttConnectPayload payload = msg.payload();
        String clientId = payload.clientIdentifier();
        LOG.debug("Processing CONNECT message. CId={}, username={}", clientId, payload.userName());

        // 1. 判断客户端链接时发送的MQTT协议版本号,非3.1和3.1.1版本发送协议不支持响应报文并在发送完成后关闭链接
        if (msg.variableHeader().version() != MqttVersion.MQTT_3_1.protocolLevel()
                && msg.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
            MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);

            LOG.error("MQTT protocol version is not valid. CId={}", clientId);
            channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        final boolean cleanSession = msg.variableHeader().isCleanSession();
        if (clientId == null || clientId.length() == 0) {
            // 2. 在客户端配置了cleanSession=false 或者服务端不容许clientId不存在的状况下客户端若是未上传clientId发送协议不支持响应报文并在发送完成后关闭链接
            if (!cleanSession || !this.allowZeroByteClientId) {
                MqttConnAckMessage badId = connAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);

                channel.writeAndFlush(badId).addListener(FIRE_EXCEPTION_ON_FAILURE);
                channel.close().addListener(CLOSE_ON_FAILURE);
                LOG.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
                return;
            }

            // Generating client id.
            clientId = UUID.randomUUID().toString().replace("-", "");
            LOG.info("Client has connected with a server generated identifier. CId={}, username={}", clientId,
                payload.userName());
        }
        // 3. 判断用户名和密码是否合法
        if (!login(channel, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        // 4. 初始化链接对象并将链接对象引用放入链接管理中,若是发现链接管理中存在相同客户端ID的对象则关闭前一个链接并将新的链接对象放入链接管理中
        ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
        final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
        if (existing != null) {
            LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
            existing.abort();
            //return;
            this.connectionDescriptors.removeConnection(existing);
            this.connectionDescriptors.addConnection(descriptor);
        }

        // 5. 根据客户端上传的心跳时间调整服务端当前链接的心跳判断时间(keepAlive * 1.5f)
        initializeKeepAliveTimeout(channel, msg, clientId);
        // 6. 遗嘱消息存储(当链接意外断开时向存储的主题发布消息)
        storeWillMessage(msg, clientId);
        // 7. 发送链接成功响应
        if (!sendAck(descriptor, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        m_interceptor.notifyClientConnected(msg);

        if (!descriptor.assignState(SENDACK, SESSION_CREATED)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        // 8. 建立当前链接session
        final ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession);
        // 9. 当cleanSession=false 发送当前session已经存储的消息
        if (!republish(descriptor, msg, clientSession)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        
        int flushIntervalMs = 500/* (keepAlive * 1000) / 2 */;
        setupAutoFlusher(channel, flushIntervalMs);

        final boolean success = descriptor.assignState(MESSAGES_REPUBLISHED, ESTABLISHED);
        if (!success) {
            channel.close().addListener(CLOSE_ON_FAILURE);
        }

        LOG.info("Connected client <{}> with login <{}>", clientId, payload.userName());
    }
复制代码

订阅

image

基本概念

订阅流程

  1. 订阅的主题校验(权限、主题path合法性)
  2. 在当前session中存储订阅的主题
  3. 采用全局tree结构存储订阅信息(主题和订阅者信息),用于消息转发时根据主题查找到对应的订阅者(tree结构和查找算法下一章节中介绍
  4. 发送订阅回应
  5. 扫描持久化的消息匹配到当前订阅主题的当即向此链接发送消息
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);

        RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
        SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
        if (currentStatus != null) {
            LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
                clientID, messageID);
            return;
        }
        String username = NettyUtils.userName(channel);
        // 一、订阅的主题校验(权限、主题path合法性)
        List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
        MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
        if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
            LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
                "messageId={}", clientID, messageID);
            return;
        }

        LOG.debug("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
        // 二、在当前session中存储订阅的主题
        List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);

        // save session, persist subscriptions from session
        // 三、采用全局tree结构存储订阅信息(主题和订阅者信息),用于消息转发时根据主题查找到对应的订阅者
        for (Subscription subscription : newSubscriptions) {
            subscriptions.add(subscription);
        }

        LOG.debug("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
        // 四、发送订阅回应
        channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);

        // fire the persisted messages in session
        // 五、扫描持久化的消息匹配到当前订阅主题的当即向此链接发送消息
        for (Subscription subscription : newSubscriptions) {
            publishRetainedMessagesInSession(subscription, username);
        }

        boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
        if (!success) {
            LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
        } else {
            LOG.info("Client <{}> subscribed to topics", clientID);
        }
    }
复制代码

发布

基本概念

Packet Identifier:报文标识存在报文的可变报头部分,非零两个字节整数 (0-65535]。

一个流程中重复:这些报文包含PacketID,并且在一次通讯流程内保持一致:PUBLISH(QoS>0时)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCIBE、UNSUBACK 。

新的不重复:客户端每次发送一个新的这些类型的报文时都必须分配一个当前 未使用的PacketID 当客户端处理完这个报文对应的确认后,这个报文标识符就释放可重用。

独立维护:客户端和服务端彼此独立地分配报文标识符。所以,客户端服务端组合使用相同的报文标识符能够实现并发的消息交换。客户端和服务端产生的Packet Identifier一致不算异常。

Payload: 有效载荷即消息体最大容许 256MB。 Publish 的 Payload 容许为空,在不少场合下表明将持久消息(或者遗嘱消息)清空。采用UTF-8编码。

Retain:持久消息(粘性消息)

RETAIN 标记:每一个Publish消息都须要指定的标记

  • 0 服务端不能存储这个消息,也不能移除或替换任何 现存的保留消息
  • 1 服务端必须存储这个应用消息和它的QoS等级,以便它能够被分发给将来的订阅者

每一个Topic只会保留最多一个 Retain 持久消息 客户端订阅带有持久消息的Topic,会当即受到这条消息。

服务器能够选择丢弃持久消息,好比内存或者存储吃紧的时候。

若是客户端想要删除某个Topic 上面的持久消息,能够向这个Topic发送一个Payload为空的持久消息 遗嘱消息(Will)的Retain持久机制同理。

QoS :服务等级(消息可靠性)

发布流程

public void processPublish(Channel channel, MqttPublishMessage msg) {
        final MqttQoS qos = msg.fixedHeader().qosLevel();
        final String clientId = NettyUtils.clientID(channel);
        LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,
                msg.variableHeader().topicName(), msg.variableHeader().messageId(), qos);
        switch (qos) {
            case AT_MOST_ONCE:
                this.qos0PublishHandler.receivedPublishQos0(channel, msg);
                break;
            case AT_LEAST_ONCE:
                this.qos1PublishHandler.receivedPublishQos1(channel, msg);
                break;
            case EXACTLY_ONCE:
                this.qos2PublishHandler.receivedPublishQos2(channel, msg);
                break;
            default:
                LOG.error("Unknown QoS-Type:{}", qos);
                break;
        }
    }
复制代码

从上述代码的switch语句中能够看出会根据消息的Qos级别分别进行处理

QoS0 最多一次
sequenceDiagram
ClientA->>ServerBroker: 发送消息
ServerBroker->>ClientB: 发送消息
复制代码
  1. 权限判断
  2. 向全部该主题的订阅者发布消息
  3. QoS == 0 && retain => clean old retained
void receivedPublishQos0(Channel channel, MqttPublishMessage msg) {
        // verify if topic can be write
        final Topic topic = new Topic(msg.variableHeader().topicName());
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 权限判断
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }
        // route message to subscribers
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);
        // 2. 向全部该主题的订阅者发布消息
        this.publisher.publish2Subscribers(toStoreMsg, topic);

        if (msg.fixedHeader().isRetain()) {
            // 3. QoS == 0 && retain => clean old retained
            m_messagesStore.cleanRetained(topic);
        }

        m_interceptor.notifyTopicPublished(msg, clientID, username);
    }
复制代码
QoS1 至少一次
sequenceDiagram
ClientA->>ServerBroker: 1.发送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存储消息
ServerBroker->>ClientA: 1.2发送消息回应PUBACK
ServerBroker->>ClientB: 2.发送消息
ClientB->>ServerBroker: 2.1发送消息回应PUBACK
ServerBroker->>ServerBroker: 2.2删除消息
复制代码

1.发送消息PUBLISH

  1. 权限判断
  2. 向全部该主题的订阅者发布消息(每一个session中存储即将要发送的消息)
  3. 发送Ack回应
  4. retain = true => 存储消息
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
        // verify if topic can be write
        final Topic topic = new Topic(msg.variableHeader().topicName());
        topic.getTokens();
        if (!topic.isValid()) {
            LOG.warn("Invalid topic format, force close the connection");
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 权限判断
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }

        final int messageID = msg.variableHeader().messageId();

        // route message to subscribers
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);

        // 2. 向全部该主题的订阅者发布消息(每一个session中存储即将要发送的消息)
        this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);

        // 3. 发送Ack回应
        sendPubAck(clientID, messageID);

        // 4. retain = true => 存储消息
        if (msg.fixedHeader().isRetain()) {
            if (!msg.payload().isReadable()) {
                m_messagesStore.cleanRetained(topic);
            } else {
                // before wasn't stored m_messagesStore.storeRetained(topic, toStoreMsg); } } m_interceptor.notifyTopicPublished(msg, clientID, username); } 复制代码

2.1发送消息回应PUBACK

服务端Server接收到PUBACK消息后将执行:

  1. 删除存储在session中的消息
public void processPubAck(Channel channel, MqttPubAckMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = msg.variableHeader().messageId();
        String username = NettyUtils.userName(channel);
        LOG.trace("retrieving inflight for messageID <{}>", messageID);

        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);

        String topic = inflightMsg.getTopic();
        InterceptAcknowledgedMessage wrapped = new InterceptAcknowledgedMessage(inflightMsg, topic, username,
                                                                                messageID);
        m_interceptor.notifyMessageAcknowledged(wrapped);
    }
复制代码
QoS2 有且仅有一次
sequenceDiagram
ClientA->>ServerBroker: 1.发送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存储消息
ServerBroker->>ClientA: 1.2发送消息回应Rec
ClientA->>ServerBroker: 2.发送消息Rel
ServerBroker->>ServerBroker: 2.1删除消息
ServerBroker->>ServerBroker: 2.2存储消息到发送列队
ServerBroker->>ClientB: 2.3发送消息
ServerBroker->>ClientA: 2.4发送消息回应Comp
ClientB->>ServerBroker: 3.发送消息回应Rec
ServerBroker->>ServerBroker: 3.1删除2.2中存储的消息(一次确认)
ServerBroker->>ServerBroker: 3.2存储消息
ServerBroker->>ClientB: 3.3发送消息Rel
ClientB->>ServerBroker: 3.4发送消息回应Comp
ServerBroker->>ServerBroker: 3.5删除消息(二次确认)
复制代码

1.发送消息PUBLISH

  1. 权限判断
  2. 存储消息
  3. 发送Rec回应
void receivedPublishQos2(Channel channel, MqttPublishMessage msg) {
        final Topic topic = new Topic(msg.variableHeader().topicName());
        // check if the topic can be wrote
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 权限判断
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }
        final int messageID = msg.variableHeader().messageId();

        // 2. 存储消息
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);

        LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", clientID, topic, messageID);
        if (LOG.isTraceEnabled()) {
            LOG.trace("payload={}, subs Tree={}", payload2Str(toStoreMsg.getPayload()), subscriptions.dumpTree());
        }

        this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg);

        // 3. 发送Rec回应
        sendPubRec(clientID, messageID);

        // Next the client will send us a pub rel
        // NB publish to subscribers for QoS 2 happen upon PUBREL from publisher

//        if (msg.fixedHeader().isRetain()) {
//            if (msg.payload().readableBytes() == 0) {
//                m_messagesStore.cleanRetained(topic);
//            } else {
//                m_messagesStore.storeRetained(topic, toStoreMsg);
//            }
//        }
        //TODO this should happen on PUB_REL, else we notify false positive
        m_interceptor.notifyTopicPublished(msg, clientID, username);
    }
复制代码

2.发送消息Rel

  1. 删除消息
  2. 转发消息
  3. 发送Comp 回应给客户端-A
void processPubRel(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, messageID);
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // 1. 删除消息
        IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID);
        if (evt == null) {
            LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, messageID);
            throw new IllegalArgumentException("Can't find inbound inflight message");
        }
        final Topic topic = new Topic(evt.getTopic());

        // 2. 转发消息
        this.publisher.publish2Subscribers(evt, topic, messageID);

        if (evt.isRetained()) {
            if (evt.getPayload().readableBytes() == 0) {
                m_messagesStore.cleanRetained(topic);
            } else {
                m_messagesStore.storeRetained(topic, evt);
            }
        }

        //TODO here we should notify to the listeners
        //m_interceptor.notifyTopicPublished(msg, clientID, username);
        // 3.发送Comp 回应
        sendPubComp(clientID, messageID);
    }
复制代码

3.发送消息回应Rec

  1. 删除消息
  2. 存储消息(分别存储在secondPhaseStore和outboundInflightMap)
  3. 发送PUBREL
public void processPubRec(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // remove from the inflight and move to the QoS2 second phase queue
        // 1. 删除消息
        StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID);
        // 2. 存储消息(分别存储在secondPhaseStore和outboundInflightMap)
        targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg);
        // once received a PUBREC reply with a PUBREL(messageID)
        LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
        // 3. 发送PUBREL
        MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
        MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, from(messageID));
        channel.writeAndFlush(pubRelMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
    }
复制代码

3.4发送消息回应Comp

  1. 删除消息
public void processPubComp(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID);
        // once received the PUBCOMP then remove the message from the temp memory
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // 1. 删除消息
        StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID);
        String username = NettyUtils.userName(channel);
        String topic = inflightMsg.getTopic();
        final InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic,
            username, messageID);
        m_interceptor.notifyMessageAcknowledged(interceptAckMsg);
    }
复制代码

Topic & Subcribe

基本概念

Topic 话题 和 TopicFilter 话题过滤器

Pub-Sub消息模型的核心机制 UTF-8 编码字符串,不能超过 65535 字节。层级数量没有限制 不能包含任何的下文中提到的特殊符号(/、+、#),必须至少包含一个字符
区分大小写,能够包含空格,不能包含空字符 (Unicode U+0000)
在收部或尾部增长 斜杠 “/”,会产生不一样的Topic和TopicFilter。举例:

  • “/A” 和 “A” 是不一样的
  • “A” 和 “A/” 是不一样的

只包含斜杠 “/” 的 Topic 或 TopicFilter 是合法的

TopicFilter中的特殊符号

层级分隔符 / 用于分割主题的每一个层级,为主题名提供一个分层结构
主题层级分隔符能够出如今 Topic 或 TopicFilter 的任何位置
特例:相邻的主题层次分隔符表示一个零长度的主题层级

单层通配符 +

只能用于单个主题层级匹配的通配符。例如,“a/b/+” 匹配 “a/b/c1” 和 “a/b/c2” ,可是不匹配 “a/b/c/d”
能够匹配 任意层级,包括第一个和最后一个层级。

例如,“+” 是有效的,“sport/+/player1” 也是有效的。 能够在多个层级中使用它,也能够和多层通配符一块儿使用。

例如,“+/tennis/#” 是有效的。只能匹配本级不能匹配上级。

例如,“sport/+” 不匹配 “sport” 可是却匹配“sport/”,“/finance” 匹配 “+/+” 和 “/+” ,可是不匹配 “+”。

多层通配符 #

用于匹配主题中任意层级的通配符 匹配包含自己的层级和子层级。

例如 “a/b/c/#" 能够匹配 “a/b/c”、“a/b/c/d” 和 “a/b/c/d/e” 必须是最后的结尾。

例如 “sport/tennis/#/ranking”是无效的

“#”是有效的,会收到全部的应用消息。 (服务器端应将此类 TopicFilter禁掉 )

以$开头的,服务器保留

服务端不能将 $ 字符开头的 Topic 匹配通配符 (#或+) 开头的 TopicFilter

服务端应该阻止客户端使用这种 Topic 与其它客户端交换消息。

服务端实现能够将 $ 开头的主题名用做其余目的。

SYS/ 被普遍用做包含服务器特定信息或控制接口的主题的前缀
客户端不特地订阅开头的 Topic,就不会收到对应的消息

  • 订阅 “#” 的客户端不会收到任何发布到以 “$” 开头主题的消息
  • 订阅 “+/A/B” 的客户端不会收到任何发布到 “$SYS/A/B” 的消息
  • 订阅 “SYS/#” 的客户端会收到发布到以 “SYS/” 开头主题的消息
  • 订阅 “SYS/A/+” 的客户端会收到发布到 “SYS/A/B” 主题的消息

若是客户端想同时接受以 “SYS/” 开头主题的消息和不以 开头主题的消息,它须要同时 订阅 “#” 和 “$SYS/#”

存储结构

  • a/b/c
  • a/a
  • a/haha
  • msg

这4个主题会存储成以下结构:

  1. children 指向下层节点
  2. subscriptions 存储当前主题全部的订阅者

image

查找算法

订阅
@Override
    public void add(Subscription newSubscription) {
        Action res;
        do {
            res = insert(newSubscription.clientId, newSubscription.topicFilter, this.root, newSubscription.topicFilter);
        } while (res == Action.REPEAT);
    }

    private Action insert(String clientId, Topic topic, final INode inode, Topic fullpath) {
        Token token = topic.headToken();
        if (!topic.isEmpty() && inode.mainNode().anyChildrenMatch(token)) {
            Topic remainingTopic = topic.exceptHeadToken();
            INode nextInode = inode.mainNode().childOf(token);
            return insert(clientId, remainingTopic, nextInode, fullpath);
        } else {
            if (topic.isEmpty()) {
                return insertSubscription(clientId, fullpath, inode);
            } else {
                return createNodeAndInsertSubscription(clientId, topic, inode, fullpath);
            }
        }
    }
复制代码
删除订阅
public void removeSubscription(Topic topic, String clientID) {
        Action res;
        do {
            res = remove(clientID, topic, this.root, NO_PARENT);
        } while (res == Action.REPEAT);
    }

    private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
        Token token = topic.headToken();
        if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
            Topic remainingTopic = topic.exceptHeadToken();
            INode nextInode = inode.mainNode().childOf(token);
            return remove(clientId, remainingTopic, nextInode, inode);
        } else {
            final CNode cnode = inode.mainNode();
            if (cnode instanceof TNode) {
                // this inode is a tomb, has no clients and should be cleaned up
                // Because we implemented cleanTomb below, this should be rare, but possible
                // Consider calling cleanTomb here too
                return Action.OK;
            }
            if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
                // last client to leave this node, AND there are no downstream children, remove via TNode tomb
                if (inode == this.root) {
                    return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
                }
                TNode tnode = new TNode();
                return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
            } else if (cnode.contains(clientId) && topic.isEmpty()) {
                CNode updatedCnode = cnode.copy();
                updatedCnode.removeSubscriptionsFor(clientId);
                return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
            } else {
                //someone else already removed
                return Action.OK;
            }
        }
    }
复制代码
查找
Set<Subscription> recursiveMatch(Topic topic, INode inode) {
        CNode cnode = inode.mainNode();
        if (Token.MULTI.equals(cnode.token)) {
            return cnode.subscriptions;
        }
        if (topic.isEmpty()) {
            return Collections.emptySet();
        }
        if (cnode instanceof TNode) {
            return Collections.emptySet();
        }
        final Token token = topic.headToken();
        if (!(Token.SINGLE.equals(cnode.token) || cnode.token.equals(token) || ROOT.equals(cnode.token))) {
            return Collections.emptySet();
        }
        Topic remainingTopic = (ROOT.equals(cnode.token)) ? topic : topic.exceptHeadToken();
        Set<Subscription> subscriptions = new HashSet<>();
        if (remainingTopic.isEmpty()) {
            subscriptions.addAll(cnode.subscriptions);
        }
        for (INode subInode : cnode.allChildren()) {
            subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
        }
        return subscriptions;
    }
复制代码

尾巴

相关参考

MQTT协议通俗讲解

相关文章
相关标签/搜索