物联网是将无处不在(Ubiquitous)的末端设备(Devices)和设施(Facilities),包括具有“内在智能”的传感器、移动终端、工业系统、楼控系统、家庭智能设施、视频监控系统等、和“外在使能”(Enabled)的,如贴上RFID的各类资产(Assets)、携带无线终端的我的与车辆等等“智能化物件或动物”或“智能尘埃”(Mote),经过各类无线和/或有线的长距离和/或短距离通信网络。此次咱们要说的是智慧农业的一个项目。
html
本项目是基于局域网和即将到来的5G为信息载体,以终端节点(EndNodes)、网关(Gateway)、云服务器(LoRaWAN Server)和客户端(Client)组成。用于监测温室,大棚等局部环境变化。作到实时监控,提早预防。前端
先让咱们一块儿看一下ChannelPipeline对事件流的拦截和处理流程java
每一个ChannelHandler 被添加到ChannelPipeline 后,都会建立一个ChannelHandlerContext 并与之建立的ChannelHandler 关联绑定。web
在ChannelHandler 添加到 ChannelPipeline 时会建立一个实例,就是接口 ChannelHandlerContext,它表明了 ChannelHandler 和ChannelPipeline 之间的关联。接口ChannelHandlerContext 主要是对经过同一个 ChannelPipeline 关联的 ChannelHandler 之间的交互进行管理。sql
那么我就很少说了,直接开干。数据库
第一步:先启动线程。ServerServletListenerexpress
public void contextInitialized(ServletContextEvent arg0) {json
Thread thread = new Thread(new Runnable() {bootstrap
@Override数组
public void run() {
try {
nc = NettyClient.getInstance();
nc.setRecvCallback(new RecvData());
nc.setSendCallback(new SendData());
nc.connect("127.0.0.1", 9001);
} catch (Exception e) {
nc.shutdown();
log.error("启动Netty服务失败:" + e);
}
}
});
thread.start();
}
TcpHandler负责与管道打交道,是整个项目的最底层,他继承自ChannelInboundHandlerAdapter
是接收LoRa终端回传数据最底层的类。
//recvMessage方法接收从LoRa客户端传过来的参数
private void recvMessage(ByteBuf buf) {
byte[] cbBuf = new byte[buf.readableBytes()];
buf.readBytes(cbBuf);
logger.debug("硬件类型:" + cbBuf[0]);
switch (cbBuf[0]) {
case 3:
recvTHSensor(cbBuf);
break;
}
}
//cbBuf是包含了最原始的数据信息
//这个类的主要做用是对原始数据包进行处理
private void recvTHSensor(byte[] cbBuf) {
System.out.println("========TcpHandler==recvTHSensor==========");
int length = cbBuf.length;
if (6 == length) {
MsgTHSensorStateNotify msg = new MsgTHSensorStateNotify();
boolean b = msg.Unpacking(cbBuf);
ICallbackRecv callback = NettyClient.getInstance().getRecvCallback();
if ((b) && (callback != null))
callback.onTHSensorStateNotify(msg);
else
logger.info("解包出错!");
}
else if (10 == length) {
MsgTHSensorNotify msg = new MsgTHSensorNotify();
boolean b = msg.Unpacking(cbBuf);
ICallbackRecv callback = NettyClient.getInstance().getRecvCallback();
if ((b) && (callback != null))
callback.onTHSensorNotify(msg);
else
logger.info("解包出错!");
}
}
//这个类的主要做用是对原始数据包进行拆包处理处理,获取温度和湿度信息
public boolean Unpacking(byte[] data)
{
int length = data.length;
if (10 != length) return false;
this.humidity = (Byte.toString(data[(length - 4)]) + "." + Byte.toString(data[(length - 3)]));
this.temperature = (Byte.toString(data[(length - 2)]) + "." + Byte.toString(data[(length - 1)]));
return true;
}
RecvData
//收到传感器触发数据,把MsgTHSensorNotify解析出来的数据存入到数据库中
public void onTHSensorNotify(MsgTHSensorNotify arg0) {
logger.debug(arg0.toString());
try {
IDataRecordSetDao idrs = new DataRecordSetDao();
DataRecordSet drs = new DataRecordSet();
drs.setTemperature(arg0.getTemperature());
System.out.println("======onTHSensorNotify中getTemperature的值为"+arg0.getTemperature()+"==========");
drs.setHumidity(arg0.getHumidity());
System.out.println("=============onTHSensorNotify中getHumidity的值为"+arg0.getHumidity()+"=====");
drs.setData_time((new Date()).getTime());
idrs.add(drs);
} catch (SQLException e) {
logger.warn("数据新增出错"+e);
}
}
须要说明的一点是DataRecordSet是对MsgTHSensorNotify数据的封装,他的做用是将MsgTHSensorNotify的数据进行进一步的封装,以便数据库查询数据,接收数据,用在数据层。MsgTHSensorNotify是对TcpHandler传过来的数据进行提取封装的类。
DataRecordSetDao是数据库操做类
DataRecordSet是对MsgTHSensorNotify数据的封装,他的做用是将MsgTHSensorNotify的数据进行进一步的封装,以便数据库查询数据,接收数据,用在数据层。
@Override
public void add(DataRecordSet drs) throws SQLException {
String sql = "insert into `data_record`(`temperature`,`humidity`,`dt`) values (?,?,?);";
PreparedStatement pstmt = connection.prepareStatement(sql);
pstmt.setString(1, drs.getTemperature());
pstmt.setString(2, drs.getHumidity());
pstmt.setLong(3, valueToLongs(drs.getData_time()));
System.out.println("===============插入数据库==============");
pstmt.executeUpdate();
pstmt.close();
connection.close();
}
这里必须强调一点的是drs.setData_time((new Date()).getTime());//设置的是毫秒数
他的数值已经很大了,细细一数已经到13位数了(1525513938762),这就涉及到存储的问题了。首先说java部分吧。
public void setData_time(long data_time),set方法是long类型的参数,而后从Long的包装类中valueOf(Long l)得到启发,便本身也写了一个相似的方法,在网上不少人都在问如何存储一个超过9位数的商品编号,那么这就是一个很好的例子。
public static Long valueToLongs(long l) {
return new Long(l);
}
数据库部分你须要定义一个bigint类型的time值。
如今数据已经存储到数据库了,接下来咱们在经过其余类来取出数据,更加客观的展示给人们。
咱们经过JSON传过来请求参数"code":0,"device":"Web","expression":{"field":"","start_time":start,"end_time":end,"length":length},"signature":"LoRa"
建立相应的实体类来接收数据,要强调一点的是实体类属性最好与AJAX传过来的键一致。相似POJO。这点很重要,不然会发生意想不到的错误,并且还很差修改。具体缘由请了解@ResponseBody和@RequestBody的匹配规则。
先来看看AJAX请求的数据。
Url部分必定要修改为本身的url地址
Spring是位于前端控制器部分,他负责请求转发和数据处理。
@RequestMapping(value ="/getSensorRecord", method = {RequestMethod.POST }, produces = "application/json;charset=utf-8")
public @ResponseBody Map<String, Object> getSensorRecord(@RequestBody RecvJson recv) {
Map<String, Object> result = new HashMap<String, Object>();
if (null != recv) {
if (recv.getCode() == QueryCode.TemperatureHumidityCode && "LoRa".equals(recv.getSignature())) {
try {
//数据库接收数据,查询数据的类
IDataRecordSetDao drd = new DataRecordSetDao();
int length = (0 != recv.getExpression().getLength())?recv.getExpression().getLength():20 ;
ArrayList<DataRecordSet> list = drd.query(recv.getExpression().getStart_time(),
recv.getExpression().getEnd_time(), length);
int listLength = (list.size() > 20)?length:list.size();
for (int i = 0; i < listLength; i++) {
Map<String, Object> unit = new HashMap<String, Object>();
unit.put("id", i + 1);
if ("T".equals(recv.getExpression().getField().trim())) {
unit.put("temperature", list.get(i).getTemperature());
} else if ("H".equals(recv.getExpression().getField().trim())) {
unit.put("humidity", list.get(i).getHumidity());
} else {
unit.put("temperature", list.get(i).getTemperature());
unit.put("humidity", list.get(i).getHumidity());
}
unit.put("time",
(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).format(new Date(list.get(i).getData_time())));
result.put(Integer.toString(i + 1), unit);
unit = null;
}
result.put("error", 0);
result.put("data", recv.getCode());
list = null;
drd = null;
} catch (Exception e) {
logger.error("查询数据出错:" + e);
}
} else {
// {"error":1,"data":"0","message":"code或signature错误!"}
result.put("error", 1);
result.put("data", recv.getCode());
result.put("message", "code或signature错误!");
}
}
return result;
}
这部分我来详细解释一下。
@RequestMapping
RequestMapping是一个用来处理请求地址映射的注解,可用于类或方法上。用于类上,表示类中的全部响应请求的方法都是以该地址做为父路径。
RequestMapping注解有六个属性,下面咱们把她分红三类进行说明。
value: 指定请求的实际地址,指定的地址能够是URI Template 模式;
method: 指定请求的method类型, GET、POST、PUT、DELETE等;
consumes: 指定处理请求的提交内容类型(Content-Type),例如application/json, text/html;
produces: 指定返回的内容类型,仅当request请求头中的(Accept)类型中包含该指定类型才返回;
params: 指定request中必须包含某些参数值是,才让该方法处理。
headers: 指定request中必须包含某些指定的header值,才能让该方法处理请求。
入口函数IotHubServer,这个启动类是由Netty框架实现的,适合有必定基础的朋友,在这里我是参考李林锋编著的《Netty权威指南》当知识积累到必定程度,要学会看书,找资料,看论文,这是培养思惟方式。不少同窗有一个误区,当有一个新技术出现的时候,若是脑海里第一时间想到的是有没有视频,这就完了,出视频的时候基本上是有人已经研究透这东西了,随着时间和经验的增加,要去当领跑者,而不是局限于跟随者,要提高本身的认知。固然,遇到问题最好先看源码,这样提高很快。
首先让咱们看一下入口main函数:
public static void main(String[] args) {
PropertyConfigurator.configure("config/log4j.properties");
System.out.println("===============IotHubServer==》》》main============================");
new IotHubServer(Port).run();
}
/**
*用于启动服务端 IotHubChannelInitializer
*/
public void run() {
try {
IotHubChannelInitializer iothub = new IotHubChannelInitializer();
iothub.run(this.port);
System.out.println("=================run============================");
} catch (Exception e) {
logger.error("服务启动失败->" + e.getMessage());
}
}
须要说明的是,Netty协议通讯双方链路创建成功以后,双方能够进行全双工通讯,不管客户端仍是服务端,均可以主动发送消息给对方,通讯方式能够是TWO WAY或者ONE WAY。双方之间的心跳采用的是Ping-Pong机制,当链路处于空闲状态时,客户端主动发送Ping消息给服务端,服务端接收到消息后发送应答消息Pong给客户端。
若是客户端连续发送N条Ping消息都没有收到服务端返回的Pong消息,说明链路已经挂死或者双方处于异常状态,客户端主动关闭链接,间隔周期T后发起重连操做,直到重连成功。
public void run(int port) throws Exception {
//配置服务器端的NIO线程组,接受客户端的链接、TCP数据的读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new IotHubChannelInitializer());
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.SO_RCVBUF, 128);
bootstrap.option(ChannelOption.SO_SNDBUF, 128);
bootstrap.option(ChannelOption.SO_TIMEOUT, 5000);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
logger.info("服务启动。");
ChannelFuture future = bootstrap.bind(port);
future.sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw e;
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
/*
*这个函数的做用是当建立NioSocketChannel成功以后,在初始化它的时候将它的channelhandler设置到ChannelPipeline 中去,用于处理网络IO事件。
*/
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ByteBuf cbDelimiter = ch.alloc().buffer(2);
cbDelimiter.writeBytes(Delimiter.getBytes());
pipeline.addLast(new DelimiterBasedFrameDecoder(128, true, false, cbDelimiter));
pipeline.addLast(new IotHubHandler());
}
IotHubHandler
/*
* 当ChannelHandler被添加到一个ChannelPipeline时被调用
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel)ctx.channel();
String clientIp = channel.remoteAddress().getAddress().getHostAddress();
String log = String.format("Connect, Ip:%s", clientIp);
logger.info(log);
super.handlerAdded(ctx);
}
/*
* 当客户端和服务端TCP链路创建成功之后,Netty的NIO线程会调用channelActive方法,发送查询指令给服务器。
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
/*
* 拦截处理器
*/
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
return super.acceptInboundMessage(msg);
}
/*
* 当从Channel中读数据时被调用,channelRead0还有一个好处就是你不用关心释放资源,由于源码中已经帮你释放
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("=================channelRead0============================");
recvMessage(ctx, msg);
}
/*
* 当Channel上的某个读操做完成时被调用
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
System.out.println("=================channelReadComplete============================");
super.channelReadComplete(ctx);
}
//这是用来接收数据的方法
private void recvMessage(ChannelHandlerContext ctx, Object msg) {
System.out.println("=================recvMessage============================");
SocketChannel socketchannel = (SocketChannel)ctx.channel();
String clientIp = socketchannel.remoteAddress().getAddress().getHostAddress();
logger.info("收到数据:" + clientIp);
try {
ByteBuf byteBuf = (ByteBuf) msg;
byteBuf.discardReadBytes();
byte[] header = {0};
header[0]= byteBuf.readByte();
System.out.println("=================header[0]============================");
switch (header[0]) {
//硬件协议头
case com.nycent.iothub.entity.FrameType.ProtocolHead:
logger.debug("硬件链接");
mapContext.put(ctx, ClientType.ClientTypeThing); //记录硬件链接
recvFromThings(ctx, byteBuf);
break;
//默认软件控制系统
case com.nycent.iothub.entity.FrameType.ProtoApiTypeMobile:
mapContext.put(ctx, ClientType.ClientTypeMobile); //记录手机移动端
recvFromSystem(ctx, byteBuf);
break;
case com.nycent.iothub.entity.FrameType.ProtoApiTypeServer:
mapContext.put(ctx, ClientType.ClientTypeWeb); //记录web端
recvFromSystem(ctx, byteBuf);
break;
default:
ctx.close();
break;
}
} catch (Exception e) {
logger.error("->data exception:" + e.getMessage());
logger.error("->ip:" + clientIp + "->msg:" + msg);
ctx.close();
} finally {
//ReferenceCountUtil.release(msg);
}
}
/**
* 收到硬件设备操做,心跳检测,数据处理
*/
private void recvFromThings(ChannelHandlerContext ctx, ByteBuf byteBuf) {
System.out.println("=================recvFromThings============================");
try {
byte cbBuf[] = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(cbBuf);
//打印日志
if(logger.isDebugEnabled()){
logger.debug("接收到硬件数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(cbBuf));
}
//处理接收到的数据
int length = cbBuf.length-2;
int val_sum = 0;
for(int i=0; i<=length; i++){
val_sum += (0x0ff & cbBuf[i]);
}
if((0x0ff & cbBuf[0]) == length && val_sum ==(0x0ff & cbBuf[length+1])){
logger.debug("校验成功");
//数组拷贝
byte buf[] = new byte[length+1];
buf[0] = cbBuf[2];
System.arraycopy(cbBuf, 1, buf, 1, length);
if(logger.isDebugEnabled()){
logger.debug("发送第三方数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(buf));
}
sendToSystem(buf);
}
} catch (Exception e) {
logger.error(e.getMessage());
ctx.close();
}
}
/**
* 转发数据到web服务器
* @param buf
*/
public void sendToSystem(byte[] buf) {
System.out.println("=================sendToSystem============================");
for (Map.Entry<ChannelHandlerContext, Integer> entry : mapContext.entrySet()) {
if (entry.getValue() == ClientType.ClientTypeMobile) {
ChannelHandlerContext ctx = entry.getKey();
ByteBuf msg = ctx.alloc().buffer(buf.length + 1);
msg.writeByte(com.nycent.iothub.entity.FrameType.ProtoApiTypeMobile);
msg.writeBytes(buf);
msg.writeBytes(IotHubChannelInitializer.Delimiter.getBytes());
ctx.writeAndFlush(msg);
}else if(entry.getValue() == ClientType.ClientTypeWeb){
ChannelHandlerContext ctx = entry.getKey();
ByteBuf msg = ctx.alloc().buffer(buf.length + 1);
msg.writeByte(com.nycent.iothub.entity.FrameType.ProtoApiTypeServer);
msg.writeBytes(buf);
msg.writeBytes(IotHubChannelInitializer.Delimiter.getBytes());
ctx.writeAndFlush(msg);
}
}
}
/**
* 转发数据到硬件
* @param cbBuf
*/
public void sendToGateway(byte[] buf) {
System.out.println("=================sendToGateway============================");
for (Map.Entry<ChannelHandlerContext, Integer> entry : mapContext.entrySet()) {
if (entry.getValue() == ClientType.ClientTypeThing) {
ChannelHandlerContext ctx = entry.getKey();
ByteBuf msg = ctx.alloc().buffer(buf.length + 1);
msg.writeByte(com.nycent.iothub.entity.FrameType.ProtocolHead);
msg.writeBytes(buf);
//msg.writeBytes(IotHubChannelInitializer.Delimiter.getBytes());
//如下四行代码是测试msg的信息
ByteBuf byteBuf = (ByteBuf) msg;
byte[] req = new byte[byteBuf.readableBytes()];
for (byte b : req) {
System.out.println("sendToGateway中msg的信息为"+b);
}
ctx.writeAndFlush(msg);
}
}
}
/**
* 收到硬件设备操做
*/
private void recvFromThings(ChannelHandlerContext ctx, ByteBuf byteBuf) {
System.out.println("=================recvFromThings============================");
try {
byte cbBuf[] = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(cbBuf);
//打印日志
if(logger.isDebugEnabled()){
logger.debug("接收到硬件数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(cbBuf));
}
//处理接收到的数据
int length = cbBuf.length-2;
int val_sum = 0;
for(int i=0; i<=length; i++){
val_sum += (0x0ff & cbBuf[i]);
}
if((0x0ff & cbBuf[0]) == length && val_sum ==(0x0ff & cbBuf[length+1])){
logger.debug("校验成功");
//数组拷贝
byte buf[] = new byte[length+1];
buf[0] = cbBuf[2];
System.arraycopy(cbBuf, 1, buf, 1, length);
if(logger.isDebugEnabled()){
logger.debug("发送第三方数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(buf));
}
sendToSystem(buf);
}
} catch (Exception e) {
logger.error(e.getMessage());
ctx.close();
}
}
public class HexUtil {
public HexUtil() {
}
/**
* 将指定byte数组以16进制的形式打印到控制台
*/
public static void printHexString(String hint, byte[] b) {
System.out.print("=========》》》"+hint);
System.out.print("======printHexString===》》》");
for (int i = 0; i < b.length; i++) {
String hex = Integer.toHexString(b[i] & 0xFF);
if (hex.length() == 1) {
hex = '0' + hex;
}
System.out.print(hex.toUpperCase() + " ");
}
System.out.println("");
}
/**
*
*/
public static String Bytes2HexString(byte[] b) {
System.out.print("======Bytes2HexString===");
String ret = "";
for (int i = 0; i < b.length; i++) {
String hex = Integer.toHexString(b[i] & 0xFF);
if (hex.length() == 1) {
hex = '0' + hex;
}
ret += hex.toUpperCase() + " ";
}
return ret;
}
/**
* 将两个ASCII字符合成一个字节; 如:"EF"–> 0xEF
*/
public static byte uniteBytes(byte src0, byte src1) {
byte _b0 = Byte.decode("0x" + new String(new byte[] { src0 }))
.byteValue();
_b0 = (byte) (_b0 << 4);
byte _b1 = Byte.decode("0x" + new String(new byte[] { src1 }))
.byteValue();
byte ret = (byte) (_b0 ^ _b1);
return ret;
}
/**
* 将指定字符串src,以每两个字符分割转换为16进制形式 如:"2B44EFD9" –> byte[]{0x2B, 0x44, 0xEF,
* 0xD9}
*/
public static byte[] HexString2Bytes(String src) {
int nLen = src.length();
byte[] ret = new byte[nLen/2];
byte[] tmp = src.getBytes();
for (int i = 0; i < nLen/2; i++) {
ret[i] = uniteBytes(tmp[i * 2], tmp[i * 2 + 1]);
}
return ret;
}
}
/*
* 计算Map对象的个数
*/
public class MapSize<K, V> {
/*
* 计算值重复个数
*/
public int mapRepeatSize(Map<K, V> map, V value) {
if(map.size() < 1){
return 0;
}
int count = 0;
for(Map.Entry<K, V> entry : map.entrySet()){
if(entry.getValue() == value)count++;
}
return count;
}
}
整个项目的核心代码所有在这里了,因为整个项目无法所有放置在这里。若是有须要源码的朋友能够给我留言,我双手奉上。
创做不易,如需转载请注明出处。