上回说到Web manage的构建,完成的对产品,物模型中的属于数据,设备数据,并把对应的数据缓存到redis中,接下来就开始coap客户端和服务器的构建。html
现阶段PC网络交互中较多的是使用tcp和http协议,但物联网设备都要求较小的功耗、较小的带宽,而且CPU、内存都是有限的,因此在这种需求下,http相对就不实用了,由于http相对臃肿,而CoAP是受限制的应用协议的代名词,CoAp和http同样都是应用层的协议,而且它是一种类http协议,相同的如请求响应模式,url方式,请求方法(作了缩减),响应码(作了简化),不一样的是CoAp是基于UDP的,能够双向通讯(既是客户端又是服务端),而且CoAp协议很是的小,最小的数据包仅为4k。git
Var:版本编号redis
T:报文类型,coap定义了4种报文类型spring
TKL:标识符长度,CoAp定义了两种标识符,Message Id(必须)和Token(非必须)json
Code:响应码,如4.04,5.00,和http中的40四、500功能相似缓存
Message Id:报文编号服务器
Token:标识符具体的内容网络
Option:可选选项参数,一个或多个,可设置Uri-Host、Uri-Port、Uri-Path和Uri-Query,CoAp对Option定了3部分架构
1111 1111B:报文与所承载的数据分隔符并发
Payload:所承载的数据实体
请求方法和http相似,coap定义了4种请求方式
和http相似,用户定义所承载的数据的具体格式,如text/html,application/json
上面就是对coap作了一个简单的介绍,对coap协议有个大概的了解,接下来就开始对client和server的编码了,固然笔者这里也不可能本身写一个对coap的实现,笔者这里使用的是californium-core。
构建iot-coap-client模块,加入californium-core依赖
<dependency> <groupId>org.eclipse.californium</groupId> <artifactId>californium-core</artifactId> <version>2.0.0-M14</version> </dependency>
这里咱们用定时任务来模拟物理网设备数据的定时发送
建立Scheduler类,因咱们定了byte和json两种数据格式,因此这里编写sendByte()和sendJson(),iot-pt设定一类物理设备只能发送一种数据格式,但这里为了方便,笔者就使用具体发送数据格式的方法来模拟一种具体的设备,使用随机数来模拟设备的数据变化。
private Random ra = new Random(); @Scheduled(fixedRate = 2000) public void sendByte() throws URISyntaxException { //建立请求资源 URI uri = new URI("coap://localhost:5683/iot-byte?201904151718"); CoapClient client = new CoapClient(uri); StringBuilder sb = new StringBuilder(); sb.append(ra.nextInt(999)%(999-100+1)+100); sb.append(new BigDecimal(ra.nextDouble()).setScale(2, BigDecimal.ROUND_HALF_UP)); //请求资源 CoapResponse response = client.post(sb.toString(), MediaTypeRegistry.TEXT_PLAIN); if(response !=null){ System.out.println(response.getCode()); //请求状态码 System.out.println(response.getOptions()); //选项参数 System.out.println(response.getResponseText()); //内容文本信息 System.out.println(Utils.prettyPrint(response)); //报文内容 } }
@Scheduled(fixedRate = 4000) public void sendJson() throws URISyntaxException { //建立请求资源,201904151718 设备惟一编码,模拟imie URI uri = new URI("coap://localhost:5683/iot-json?2019041717"); CoapClient client = new CoapClient(uri); //温度 int temperature = ra.nextInt(999)%(999-100+1)+100; //湿度 String humidity = String.valueOf(new BigDecimal(ra.nextDouble()) .setScale(2, BigDecimal.ROUND_HALF_UP)); Map map = new HashMap<String,String>(); map.put("T",String.valueOf(temperature)); map.put("H",humidity); String json = JSONObject.toJSONString(map); client.post(json,MediaTypeRegistry.APPLICATION_JSON); }
Copa发送数据的客户端已经写好了,就下了开始server的撸码
首先仍是构建iot-coap-server模块,加入californium-core依赖
[@Component](https://my.oschina.net/u/3907912) public class ServerStart { @Value("${coap.port}") private int port; @Autowired private IotByteHandler iotHandler; @Autowired private IotJsonHandler iotJsonHandler; public void start(){ Thread thread = new Thread(new Runnable() { @Override public void run() { CoapServer server = new CoapServer(port); server.add(iotHandler); server.add(iotJsonHandler); server.start(); } }); thread.start(); } }
由于我这里使用的是spring boot 的核心组件,spring boot启动完成后因为没有应用线程运行,因此项目jvm会自动退出,由于这里使用Thread线程来启动CoapServer,CoapServer会一直监听消息接受,jvm守护进程就不会退出。
接下来编写IotByteHandler和IotJsonHandler,这种Handler的实现方式和netty有点相似。
@Component public class IotByteHandler extends CoapResource { public IotByteHandler(@Value("${coap.iot.byte}") String name) { super(name); } @Override public void handlePOST(CoapExchange exchange) { //状态码 System.out.println("code---"+exchange.getRequestCode()); //选项参数 System.out.println("Options---"+exchange.getRequestOptions()); //文本内容 System.out.println("text"+exchange.getRequestText()); System.out.println(exchange.getRequestOptions()); } }
@Component public class IotJsonHandler extends CoapResource { public IotJsonHandler(@Value("${coap.iot.json}") String name) { super(name); } @Override public void handlePOST(CoapExchange exchange) { System.out.println("option---"+exchange.getRequestOptions()); System.out.println("json---" + exchange.getRequestText()); } }
spring boot runner 启动coapServer
@Component public class CoapApplicationRunner implements ApplicationRunner { @Autowired private ServerStart serverStart; @Override public void run(ApplicationArguments args) throws Exception { serverStart.start(); } }
接着启动CoapServer和CoapClient,看数据是否符合咱们预约的格式发送过来了
ok,数据以及发到服务器了,安装咱们的架构设备,CoApServer须要把数据整理并发送到kafka的,所以kafka不少地方都须要使用,因此在这里独立构建一个kafka模块,iot-kafka。
KafkaSource负责把协议服务收到的消息发送给kafak
@Component public class KafkaSource { @Autowired private KafkaTemplate kafkaTemplate; public void send(KafkaSourceVO vo){ kafkaTemplate.send(SOURCE_TOPIC,JSONObject.toJSONString(vo)); } }
修改CoApServer的IotByteHandler和IotJsonHandler,加入kafka写消息
@Component public class IotByteHandler extends CoapResource { @Autowired private KafkaSource kafkaSource; public IotByteHandler(@Value("${coap.iot.byte}") String name) { super(name); } @Override public void handlePOST(CoapExchange exchange) { //状态码 System.out.println("code---"+exchange.getRequestCode()); //选项参数 System.out.println("Options---"+exchange.getRequestOptions()); //文本内容 System.out.println("text"+exchange.getRequestText()); System.out.println(exchange.getRequestOptions()); KafkaSourceVO vo = new KafkaSourceVO(exchange.getRequestOptions(). getUriQuery().get(0),exchange.getRequestText(),new Date()); kafkaSource.send(vo); exchange.respond(CoAP.ResponseCode.CONTENT,"ok"); } }
@Component public class IotJsonHandler extends CoapResource { @Autowired private KafkaSource kafkaSource; public IotJsonHandler(@Value("${coap.iot.json}") String name) { super(name); } @Override public void handlePOST(CoapExchange exchange) { KafkaSourceVO vo = new KafkaSourceVO(exchange.getRequestOptions(). getUriQuery().get(0),exchange.g etRequestText(),new Date()); kafkaSource.send(vo); exchange.respond(CoAP.ResponseCode.CONTENT,"ok"); } }
public class KafkaSourceVO { //设备惟一码 private String imei; //数据 private String data; //这里用服务接送到消息的时间模拟设备采集数据的时间 private Date collTime;
再次启动CoApServer和CoApClient,验证是否把数据写如kafka了。
接下来就是Mapping Server的实现了,请听下回分解,具体的代码细节在git