物联网时代-跟着Thingsboard学IOT架构-MQTT设备协议

演示视频请移步: james-1258744956.cos.ap-shanghai.myqcloud.com/thingsboard…html


Thingsboard的MQTT设备协议

thingsboard官网: thingsboard.io/java

thingsboard GitHub: github.com/thingsboard…git

thingsboard提供的体验地址: demo.thingsboard.io/github

BY Thingsboard teamnpm

如下内容是在原文基础上演绎的译文。除非另行注明,页面上全部内容采用知识共享-署名(CC BY 2.5 AU)协议共享。json

原文地址: ThingsBoard API参考:MQTT设备APIapi


MQTT基础知识

MQTT是一种轻量级的发布 - 订阅消息传递协议,可能使其最适合各类物联网设备。您能够在此处找到有关MQTT的更多信息。安全

ThingsBoard服务器节点充当MQTT Broker,支持QoS级别0(最多一次)和1(至少一次)以及一组预约义主题。bash


客户端库设置

您能够在Web上找到大量MQTT客户端库。本文中的示例将基于Mosquitto,MQTT.jsPaho,要设置其中一个工具。服务器

键值格式

默认状况下,ThingsBoard支持JSON中的键值内容。Key始终是一个字符串,而value能够是string,boolean,double或long。也可使用自定义二进制格式或某些序列化框架。有关详细信息,请参阅物模型。例如:

{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}
复制代码

遥测上传API

为了将遥测数据发布到ThingsBoard服务器节点,请将PUBLISH消息发送到如下主题:

v1/devices/me/telemetry
复制代码

最简单的支持数据格式是:

{"key1":"value1", "key2":"value2"}
复制代码

要么

[{"key1":"value1"}, {"key2":"value2"}]
复制代码

请注意,在这种状况下,服务器端时间戳将分配给上传的数据!

若是您的设备可以获取客户端时间戳,您可使用如下格式:

{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
复制代码

在上面的示例中,咱们假设“1451649600512”是具备毫秒精度的unix时间戳。例如,值'1451649600512'对应于'Fri,2016年1月1日12:00:00.512 GMT'

属性API

ThingsBoard属性API容许设备

  • 客户端设备属性上载到服务器。
将属性更新发布到服务器

要将客户端设备属性发布到ThingsBoard服务器节点,请将PUBLISH消息发送到如下主题:

v1/devices/me/attributes
复制代码

更多请看上文给出的链接。


Thingsboard的MQTT传输协议架构

由于Thingsboard最新release,是基于微服务架构,不利用单独理解代码。

Thingsboard源代码: github.com/thingsboard…

本文基于上面源代码后,剔除相关的安全验证和处理以后搭建简易的讲解项目:

github.com/sanshengshu…


MQTT框架

由于Thingsboard是一个JVM技术栈的PaaS平台,因此使用的是基于Java通信框架的Netty,若是有对Netty不太熟悉的同窗,能够参考我以前搭建的Netty实践学习案例: github.com/sanshengshu…

项目结构

.
├── IOT-Guide-MQTT.iml
├── pom.xml
└── src
    └── main
        └── java
            └── com
                └── sanshengshui
                    └── mqtt
                        ├── adapter
                        │   └── JsonMqttAdaptor.java // MQTT json转换器,在跟Thingsboard学习IOT-物模型有所讲解
                        ├── IOTMqttServer.java // MQTT服务
                        ├── MqttTopicMatcher.java
                        ├── MqttTopics.java 
                        ├── MqttTransportHandler.java //MQTT处理类
                        └── MqttTransportServerInitializer.java

复制代码

项目代码讲解

IOTMqttServer

private static final int PORT = 1884;
    private static final String leakDetectorLevel = "DISABLED";
    private static final Integer bossGroupThreadCount = 1;
    private static final Integer workerGroupThreadCount = 12;
    private static final Integer maxPayloadSize = 65536;

    public static void main(String[] args) throws Exception {
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

        EventLoopGroup bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        EventLoopGroup workerGroup = new NioEventLoopGroup(workerGroupThreadCount);

        try {

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new MqttTransportServerInitializer(maxPayloadSize));
            ChannelFuture f = b.bind(PORT);
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
复制代码

第8行,设置服务端Netty内存读写泄漏级别,缺省条件下为:DISABLED

第10行和第11行,设置boss线程组和work线程组的线程数量。默认状况下,boss线程组的线程数量为1,work线程组的数量为运行服务机器内核数量的2倍。

第15行,经过建立ServerBootstrap对象,在第16行设置使用EventLoopGroup

在17和19行,设置要被实例化的NioServerSockerChannel类,并设置最大的负载内容数量。

最后咱们经过shutdowGracefully()函数优雅的关闭bossGroup和workGroup。


MqttTransportHandler#processMqttMsg()

private void processMqttMsg(ChannelHandlerContext ctx, MqttMessage msg) {
        address = (InetSocketAddress) ctx.channel().remoteAddress();
        if (msg.fixedHeader() == null) {
            processDisconnect(ctx);
            return;
        }

        switch (msg.fixedHeader().messageType()) {
            case CONNECT:
                processConnect(ctx, (MqttConnectMessage) msg);
                break;
            case PUBLISH:
                processPublish(ctx, (MqttPublishMessage) msg);
                break;
            case SUBSCRIBE:
                processSubscribe(ctx, (MqttSubscribeMessage) msg);
                break;
            case UNSUBSCRIBE:
                processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                break;
            case PINGREQ:
                if (checkConnected(ctx)) {
                    ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP,false,AT_MOST_ONCE, false, 0)));
                }
                break;
            case DISCONNECT:
                if (checkConnected(ctx)) {
                    processDisconnect(ctx);
                }
                break;
            default:
                break;

        }
    }

复制代码

第3行,经过判断消息的固定头部是否为空,若是空;则经过processDisconnect(ctx)将设备链接关闭。

processDisconnect(channelHandlerContext ctx)

private void processDisconnect(ChannelHandlerContext ctx) {
        ctx.close();  // 关闭socket通道
    }

复制代码

第8行,经过判断固定头部的MQTT消息类型,针对不一样消息作相应的处理。


MqttTransportHandler#PublishDevicePublish

如下是对发布消息进行相关的解读,更多消息类型的处理类,你们请参考我上面的IOT-Guide-MQTT进行阅读。

private void processDevicePublish(ChannelHandlerContext ctx, MqttPublishMessage mqttMsg, String topicName, int msgId) {
        try {
            if (topicName.equals(MqttTopics.DEVICE_TELEMETRY_TOPIC)) { //若是主题为v1/devices/me/attributes
                JsonMqttAdaptor.convertToMsg(POST_TELEMETRY_REQUEST, mqttMsg);
            } else if(topicName.equals(DEVICE_ATTRIBUTES_TOPIC)) {
                JsonMqttAdaptor.convertToMsg(POST_ATTRIBUTES_REQUEST, mqttMsg);
            } else if(topicName.equals(MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX)) {
                JsonMqttAdaptor.convertToMsg(GET_ATTRIBUTES_REQUEST, mqttMsg);
            }
        } catch (AdaptorException e) {

        }

    }

复制代码

我上面的代码仅是对消息的主题进行判断,而后对主题内的内容进行物模型的解析,获得相关属性或者遥测数据的得到。


演示效果

咱们经过Paho或者MQTT.js和服务进行链接,发布消息到如下主题:

v1/devices/me/telemetry

复制代码

简易的数据格式以下:

{"key1":"value1", "key2":"value2"}
复制代码

Paho图示:

服务器控制台打印数据:

七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xf2bfb3a8] REGISTERED
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xf2bfb3a8] BIND: 0.0.0.0/0.0.0.0:1884
七月 24, 2019 1:37:18 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] ACTIVE
七月 24, 2019 1:37:22 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xf2bfb3a8, L:/0:0:0:0:0:0:0:0:1884] RECEIVED: [id: 0xe08abd12, L:/127.0.0.1:1884 - R:/127.0.0.1:48816]
key= 1563946708305
属性名=temperature 属性值=38
属性名=humidity 属性值=60
复制代码

如上所示,但愿你们对Thingsboard的IOT架构-MQTT设备协议这块有所了解!

相关文章
相关标签/搜索