物联网时代-跟着Thingsboard学IOT架构-HTTP设备协议及API相关限制

thingsboard官网: thingsboard.io/html

thingsboard GitHub: github.com/thingsboard…java

thingsboard提供的体验地址: demo.thingsboard.io/git

BY Thingsboard teamgithub

如下内容是在原文基础上演绎的译文。除非另行注明,页面上全部内容采用知识共享-署名(CC BY 2.5 AU)协议共享。web

原文地址: ThingsBoard API参考:HTTP设备APIspring


HTTP

协议介绍

HTTP是可用于IoT应用程序的通用网络协议。您能够在此处找到有关HTTP的更多信息。HTTP协议是基于TCP的,并使用请求 - 响应模型。固然它的缺点也极为明显,HTTP对于嵌入式设备来讲过重了,也不灵活。apache

协议特色

http

  1. 支持客户/服务器模式。
  2. 简单快速: 客户向服务器请求服务时,只需传送请求方法和路径。请求方法经常使用的有GET、PUT、POST。每种方法规定了客户与服务器联系的类型不一样。因为HTTP协议简单,使得HTTP服务器的程序规模小,所以通讯速度很快。
  3. 灵活: HTTP容许传输任意类型的数据对象。正在传输的类型由Content-Type加以标记。
  4. 无链接:无链接的含义是限制每次链接只处理一个请求。服务器处理完客户的请求,并收到客户的应答后,即断开链接。采用这种方式能够节省传输时间。
  5. 无状态:HTTP协议是无状态协议。无状态是指协议对于事务处理没有记忆能力。缺乏状态意味着若是后续处理须要前面的信息,则它必须重传,这样可能致使每次链接传送的数据量增大。另外一方面,在服务器不须要先前信息时它的应答就较快。

客户端设置

Thingsboard的HTTP传输协议架构

由于Thingsboard最新release,是基于微服务架构,不利用单独理解代码。json

**Thingsboard CoAP设备传输协议源代码:**github.com/thingsboard…api

本文基于上面源代码后,剔除相关的安全验证和处理以后搭建简易的讲解项目:缓存

github.com/sanshengshu…


Spring Boot框架

Thingsboard的HTTP设备传输协议是基于Spring Boot

Spring Boot 是 Spring 的子项目,正如其名字,提供 Spring 的引导( Boot )的功能。

经过 Spring Boot ,咱们开发者能够快速配置 Spring 项目,引入各类 Spring MVC、Spring Transaction、Spring AOP、MyBatis 等等框架,而无需不断重复编写繁重的 Spring 配置,下降了 Spring 的使用成本。

犹记当年,Spring XML 为主的时代,大晚上各类搜索 Spring 的配置,苦不堪言。如今有了 Spring Boot 以后,生活真美好。

Spring Boot 提供了各类 Starter 启动器,提供标准化的默认配置。例如:

而且,Spring Boot 基本已经一统 Java 项目的开发,大量的开源项目都实现了其的 Starter 启动器。例如:

项目解读

项目结构

├── java
│   └── com
│       └── sanshengshui
│           └── http
│               ├── controller
│               │   └── DeviceApiController.java	// 设备传输API接口
│               ├── HttpApiServer.java	//项目启动主类
│               └── quota	//API限制类包
│                   ├── AbstractQuotaService.java	//抽象限制服务类
│                   ├── Clock.java		//时钟类
│                   ├── host
│                   │   ├── HostIntervalRegistryCleaner.java	//主机API清理器
│                   │   ├── HostIntervalRegistryLogger.java	 //主机API记录器
│                   │   ├── HostRequestIntervalRegistry.java	//主机API请求注册表
│                   │   ├── HostRequestLimitPolicy.java	 //主机API请求限制条件
│                   │   └── HostRequestsQuotaService.java	 //主机请求限制开关
│                   ├── inmemory
│                   │   ├── IntervalCount.java	 //间歇计数
│                   │   ├── IntervalRegistryCleaner.java	//时间间隔内注册表清理器
│                   │   ├── IntervalRegistryLogger.java 	//时间间隔内注册表记录器
│                   │   └── KeyBasedIntervalRegistry.java	 //基础API请求逻辑
│                   ├── QuotaService.java	//限制服务类
│                   └── RequestLimitPolicy.java //请求限制策略
└── resources
    └── application.yml

复制代码

项目代码

引入依赖

<dependencies>
        <dependency>
            <groupId>com.sanshengshui</groupId>
            <artifactId>IOT-Guide-TSL</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>
</dependencies>
复制代码
  • Spring Boot提供的web框架基于Tomcat,能够经过引入spring-boot-starter-web来配置依赖关系。
  • commons-lang3guava用于API请求限制服务。

参数配置

server:
 port: 8080


http:
 request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}"


quota:
 host:
 limit: "${QUOTA_HOST_LIMIT:10}"
 intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
 ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
 cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
 enabled: "${QUOTA_HOST_ENABLED:true}"
 whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
 blacklist: "${QUOTA_HOST_BLACKLIST:}"
 log:
 topSize: 10
 intervalMin: 2
复制代码
  • server.port: 8080: 服务器启动绑定的端口,缺省状况下是:8080。
  • http.request_timeout : 请求超时时间,此处设定为60000。
  • quota.host.limitquota.host.intervalMs: 分别为API请求限额数和单位时间。此处为了验证方便,设定为10次和60s,即60s内API请求限额数为10次。
  • quota.host.cleanPeriodMsquota.host.ttlMs : 分别为清理周期时间和TTL时间。
  • quota.host.enabledquota.host.whitelistquota.host.blacklist分别表示API请求开关、白名单及黑名单。
  • quota.host.log.topSizequota.host.log.intervalMin: 指的是高速缓存中的(近似)最大条目数和间隔时间。

API限制服务类

KeyBasedIntervalRegistry:基础API请求逻辑

package com.sanshengshui.http.quota.inmemory;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** * @author james mu * @date 2019/8/10 下午4:50 */
@Slf4j
public class KeyBasedIntervalRegistry {

    private final Map<String, IntervalCount> hostCounts = new ConcurrentHashMap<>();
    private final long intervalDurationMs;
    private final long ttlMs;
    private final Set<String> whiteList;
    private final Set<String> blackList;

    public KeyBasedIntervalRegistry(long intervalDurationMs, long ttlMs, String whiteList, String blackList, String name) {
        this.intervalDurationMs = intervalDurationMs;
        this.ttlMs = ttlMs;
        this.whiteList = Sets.newHashSet(StringUtils.split(whiteList, ','));
        this.blackList = Sets.newHashSet(StringUtils.split(blackList, ','));

    }

    private void validate(String name) {
        if (ttlMs < intervalDurationMs) {
            log.warn("TTL for {} IntervalRegistry [{}] smaller than interval duration [{}]", name, ttlMs, intervalDurationMs);
        }
        log.info("Start {} KeyBasedIntervalRegistry with whitelist {}", name, whiteList);
        log.info("Start {} KeyBasedIntervalRegistry with blacklist {}", name, blackList);
    }

    public long tick(String clientHostId) {
        IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
        long currentCount = intervalCount.resetIfExpiredAndTick();
        if (whiteList.contains(clientHostId)) {
            return 0;
        } else if (blackList.contains(clientHostId)) {
            return Long.MAX_VALUE;
        }
        return currentCount;
    }

    public void clean() {
        hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs);
    }

    public Map<String, Long> getContent() {
        return hostCounts.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry:: getKey,
                                interval -> interval.getValue().getCount()
                        )
                );
    }
}

复制代码
  • validate(string name): 要求ttlMs<intervalDurationMs,并打印出API请求的黑名单和白名单。

  • 第42行经过computeIfAbsent函数对map中不存在key时的处理,在这里经过新建intervalCount(intervalDurationMs)的方式来处理。

  • 第43行经过intervalCount的resetIfExpiredAndTick()对时间间隔内进行计数。

  • 第44-48行经过判断API请求客户端地址是否在黑白名单中,若是在白名单,返回0,若是在黑名单中,返回Long.MAX_VALUE

  • clean()为经过时间间隔内是否大于ttlMs来过滤集合中的元素。

  • getContent()为遍历hostCounts中的客户端地址的IntervalCount。

IntervalCount: 间歇时间内计数

package com.sanshengshui.http.quota.inmemory;

import com.sanshengshui.http.quota.Clock;

import java.util.concurrent.atomic.LongAdder;

/** * @author james mu * @date 19-8-9 下午16:50 */
public class IntervalCount {

    private final LongAdder addr = new LongAdder();
    private final long intervalDurationMs;
    private volatile long startTime;
    private volatile long lastTickTime;

    public IntervalCount(long intervalDurationMs) {
        this.intervalDurationMs = intervalDurationMs;
        startTime = Clock.millis();
    }

    //计数或时间过时后重置时间
    public long resetIfExpiredAndTick(){
        if (isExpired()){
            reset();
        }
        tick();
        return addr.sum();
    }

   //计算已过期间
    public long silenceDuration() {
        return Clock.millis() - lastTickTime;
    }

    public long getCount() {
        return addr.sum();
    }

   //计数操做,累加一
    private void tick() {
        addr.add(1);
        lastTickTime = Clock.millis();
    }

   //重置计数时间
    private void reset() {
        addr.reset();
        lastTickTime = Clock.millis();
    }

//判断间隔时间是否失效
    private boolean isExpired() {
        return (Clock.millis() - startTime) > intervalDurationMs;
    }

}

复制代码

剩下的处理类,留给读者去本身研究了!

  1. 主机API清理器: HostIntervalRegistryCleaner注入quota.host.cleanPeriodMs并继承抽象类IntervalRegistryCleaner
  2. 主机API记录器: HostIntervalRegistryLogger注入quota.host.log.topSizequota.host.log.intervalMin并继承IntervalRegistryLogger
  3. 主机API请求注册表: HostRequestIntervalRegistry注入quota.host.intervalMsquota.host.ttlMsquota.host.whitelistquota.host.blacklist并继承KeyBasedIntervalRegistry
  4. 主机API请求限制条件: HostRequestLimitPolicy注入quota.host.limit并继承RequestLimitPolicy
  5. 主机请求限制开关: HostRequestsQuotaService注入quota.host.enabled并继承AbstractQuotaService

属性API和遥测数据上传API

@RestController
@RequestMapping("/api/v1")
@Slf4j
public class DeviceApiController {
    
    @Autowired(required = false)
    private HostRequestsQuotaService quotaService;//API限制服务类
    
    @RequestMapping(value = "/attributes",method = RequestMethod.POST)
    public DeferredResult<ResponseEntity> postDeviceAttributes( @RequestBody String json, HttpServletRequest request) {
        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
        if (quotaExceeded(request, responseWriter)) {
            return responseWriter;
        }
        responseWriter.setResult(new ResponseEntity<>(HttpStatus.ACCEPTED));
        Set<AttributeKvEntry> attributeKvEntrySet = JsonConverter.convertToAttributes(new JsonParser().parse(json)).getAttributes();
        for (AttributeKvEntry attributeKvEntry : attributeKvEntrySet){
            System.out.println("属性名="+attributeKvEntry.getKey()+" 属性值="+attributeKvEntry.getValueAsString());
        }
        return responseWriter;
    }
    
    @RequestMapping(value = "/telemetry",method = RequestMethod.POST)
    public DeferredResult<ResponseEntity> postTelemetry(@RequestBody String json, HttpServletRequest request){
        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
        if (quotaExceeded(request, responseWriter)) {
            return responseWriter;
        }
        responseWriter.setResult(new ResponseEntity(HttpStatus.ACCEPTED));
        Map<Long, List<KvEntry>> telemetryMaps = JsonConverter.convertToTelemetry(new JsonParser().parse(json)).getData();
        for (Map.Entry<Long,List<KvEntry>> entry : telemetryMaps.entrySet()) {
            System.out.println("key= " + entry.getKey());
            for (KvEntry kvEntry: entry.getValue()) {
                System.out.println("属性名="+kvEntry.getKey()+ " 属性值="+kvEntry.getValueAsString());
            }
        }
        return responseWriter;
    }
}
复制代码

项目演示

遥测上传API

要将遥测数据发布到服务器节点,请将POST请求发送到如下URL:

http://localhost:8080/api/v1/telemetry
复制代码

最简单的支持数据格式是:

{"key1":"value1", "key2":"value2"}
复制代码

要么

[{"key1":"value1"}, {"key2":"value2"}]
复制代码

请注意,在这种状况下,服务器端时间戳将分配给上传的数据!

若是您的设备可以获取客户端时间戳,您可使用如下格式:

{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
复制代码

在上面的示例中,咱们假设“1451649600512”是具备毫秒精度的unix时间戳。例如,值'1451649600512'对应于'Fri,2016年1月1日12:00:00.512 GMT'

例子:

curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/telemetry --header "Content-Type:application/json"
复制代码
C:\Users\james>curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/telemetry --header "Content-Type:application/json"
Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /api/v1/telemetry HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Type:application/json
> Content-Length: 63
>
* upload completely sent off: 63 out of 63 bytes
< HTTP/1.1 202
< Content-Length: 0
< Date: Sun, 18 Aug 2019 16:16:07 GMT
<
* Connection #0 to host localhost left intact
复制代码

结果:

key= 1566144967139
属性名=stringKey 属性值=value1
属性名=booleanKey 属性值=true
属性名=doubleKey 属性值=42.0
属性名=longKey 属性值=73
复制代码

属性API

属性API容许设备

  • 将客户端设备属性上载到服务器。
  • 从服务器请求客户端和共享设备属性。

将属性更新发布到服务器

要将客户端设备属性发布到ThingsBoard服务器节点,请将POST请求发送到如下URL:

http://localhost:8080/api/v1/attributes
复制代码

例子:

curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/attributes --header "Content-Type:application/json"
复制代码
C:\Users\james>curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/attributes --header "Content-Type:application/json"
Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /api/v1/attributes HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Type:application/json
> Content-Length: 63
>
* upload completely sent off: 63 out of 63 bytes
< HTTP/1.1 202
< Content-Length: 0
< Date: Sun, 18 Aug 2019 16:21:00 GMT
<
* Connection #0 to host localhost left intact
复制代码

结果:

属性名=stringKey 属性值=value1
属性名=booleanKey 属性值=true
属性名=doubleKey 属性值=42.0
属性名=longKey 属性值=73
复制代码

API限额服务

为了演示方便,咱们设置60s内最多API请求测试为10次,如今咱们使用遥测上传API连续发起接口调用,咱们会发现以下的状况出现:

属性名=longKey 属性值=73
属性名=stringKey 属性值=value1
属性名=booleanKey 属性值=true
属性名=doubleKey 属性值=42.0
属性名=longKey 属性值=73
2019-08-19 00:26:25.696  WARN 16332 --- [nio-8080-exec-1] c.s.http.controller.DeviceApiController  : REST Quota exceeded for [0:0:0:0:0:0:0:1] . Disconnect
2019-08-19 00:26:26.402  WARN 16332 --- [nio-8080-exec-2] c.s.http.controller.DeviceApiController  : REST Quota exceeded for [0:0:0:0:0:0:0:1] . Disconnect
复制代码

这说明了咱们的API限额服务起了做用,固然你也能够测试黑白名单等功能。

当在真实状况下,一般的API限额会很大,我这里提供了一个gatling自动化测试来提供接口测试。地址为:github.com/sanshengshu…

关于gatling的其余信息,你们能够参考:

到此,物联网时代,相信你们对IOT架构下的HTTP协议和API相关限制有所了解了,感谢你们的阅读!

相关文章
相关标签/搜索