阅读优秀的代码是一种享受,将优秀的代码用本身的世界观优秀地描述出来就十分痛苦了是要死一亿个脑细胞的。html
这篇源码阅读笔记早在一年前就有了当时只是简单的记录一下本身的总结,最近将她从新整理一下但愿能帮助有须要的人。node
随着移动互联网快速进入后半场,愈来愈多的企业将注意力转移到物联网。好比共享单车和小米的智能家居产品等都是典型的物联网应用。算法
企业相信借助于大数据和AI技术能够得到不少额外的价值产生新的商业模式。海量数据须要经过接入服务才能流向后端产生后续价值,在接入服务中MQTT已成为物联网中非明确的标准协议国内外云厂均有其broker实现。后端
MQTT协议是为大量计算能力有限,且工做在低带宽、不可靠的网络的远程传感器和控制设备通信而设计的协议,它具备如下主要的几项特性:bash
==下文中会对上述特性的实现方式进行讲解==服务器
使用MQTT的程序或者设备,如环境监控传感器、共享单车、共享充电宝等。网络
一个程序或设备,做为发送消息的客户端和请求订阅的客户端之间的中介。session
客户端-A 给 客户端-B 发送消息“hello”流程以下:并发
有别于HTTP协议的请求响应模式,客户端-A与客户端-B不发生直接链接关系,他们之间的消息传递经过服务端Server进行转发。 服务端Server又称 MQTT Broker 即订阅和发送的中间人app
在上述的客户端-A 给 客户端-B 发送消息“hello”流程中须要有以下动做。
下面将基于链接、订阅、发布这几个动做进行源码跟踪解读。
Session:会话即客户端(由ClientId做为标示)和服务端之间逻辑层面的通讯;生命周期(存在时间):会话 >= 网络链接。
ClientID:客户端惟一标识,服务端用于关联一个Session 只能包含这些 大写字母,小写字母 和 数字(0-9a-zA-Z),23个字符之内 若是 ClientID 在屡次 TCP链接中保持一致,客户端和服务器端会保留会话信息(Session) 同一时间内 Server 和同一个 ClientID 只能保持一个 TCP 链接,再次链接会踢掉前一个。
CleanSession:在Connect时,由客户端设置
Keep Alive:目的是保持长链接的可靠性,以及双方对彼此是否在线的确认。 客户端在Connect的时候设置 Keep Alive 时长。若是服务端在 1.5 * KeepAlive 时间内没有收到客户端的报文,它必须断开客户端的网络链接 Keep Alive 的值由具体应用指定,通常是几分钟。容许的最大值是 18 小时 12 分 15 秒。
Will:遗嘱消息(Will Message)存储在服务端,当网络链接关闭时,服务端必须发布这个遗嘱消息,因此被形象地称之为遗嘱,可用于通知异常断线。 客户端发送 DISCONNECT 关闭连接,遗嘱失效并删除 遗嘱消息发布的条件,包括: 服务端检测到了一个 I/O 错误或者网络故障 客户端在保持链接(Keep Alive)的时间内未能通信 客户端没有先发送 DISCONNECT 报文直接关闭了网络链接 因为协议错误服务端关闭了网络链接 相关设置项,须要在Connect时,由客户端指定。
Will Flag :遗嘱的总开关
Will QoS: 遗嘱消息 QoS可取值 0、一、2,含义与消息QoS相同
Will Retain:遗嘱是否保留
Will Topic:遗嘱话题
Will Payload:遗嘱消息内容
public void processConnect(Channel channel, MqttConnectMessage msg) {
MqttConnectPayload payload = msg.payload();
String clientId = payload.clientIdentifier();
LOG.debug("Processing CONNECT message. CId={}, username={}", clientId, payload.userName());
// 1. 判断客户端链接时发送的MQTT协议版本号,非3.1和3.1.1版本发送协议不支持响应报文并在发送完成后关闭链接
if (msg.variableHeader().version() != MqttVersion.MQTT_3_1.protocolLevel()
&& msg.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
LOG.error("MQTT protocol version is not valid. CId={}", clientId);
channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
final boolean cleanSession = msg.variableHeader().isCleanSession();
if (clientId == null || clientId.length() == 0) {
// 2. 在客户端配置了cleanSession=false 或者服务端不容许clientId不存在的状况下客户端若是未上传clientId发送协议不支持响应报文并在发送完成后关闭链接
if (!cleanSession || !this.allowZeroByteClientId) {
MqttConnAckMessage badId = connAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
channel.writeAndFlush(badId).addListener(FIRE_EXCEPTION_ON_FAILURE);
channel.close().addListener(CLOSE_ON_FAILURE);
LOG.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
return;
}
// Generating client id.
clientId = UUID.randomUUID().toString().replace("-", "");
LOG.info("Client has connected with a server generated identifier. CId={}, username={}", clientId,
payload.userName());
}
// 3. 判断用户名和密码是否合法
if (!login(channel, msg, clientId)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
// 4. 初始化链接对象并将链接对象引用放入链接管理中,若是发现链接管理中存在相同客户端ID的对象则关闭前一个链接并将新的链接对象放入链接管理中
ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
if (existing != null) {
LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
existing.abort();
//return;
this.connectionDescriptors.removeConnection(existing);
this.connectionDescriptors.addConnection(descriptor);
}
// 5. 根据客户端上传的心跳时间调整服务端当前链接的心跳判断时间(keepAlive * 1.5f)
initializeKeepAliveTimeout(channel, msg, clientId);
// 6. 遗嘱消息存储(当链接意外断开时向存储的主题发布消息)
storeWillMessage(msg, clientId);
// 7. 发送链接成功响应
if (!sendAck(descriptor, msg, clientId)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
m_interceptor.notifyClientConnected(msg);
if (!descriptor.assignState(SENDACK, SESSION_CREATED)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
// 8. 建立当前链接session
final ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession);
// 9. 当cleanSession=false 发送当前session已经存储的消息
if (!republish(descriptor, msg, clientSession)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
int flushIntervalMs = 500/* (keepAlive * 1000) / 2 */;
setupAutoFlusher(channel, flushIntervalMs);
final boolean success = descriptor.assignState(MESSAGES_REPUBLISHED, ESTABLISHED);
if (!success) {
channel.close().addListener(CLOSE_ON_FAILURE);
}
LOG.info("Connected client <{}> with login <{}>", clientId, payload.userName());
}
复制代码
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);
RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
if (currentStatus != null) {
LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
clientID, messageID);
return;
}
String username = NettyUtils.userName(channel);
// 一、订阅的主题校验(权限、主题path合法性)
List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
"messageId={}", clientID, messageID);
return;
}
LOG.debug("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
// 二、在当前session中存储订阅的主题
List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);
// save session, persist subscriptions from session
// 三、采用全局tree结构存储订阅信息(主题和订阅者信息),用于消息转发时根据主题查找到对应的订阅者
for (Subscription subscription : newSubscriptions) {
subscriptions.add(subscription);
}
LOG.debug("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
// 四、发送订阅回应
channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
// fire the persisted messages in session
// 五、扫描持久化的消息匹配到当前订阅主题的当即向此链接发送消息
for (Subscription subscription : newSubscriptions) {
publishRetainedMessagesInSession(subscription, username);
}
boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
if (!success) {
LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
} else {
LOG.info("Client <{}> subscribed to topics", clientID);
}
}
复制代码
Packet Identifier:报文标识存在报文的可变报头部分,非零两个字节整数 (0-65535]。
一个流程中重复:这些报文包含PacketID,并且在一次通讯流程内保持一致:PUBLISH(QoS>0时)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCIBE、UNSUBACK 。
新的不重复:客户端每次发送一个新的这些类型的报文时都必须分配一个当前 未使用的PacketID 当客户端处理完这个报文对应的确认后,这个报文标识符就释放可重用。
独立维护:客户端和服务端彼此独立地分配报文标识符。所以,客户端服务端组合使用相同的报文标识符能够实现并发的消息交换。客户端和服务端产生的Packet Identifier一致不算异常。
Payload: 有效载荷即消息体最大容许 256MB。 Publish 的 Payload 容许为空,在不少场合下表明将持久消息(或者遗嘱消息)清空。采用UTF-8编码。
Retain:持久消息(粘性消息)
RETAIN 标记:每一个Publish消息都须要指定的标记
每一个Topic只会保留最多一个 Retain 持久消息 客户端订阅带有持久消息的Topic,会当即受到这条消息。
服务器能够选择丢弃持久消息,好比内存或者存储吃紧的时候。
若是客户端想要删除某个Topic 上面的持久消息,能够向这个Topic发送一个Payload为空的持久消息 遗嘱消息(Will)的Retain持久机制同理。
QoS :服务等级(消息可靠性)
public void processPublish(Channel channel, MqttPublishMessage msg) {
final MqttQoS qos = msg.fixedHeader().qosLevel();
final String clientId = NettyUtils.clientID(channel);
LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,
msg.variableHeader().topicName(), msg.variableHeader().messageId(), qos);
switch (qos) {
case AT_MOST_ONCE:
this.qos0PublishHandler.receivedPublishQos0(channel, msg);
break;
case AT_LEAST_ONCE:
this.qos1PublishHandler.receivedPublishQos1(channel, msg);
break;
case EXACTLY_ONCE:
this.qos2PublishHandler.receivedPublishQos2(channel, msg);
break;
default:
LOG.error("Unknown QoS-Type:{}", qos);
break;
}
}
复制代码
从上述代码的switch语句中能够看出会根据消息的Qos级别分别进行处理
sequenceDiagram
ClientA->>ServerBroker: 发送消息
ServerBroker->>ClientB: 发送消息
复制代码
void receivedPublishQos0(Channel channel, MqttPublishMessage msg) {
// verify if topic can be write
final Topic topic = new Topic(msg.variableHeader().topicName());
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 权限判断
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
// route message to subscribers
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
// 2. 向全部该主题的订阅者发布消息
this.publisher.publish2Subscribers(toStoreMsg, topic);
if (msg.fixedHeader().isRetain()) {
// 3. QoS == 0 && retain => clean old retained
m_messagesStore.cleanRetained(topic);
}
m_interceptor.notifyTopicPublished(msg, clientID, username);
}
复制代码
sequenceDiagram
ClientA->>ServerBroker: 1.发送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存储消息
ServerBroker->>ClientA: 1.2发送消息回应PUBACK
ServerBroker->>ClientB: 2.发送消息
ClientB->>ServerBroker: 2.1发送消息回应PUBACK
ServerBroker->>ServerBroker: 2.2删除消息
复制代码
1.发送消息PUBLISH
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
// verify if topic can be write
final Topic topic = new Topic(msg.variableHeader().topicName());
topic.getTokens();
if (!topic.isValid()) {
LOG.warn("Invalid topic format, force close the connection");
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 权限判断
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
final int messageID = msg.variableHeader().messageId();
// route message to subscribers
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
// 2. 向全部该主题的订阅者发布消息(每一个session中存储即将要发送的消息)
this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);
// 3. 发送Ack回应
sendPubAck(clientID, messageID);
// 4. retain = true => 存储消息
if (msg.fixedHeader().isRetain()) {
if (!msg.payload().isReadable()) {
m_messagesStore.cleanRetained(topic);
} else {
// before wasn't stored m_messagesStore.storeRetained(topic, toStoreMsg); } } m_interceptor.notifyTopicPublished(msg, clientID, username); } 复制代码
2.1发送消息回应PUBACK
服务端Server接收到PUBACK消息后将执行:
public void processPubAck(Channel channel, MqttPubAckMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = msg.variableHeader().messageId();
String username = NettyUtils.userName(channel);
LOG.trace("retrieving inflight for messageID <{}>", messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);
String topic = inflightMsg.getTopic();
InterceptAcknowledgedMessage wrapped = new InterceptAcknowledgedMessage(inflightMsg, topic, username,
messageID);
m_interceptor.notifyMessageAcknowledged(wrapped);
}
复制代码
sequenceDiagram
ClientA->>ServerBroker: 1.发送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存储消息
ServerBroker->>ClientA: 1.2发送消息回应Rec
ClientA->>ServerBroker: 2.发送消息Rel
ServerBroker->>ServerBroker: 2.1删除消息
ServerBroker->>ServerBroker: 2.2存储消息到发送列队
ServerBroker->>ClientB: 2.3发送消息
ServerBroker->>ClientA: 2.4发送消息回应Comp
ClientB->>ServerBroker: 3.发送消息回应Rec
ServerBroker->>ServerBroker: 3.1删除2.2中存储的消息(一次确认)
ServerBroker->>ServerBroker: 3.2存储消息
ServerBroker->>ClientB: 3.3发送消息Rel
ClientB->>ServerBroker: 3.4发送消息回应Comp
ServerBroker->>ServerBroker: 3.5删除消息(二次确认)
复制代码
1.发送消息PUBLISH
void receivedPublishQos2(Channel channel, MqttPublishMessage msg) {
final Topic topic = new Topic(msg.variableHeader().topicName());
// check if the topic can be wrote
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 权限判断
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
final int messageID = msg.variableHeader().messageId();
// 2. 存储消息
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", clientID, topic, messageID);
if (LOG.isTraceEnabled()) {
LOG.trace("payload={}, subs Tree={}", payload2Str(toStoreMsg.getPayload()), subscriptions.dumpTree());
}
this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg);
// 3. 发送Rec回应
sendPubRec(clientID, messageID);
// Next the client will send us a pub rel
// NB publish to subscribers for QoS 2 happen upon PUBREL from publisher
// if (msg.fixedHeader().isRetain()) {
// if (msg.payload().readableBytes() == 0) {
// m_messagesStore.cleanRetained(topic);
// } else {
// m_messagesStore.storeRetained(topic, toStoreMsg);
// }
// }
//TODO this should happen on PUB_REL, else we notify false positive
m_interceptor.notifyTopicPublished(msg, clientID, username);
}
复制代码
2.发送消息Rel
void processPubRel(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// 1. 删除消息
IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID);
if (evt == null) {
LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, messageID);
throw new IllegalArgumentException("Can't find inbound inflight message");
}
final Topic topic = new Topic(evt.getTopic());
// 2. 转发消息
this.publisher.publish2Subscribers(evt, topic, messageID);
if (evt.isRetained()) {
if (evt.getPayload().readableBytes() == 0) {
m_messagesStore.cleanRetained(topic);
} else {
m_messagesStore.storeRetained(topic, evt);
}
}
//TODO here we should notify to the listeners
//m_interceptor.notifyTopicPublished(msg, clientID, username);
// 3.发送Comp 回应
sendPubComp(clientID, messageID);
}
复制代码
3.发送消息回应Rec
public void processPubRec(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// remove from the inflight and move to the QoS2 second phase queue
// 1. 删除消息
StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID);
// 2. 存储消息(分别存储在secondPhaseStore和outboundInflightMap)
targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg);
// once received a PUBREC reply with a PUBREL(messageID)
LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
// 3. 发送PUBREL
MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, from(messageID));
channel.writeAndFlush(pubRelMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
}
复制代码
3.4发送消息回应Comp
public void processPubComp(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID);
// once received the PUBCOMP then remove the message from the temp memory
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// 1. 删除消息
StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID);
String username = NettyUtils.userName(channel);
String topic = inflightMsg.getTopic();
final InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic,
username, messageID);
m_interceptor.notifyMessageAcknowledged(interceptAckMsg);
}
复制代码
Topic 话题 和 TopicFilter 话题过滤器
Pub-Sub消息模型的核心机制 UTF-8 编码字符串,不能超过 65535 字节。层级数量没有限制 不能包含任何的下文中提到的特殊符号(/、+、#),必须至少包含一个字符
区分大小写,能够包含空格,不能包含空字符 (Unicode U+0000)
在收部或尾部增长 斜杠 “/”,会产生不一样的Topic和TopicFilter。举例:
只包含斜杠 “/” 的 Topic 或 TopicFilter 是合法的
TopicFilter中的特殊符号
层级分隔符 / 用于分割主题的每一个层级,为主题名提供一个分层结构
主题层级分隔符能够出如今 Topic 或 TopicFilter 的任何位置
特例:相邻的主题层次分隔符表示一个零长度的主题层级
单层通配符 +
只能用于单个主题层级匹配的通配符。例如,“a/b/+” 匹配 “a/b/c1” 和 “a/b/c2” ,可是不匹配 “a/b/c/d”
能够匹配 任意层级,包括第一个和最后一个层级。
例如,“+” 是有效的,“sport/+/player1” 也是有效的。 能够在多个层级中使用它,也能够和多层通配符一块儿使用。
例如,“+/tennis/#” 是有效的。只能匹配本级不能匹配上级。
例如,“sport/+” 不匹配 “sport” 可是却匹配“sport/”,“/finance” 匹配 “+/+” 和 “/+” ,可是不匹配 “+”。
多层通配符 #
用于匹配主题中任意层级的通配符 匹配包含自己的层级和子层级。
例如 “a/b/c/#" 能够匹配 “a/b/c”、“a/b/c/d” 和 “a/b/c/d/e” 必须是最后的结尾。
例如 “sport/tennis/#/ranking”是无效的
“#”是有效的,会收到全部的应用消息。 (服务器端应将此类 TopicFilter禁掉 )
以$开头的,服务器保留
服务端不能将 $ 字符开头的 Topic 匹配通配符 (#或+) 开头的 TopicFilter
服务端应该阻止客户端使用这种 Topic 与其它客户端交换消息。
服务端实现能够将 $ 开头的主题名用做其余目的。
开头的 Topic,就不会收到对应的消息
若是客户端想同时接受以 “ 开头主题的消息,它须要同时 订阅 “#” 和 “$SYS/#”
这4个主题会存储成以下结构:
@Override
public void add(Subscription newSubscription) {
Action res;
do {
res = insert(newSubscription.clientId, newSubscription.topicFilter, this.root, newSubscription.topicFilter);
} while (res == Action.REPEAT);
}
private Action insert(String clientId, Topic topic, final INode inode, Topic fullpath) {
Token token = topic.headToken();
if (!topic.isEmpty() && inode.mainNode().anyChildrenMatch(token)) {
Topic remainingTopic = topic.exceptHeadToken();
INode nextInode = inode.mainNode().childOf(token);
return insert(clientId, remainingTopic, nextInode, fullpath);
} else {
if (topic.isEmpty()) {
return insertSubscription(clientId, fullpath, inode);
} else {
return createNodeAndInsertSubscription(clientId, topic, inode, fullpath);
}
}
}
复制代码
public void removeSubscription(Topic topic, String clientID) {
Action res;
do {
res = remove(clientID, topic, this.root, NO_PARENT);
} while (res == Action.REPEAT);
}
private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
Token token = topic.headToken();
if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
Topic remainingTopic = topic.exceptHeadToken();
INode nextInode = inode.mainNode().childOf(token);
return remove(clientId, remainingTopic, nextInode, inode);
} else {
final CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
// this inode is a tomb, has no clients and should be cleaned up
// Because we implemented cleanTomb below, this should be rare, but possible
// Consider calling cleanTomb here too
return Action.OK;
}
if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
// last client to leave this node, AND there are no downstream children, remove via TNode tomb
if (inode == this.root) {
return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
}
TNode tnode = new TNode();
return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
} else if (cnode.contains(clientId) && topic.isEmpty()) {
CNode updatedCnode = cnode.copy();
updatedCnode.removeSubscriptionsFor(clientId);
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
} else {
//someone else already removed
return Action.OK;
}
}
}
复制代码
Set<Subscription> recursiveMatch(Topic topic, INode inode) {
CNode cnode = inode.mainNode();
if (Token.MULTI.equals(cnode.token)) {
return cnode.subscriptions;
}
if (topic.isEmpty()) {
return Collections.emptySet();
}
if (cnode instanceof TNode) {
return Collections.emptySet();
}
final Token token = topic.headToken();
if (!(Token.SINGLE.equals(cnode.token) || cnode.token.equals(token) || ROOT.equals(cnode.token))) {
return Collections.emptySet();
}
Topic remainingTopic = (ROOT.equals(cnode.token)) ? topic : topic.exceptHeadToken();
Set<Subscription> subscriptions = new HashSet<>();
if (remainingTopic.isEmpty()) {
subscriptions.addAll(cnode.subscriptions);
}
for (INode subInode : cnode.allChildren()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
}
return subscriptions;
}
复制代码
相关参考