咱们已经把相关的链接报文搞定了。笔者想来想去仍是决定先讲解一下订阅报文(SUBSCRIBE )。若是传统的通讯方式是客户端和服务端之间通常就直接传输信息。可是MQTT的通讯方式是经过发布/订阅的方式进行的。笔者不知道他是否跟设计模式中的发布订阅模式有没有关系。但是他们思想却有一点类似之处。redis
客户端知道服务上有不少个主题。就比如如说有不少消息的分类同样子。有社会新闻、体育讲坛等。那么客户端只要找到本身感兴趣的进行订阅就能够了。一个客户端能够向服务器订阅多个主题。而所谓的发布就是客户端对不一样的主题进行发布信息。即比如如新闻的发布者同样子。这个时候只要订阅这个主题的客户端就能够接收到来自服务端的新闻。咱们的手机经常会接收到一些推送的信息。事实上有不少App应用都是用MQTT协议来进行的。因此不难看出服务端主要是负责客户端和客户端的之间信息的传输和信息管理。大至如图下sql
注意:发布者也是客户端。订阅者也是客户端数据库
主题(Topic )设计模式
若是主题只是一个字符串值的话,那么显然会比较单调。这样子功能也显得比较无力。因此在主题上面就了所谓的分隔符和通配符的说法(我的想法)。分隔符的意思就是让主题能够分层次。就好如说主题“体育讲坛/篮球/NBA”。看到这样子的主题,请问一下你还有什么不明白的话。是否是感受颇有层次感。剩下只有一个问题?若是咱们订阅了主题“体育讲坛/篮球/NBA”,并向主题“体育讲坛/篮球”发布一个信息。那么已经订阅主题“体育讲坛/篮球/NBA”的客户端们是否是能够接受到信息呢?反过来说若是咱们订阅了主题“体育讲坛/篮球”,向主题“体育讲坛/篮球/NBA”发信息,客户端们是否又能接受信息呢?服务器
笔者就以HiveMQ做服务器来作一下上面的小实验。以下学习
客验结果显然是失败的——订阅主题“体育讲坛/篮球/NBA”的客户端根本收不到来自主题“体育讲坛/篮球“的发布信息。说明分隔符就是用于主题名的分层次。没有别的意思。this
经过上面的实验咱们知道若是想要收到NBA就是必须订阅主题“体育讲坛/篮球/NBA”。但是老是有一些人只要是篮球的新闻有喜欢。怎么办。通配符的功能就出来了。通配符有俩种——"+"和“#”。+为单层的通配符。表示当前这一层的全都合非。这样子以上面的说到的例子来作实验。咱们订阅一个主题为“体育讲坛/篮球/+”。按照理解的意思就是只要是在“体育讲坛/篮球”的信息都是咱们想要的。结果以下spa
咱们能够看到笔者在“体育讲坛/篮球/NBA”和“体育讲坛/篮球/ABC”各发布了信息。结果他都能收到。那么若是咱们对主题“体育讲坛/篮球”或是主题“体育讲坛/篮球/NBA/福州专场”发布信息呢?笔者试过了很惋惜都不行。设计
记得咱们上面说到有一些人只要跟篮球相关的都喜欢。但是若是使用通配符“+”是能够接近咱们的要求。注意是接近。“+”通配符只是表示当前一层的。从当前的第二层就不行了。而自己的层也不算。就像上面的。只有篮球下的子一层才是合非的。讲到这里你们必定会想到用“#“通配符试试。没有错。“#“通配符就是表示当前自己和下面子层全部。以下3d
实验的结果很终知足了。
对于主题,在文档中有一个要求——主题不能以 ”#“ "+" "$" 为开头。对于”#“ ” +“的话,你们都好理解。那么”$“又是什么鬼。在文档咱们能够看到这样子的字符"$SYS"。事实上他们是想说”$“开头的主题通常用于系统内部的一些主题。大家能够去找一些第三方的MQTT服务器。都会有不少以”$“开头的主题。
SUBSCRIBE报文
经过上面的介绍。笔者想大家必定对MQTT通讯方式有了必定的概念。而本章的订阅报文就是用于告诉服务器我想要什么的主题了。经过前面几章的了解。咱们知道报文的固定报头是少了的。笔者就以MQTT 3.1.1来介绍吧。以下
SUBSCRIBE报文的INT值是8。因此对应的二进制为1000。后面的DUP QOS RETAIN对应是0010。其中QOS是必须是01。对订阅者来说,他必定但愿本身的订阅是成功的。因此订阅报文的QOS是01就至关好理解了。若是不理解QOS是什么的话,请看一下前面几章。
订阅报文也有可变报头,可变报头只有一个消息ID。消息ID是从客端端开始分配的。笔者为何样子认为呢?主要是看到客户端在发布信息的时候就要求消息ID。因此笔者才会以为消息ID在客户端进行分配的。固然也不是什么报文都会消息ID的。可是有消息ID通常QOS大于0。
订阅报文的有效载荷里面存在了相关的订阅订题列表。前面说过能够支持一个客户端多个订阅。列表里面每有一主题项只有俩个值。一个表示主题名,一个表示服务质量要求(Requested QoS)。这里的服务质量要求(Requested QoS)和 固定报头的服务质量的值是同样子。可是用意倒是不同子。这里是指这个订阅者接收这主题的服务质量最大等级。举个列子吧。笔者订阅了一个主题主题“体育讲坛/篮球/NBA”,同时他的服务质量要求(Requested QoS)的值为1。这个时候有一个发布者在这个主题上发布一个服务质QOS为2。笔者仍是能够收到这个发布者发来的信息。只是信息的服务质量QOS却变为1了。要明白QOS(1)和QOS(2)的执行行为是不样子的。这个后面章节会讲到。固然若是发布者在这个主题上发布一个服务质QOS为0。这就没有什么区别了。以下
对于有效载荷笔者这里就很少讲解了。也没有什么可说的。看文档的图片就够了以下。
宏观上:
微观上:
列表出咱们能够看他订阅了俩个主题。一个主题”a/b“,一个主题”c/b“。上面列出大概的图片(宏观上)和比较细的图片(微观上)。若是看不懂也没有关系。笔者接下来会用代码来抓一包看看。相信在对照一下就明白列表出画的是什么。
如今让咱们好好想一想当服务器接收到来自客户端的订阅报文的时候要作些什么样子的反应呢?首先咱们要明白若是服务端接收到一个订阅报文,第一步想到必定是查看订阅报文的格式是否是正确的。相关的主题名是否是为空的。主题名的写法是否是非法。这些必定离不开。固然对应的一些共有的验证笔者就不说了。一切没有问题的状况下,服务器会去看一下当前订阅者前面有没有订阅过相同的主题。若是有就替换当前的。若是没有就建立一下新的。而后服务器在根据当前主题查找一下符合保留的信息。若是有,就发送给当前的订阅者。而后发送一个订阅报文肯定(SUBACK )。固然这先后没有规定。先发送一个订阅报文肯定(SUBACK ),在处理保留的信息也是能够的。
注意:在发送符合保留的信息就要对QOS进行处理。上面笔者也讲过了。
SUBACK 报文
当服务端处理SUBSCRIBE报文的时候,都会生成一个SUBACK 报文来回应订阅者。笔者这里不想对他太过的讲解。他的内容也很简单。以下
对于SUBACK 报文的可变报头里面也只有一个消息ID。并且跟SUBSCRIBE报文的消息ID是同样子的。有效载何的内容存放是订阅主题的服务质量要求(Requested QoS)。笔者在MQTT 3.1 文档时面能够看到有多个主题的列子。但是在MQTT 3.1.1里面却没有。那么笔者就把MQTT 3.1.1的放在下里吧。读者们能够自行查看。
上面列表里面显示返回码,事实上是主题相关的服务质量要求(Requested QoS)。因此就能够知道他能够会返回四个值。以下
QOS 0:0x00
QOS 1:0x01
QOS2 :0x02
Failure :0x80
代码实现
有了上面的了解以后,笔者就想在经过一些代码来加深理解。固然从新写那是不可能的。笔者就用上一章的代码。并加上订阅报文相关的处理。以下
1 private void onSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) { 2 3 if (!this.connected) { 4 ctx.close(); 5 return; 6 } 7 int messageId = msg.variableHeader().messageId(); 8 9 List<MqttTopicSubscription> requestSubscriptions = msg.payload().topicSubscriptions(); 10 11 for (MqttTopicSubscription subscription : requestSubscriptions) { 12 13 if (StringUtils.isEmpty(subscription.topicName())) { 14 ctx.close(); 15 return; 16 } 17 } 18 19 List<Integer> grantedQosLevels = new ArrayList<Integer>(); 20 21 requestSubscriptions.forEach(subscription -> { 22 if (subscription.topicName().startsWith("$")) grantedQosLevels.add(MqttQoS.FAILURE.value()); 23 else grantedQosLevels.add(subscription.qualityOfService().value()); 24 }); 25 26 27 BrokerSessionHelper.sendMessage( 28 ctx, 29 MqttMessageFactory.newMessage( 30 new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 31 MqttMessageIdVariableHeader.from(messageId), 32 new MqttSubAckPayload(grantedQosLevels)), 33 this.clientId, 34 messageId, 35 true); 36 37 for (int i = 0; i < requestSubscriptions.size(); i++) { 38 39 MqttQoS grantedQoS = MqttQoS.valueOf(grantedQosLevels.get(i)); 40 String topic = requestSubscriptions.get(i).topicName(); 41 42 //1。查看之前有没有订阅过相同的主题,若是有就替换。 43 //2。查看有没有符合的保留信息,有发送 44 //读者们自行去实现。是要用redis,仍是要用sqllite自去实现。 45 46 } 47 }
订阅报文的实现并不难。难就在对于对保留信息的处理。还有就是服务端要对当前的客户端的订阅进行保留。那么笔者这边作的事情比较简单。主要是为了学习查看相关的报文格式。可是笔者仍是要列出来一下。以下
1.判断是否发生过链接。便是链接报文的处理。若是没有的话,断开链接。
if (!this.connected) { ctx.close(); return; }
2.得到报文的消息ID和相关的订阅主题。判断主题不为空。固然你也可自定义主题的验证合法规则。笔者这里就很少说了。
int messageId = msg.variableHeader().messageId(); List<MqttTopicSubscription> requestSubscriptions = msg.payload().topicSubscriptions(); for (MqttTopicSubscription subscription : requestSubscriptions) { if (StringUtils.isEmpty(subscription.topicName())) { ctx.close(); return; } }
3.得到相关主题的服务质量要求,用于返回码和处理保留的消息。并返回SUBACK报文
1 List<Integer> grantedQosLevels = new ArrayList<Integer>(); 2 3 requestSubscriptions.forEach(subscription -> { 4 if (subscription.topicName().startsWith("$")) grantedQosLevels.add(MqttQoS.FAILURE.value()); 5 else grantedQosLevels.add(subscription.qualityOfService().value()); 6 }); 7 8 9 BrokerSessionHelper.sendMessage( 10 ctx, 11 MqttMessageFactory.newMessage( 12 new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 13 MqttMessageIdVariableHeader.from(messageId), 14 new MqttSubAckPayload(grantedQosLevels)), 15 this.clientId, 16 messageId, 17 true);
4.处理保留的信息。这里笔者并没实现。由于这里要接合相关的数据库或是NOSQL。因此这里笔者没有去作。因这里太多的东西的。并且不一样的人实现和想法也不同子。因此笔者就没有列出来。
for (int i = 0; i < requestSubscriptions.size(); i++) { MqttQoS grantedQoS = MqttQoS.valueOf(grantedQosLevels.get(i)); String topic = requestSubscriptions.get(i).topicName(); //1。查看之前有没有订阅过相同的主题,若是有就替换。 //2。查看有没有符合的保留信息,有发送 //读者们自行去实现。是要用redis,仍是要用sqllite自去实现。 }
笔者把相关的抓到的报文格列出来。以下
SUBSCRIBE报文:
笔者已经把SUBSCRIBE报文的各个部分用不一样的颜色标出耿了。其中的黄色线表示下同主题的长度。就是上面微观图片里面的MSB和LSB。其余的也没有什么。 只是要注意最后一个值也就是服务质量要求(Requested QoS)。笔者这边是1。因此最后的二进制是00000001。
SUBACK 报文:
咱们能够看到SUBACK 报文的消息ID和SUBSCRIBE报文的消息是同样子的。还有就是记得最后的服务质量要求。