基于Netty的物联网应用

物联网是将无处不在(Ubiquitous)的末端设备(Devices)和设施(Facilities),包括具有“内在智能”的传感器、移动终端、工业系统、楼控系统、家庭智能设施、视频监控系统等、和“外在使能”(Enabled)的,如贴上RFID的各类资产(Assets)、携带无线终端的我的与车辆等等“智能化物件或动物”或“智能尘埃”(Mote),经过各类无线和/或有线的长距离和/或短距离通信网络。此次咱们要说的是智慧农业的一个项目。
html

本项目是基于局域网和即将到来的5G为信息载体,以终端节点(EndNodes)、网关(Gateway)、云服务器(LoRaWAN Server)和客户端(Client)组成。用于监测温室,大棚等局部环境变化。作到实时监控,提早预防。前端

先让咱们一块儿看一下ChannelPipeline对事件流的拦截和处理流程java

                                          

     

每一个ChannelHandler 被添加到ChannelPipeline 后,都会建立一个ChannelHandlerContext 并与之建立的ChannelHandler 关联绑定。web

在ChannelHandler 添加到 ChannelPipeline 时会建立一个实例,就是接口 ChannelHandlerContext,它表明了 ChannelHandler 和ChannelPipeline 之间的关联。接口ChannelHandlerContext 主要是对经过同一个 ChannelPipeline 关联的 ChannelHandler 之间的交互进行管理sql

那么我就很少说了,直接开干。数据库

第一步:先启动线程。ServerServletListenerexpress

public void contextInitialized(ServletContextEvent arg0) {json

Thread thread = new Thread(new Runnable() {bootstrap

@Override数组

public void run() {

try {

nc = NettyClient.getInstance();

nc.setRecvCallback(new RecvData());

nc.setSendCallback(new SendData());

nc.connect("127.0.0.1", 9001);

} catch (Exception e) {

nc.shutdown();

log.error("启动Netty服务失败:" + e);

}

}

});

thread.start();

}

TcpHandler负责与管道打交道,是整个项目的最底层,他继承自ChannelInboundHandlerAdapter

是接收LoRa终端回传数据最底层的类。

TcpHandler

 //recvMessage方法接收从LoRa客户端传过来的参数

   private void recvMessage(ByteBuf buf) {

     byte[] cbBuf = new byte[buf.readableBytes()];

     buf.readBytes(cbBuf);

     logger.debug("硬件类型:" + cbBuf[0]);

     switch (cbBuf[0]) {

     case 3:

       recvTHSensor(cbBuf);

       break;

     }

   }

//cbBuf是包含了最原始的数据信息

   //这个类的主要做用是对原始数据包进行处理

   private void recvTHSensor(byte[] cbBuf) {

System.out.println("========TcpHandler==recvTHSensor==========");

     int length = cbBuf.length;

     if (6 == length) {

       MsgTHSensorStateNotify msg = new MsgTHSensorStateNotify();

       boolean b = msg.Unpacking(cbBuf);

       ICallbackRecv callback = NettyClient.getInstance().getRecvCallback();

       if ((b) && (callback != null))

         callback.onTHSensorStateNotify(msg);

       else

         logger.info("解包出错!");

     }

     else if (10 == length) {

       MsgTHSensorNotify msg = new MsgTHSensorNotify();

       boolean b = msg.Unpacking(cbBuf);

       ICallbackRecv callback = NettyClient.getInstance().getRecvCallback();

       if ((b) && (callback != null))

         callback.onTHSensorNotify(msg);

       else

         logger.info("解包出错!");

     }  

}

MsgTHSensorNotify

//这个类的主要做用是对原始数据包进行拆包处理处理获取温度和湿度信息

  public boolean Unpacking(byte[] data)

   {

     int length = data.length;

     if (10 != length) return false;

     this.humidity = (Byte.toString(data[(length - 4)]) + "." + Byte.toString(data[(length - 3)]));

     this.temperature = (Byte.toString(data[(length - 2)]) + "." + Byte.toString(data[(length - 1)]));

     return true;

   }

RecvData

//收到传感器触发数据MsgTHSensorNotify解析出来的数据存入到数据库中

public void onTHSensorNotify(MsgTHSensorNotify arg0) {

logger.debug(arg0.toString());

try {

IDataRecordSetDao idrs = new DataRecordSetDao();

DataRecordSet drs = new DataRecordSet();

drs.setTemperature(arg0.getTemperature());

System.out.println("======onTHSensorNotifygetTemperature的值为"+arg0.getTemperature()+"==========");

drs.setHumidity(arg0.getHumidity());

System.out.println("=============onTHSensorNotifygetHumidity的值为"+arg0.getHumidity()+"=====");

drs.setData_time((new Date()).getTime());

idrs.add(drs);

} catch (SQLException e) {

logger.warn("数据新增出错"+e);

}

}

须要说明的一点是DataRecordSet是对MsgTHSensorNotify数据的封装,他的做用是将MsgTHSensorNotify的数据进行进一步的封装,以便数据库查询数据接收数据用在数据层。MsgTHSensorNotify是对TcpHandler传过来的数据进行提取封装的类。

DataRecordSetDao是数据库操做类

IDataRecordSetDao

DataRecordSet是对MsgTHSensorNotify数据的封装,他的做用是将MsgTHSensorNotify的数据进行进一步的封装,以便数据库查询数据接收数据用在数据层。

@Override

public void add(DataRecordSet drs) throws SQLException {

String sql = "insert into `data_record`(`temperature`,`humidity`,`dt`) values (?,?,?);";

PreparedStatement pstmt = connection.prepareStatement(sql);

pstmt.setString(1, drs.getTemperature());

pstmt.setString(2, drs.getHumidity());

pstmt.setLong(3, valueToLongs(drs.getData_time()));

System.out.println("===============插入数据库==============");

pstmt.executeUpdate();

pstmt.close();

connection.close();

}

这里必须强调一点的是drs.setData_time((new Date()).getTime());//设置的是毫秒数

他的数值已经很大了,细细一数已经到13位数了(1525513938762),这就涉及到存储的问题了。首先说java部分吧。

public void setData_time(long data_time)set方法是long类型的参数,而后从Long的包装类中valueOf(Long l)得到启发,便本身也写了一个相似的方法,在网上不少人都在问如何存储一个超过9位数的商品编号,那么这就是一个很好的例子。

public static Long valueToLongs(long l) {

return new Long(l);

}

数据库部分你须要定义一个bigint类型的time值。

如今数据已经存储到数据库了,接下来咱们在经过其余类来取出数据,更加客观的展示给人们。

咱们经过JSON传过来请求参数"code":0,"device":"Web","expression":{"field":"","start_time":start,"end_time":end,"length":length},"signature":"LoRa"

建立相应的实体类来接收数据,要强调一点的是实体类属性最好与AJAX传过来的键一致。相似POJO。这点很重要,不然会发生意想不到的错误,并且还很差修改。具体缘由请了解@ResponseBody@RequestBody的匹配规则。

先来看看AJAX请求的数据。

Url部分必定要修改为本身的url地址

Spring是位于前端控制器部分,他负责请求转发和数据处理。

@RequestMapping(value ="/getSensorRecord", method = {RequestMethod.POST }, produces = "application/json;charset=utf-8")

public @ResponseBody Map<String, Object> getSensorRecord(@RequestBody RecvJson recv) {

Map<String, Object> result = new HashMap<String, Object>();

if (null != recv) {

if (recv.getCode() == QueryCode.TemperatureHumidityCode && "LoRa".equals(recv.getSignature())) {

try {

//数据库接收数据,查询数据的类

IDataRecordSetDao drd = new DataRecordSetDao();

int length = (0 != recv.getExpression().getLength())?recv.getExpression().getLength():20 ;

ArrayList<DataRecordSet> list = drd.query(recv.getExpression().getStart_time(),

recv.getExpression().getEnd_time(), length);

int listLength = (list.size() > 20)?length:list.size();

for (int i = 0; i < listLength; i++) {

Map<String, Object> unit = new HashMap<String, Object>();

unit.put("id", i + 1);

if ("T".equals(recv.getExpression().getField().trim())) {

unit.put("temperature", list.get(i).getTemperature());

} else if ("H".equals(recv.getExpression().getField().trim())) {

unit.put("humidity", list.get(i).getHumidity());

} else {

unit.put("temperature", list.get(i).getTemperature());

unit.put("humidity", list.get(i).getHumidity());

}

unit.put("time",

(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).format(new Date(list.get(i).getData_time())));

result.put(Integer.toString(i + 1), unit);

unit = null;

}

result.put("error", 0);

result.put("data", recv.getCode());

list = null;

drd = null;

} catch (Exception e) {

logger.error("查询数据出错:" + e);

}

} else {

// {"error":1,"data":"0","message":"codesignature错误!"}

result.put("error", 1);

result.put("data", recv.getCode());

result.put("message", "codesignature错误!");

}

}

return result;

}

这部分我来详细解释一下。

@RequestMapping

RequestMapping是一个用来处理请求地址映射的注解,可用于类或方法上。用于类上,表示类中的全部响应请求的方法都是以该地址做为父路径。

RequestMapping注解有六个属性,下面咱们把她分红三类进行说明。

1valuemethod

value     指定请求的实际地址,指定的地址能够是URI Template 模式;

method  指定请求的method类型, GETPOSTPUTDELETE等;

2consumesproduces

consumes: 指定处理请求的提交内容类型(Content-Type),例如application/json, text/html;

produces:    指定返回的内容类型,仅当request请求头中的(Accept)类型中包含该指定类型才返回;

3paramsheaders

params: 指定request中必须包含某些参数值是,才让该方法处理。

headers: 指定request中必须包含某些指定的header值,才能让该方法处理请求。

Bat启动类

入口函数IotHubServer,这个启动类是由Netty框架实现的,适合有必定基础的朋友,在这里我是参考李林锋编著的《Netty权威指南》当知识积累到必定程度,要学会看书,找资料,看论文,这是培养思惟方式。不少同窗有一个误区,当有一个新技术出现的时候,若是脑海里第一时间想到的是有没有视频,这就完了,出视频的时候基本上是有人已经研究透这东西了,随着时间和经验的增加,要去当领跑者,而不是局限于跟随者,要提高本身的认知。固然,遇到问题最好先看源码,这样提高很快。

首先让咱们看一下入口main函数:

public static void main(String[] args) {

PropertyConfigurator.configure("config/log4j.properties");

System.out.println("===============IotHubServer==》》》main============================");

new IotHubServer(Port).run();

} 

/**

 *用于启动服务端 IotHubChannelInitializer

 */

public void run() {

try {

IotHubChannelInitializer iothub = new IotHubChannelInitializer();

iothub.run(this.port);

System.out.println("=================run============================");

} catch (Exception e) {

logger.error("服务启动失败->" + e.getMessage());

}

}

IotHubChannelInitializer

须要说明的是,Netty协议通讯双方链路创建成功以后,双方能够进行全双工通讯,不管客户端仍是服务端,均可以主动发送消息给对方,通讯方式能够是TWO WAY或者ONE WAY。双方之间的心跳采用的是Ping-Pong机制,当链路处于空闲状态时,客户端主动发送Ping消息给服务端,服务端接收到消息后发送应答消息Pong给客户端。

若是客户端连续发送NPing消息都没有收到服务端返回的Pong消息,说明链路已经挂死或者双方处于异常状态,客户端主动关闭链接,间隔周期T后发起重连操做,直到重连成功。

public void run(int port) throws Exception {

//配置服务器端的NIO线程组,接受客户端的链接、TCP数据的读写

EventLoopGroup bossGroup = new NioEventLoopGroup();

EventLoopGroup workerGroup = new NioEventLoopGroup();

try {

ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup, workerGroup);

bootstrap.channel(NioServerSocketChannel.class);

bootstrap.childHandler(new IotHubChannelInitializer()); 

bootstrap.option(ChannelOption.SO_BACKLOG, 1024);

bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

bootstrap.option(ChannelOption.SO_REUSEADDR, true);

bootstrap.option(ChannelOption.SO_RCVBUF, 128);

bootstrap.option(ChannelOption.SO_SNDBUF, 128);

bootstrap.option(ChannelOption.SO_TIMEOUT, 5000);

bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);

logger.info("服务启动。");

ChannelFuture future = bootstrap.bind(port);

future.sync();

future.channel().closeFuture().sync();

} catch (InterruptedException e) {

throw e;

} finally {

workerGroup.shutdownGracefully();

bossGroup.shutdownGracefully();

}

} 

/*

*这个函数的做用是当建立NioSocketChannel成功以后,在初始化它的时候将它的channelhandler设置到ChannelPipeline 中去,用于处理网络IO事件。

*/

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

ByteBuf cbDelimiter = ch.alloc().buffer(2);

cbDelimiter.writeBytes(Delimiter.getBytes());

pipeline.addLast(new DelimiterBasedFrameDecoder(128, true, false, cbDelimiter));

pipeline.addLast(new IotHubHandler());

}

IotHubHandler

/*

 * ChannelHandler被添加到一个ChannelPipeline时被调用

 */

@Override

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

SocketChannel channel = (SocketChannel)ctx.channel();

String clientIp = channel.remoteAddress().getAddress().getHostAddress();

String log = String.format("Connect, Ip:%s", clientIp);

logger.info(log);

super.handlerAdded(ctx);

}

/*

 * 客户端和服务端TCP链路创建成功之后,NettyNIO线程会调用channelActive方法,发送查询指令给服务器。

 */

@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {

super.channelActive(ctx);

} 

/*

*  拦截处理

*/

@Override

public boolean acceptInboundMessage(Object msg) throws Exception {

return super.acceptInboundMessage(msg);

} 

/*

 * 当从Channel中读数据时被调用,channelRead0还有一个好处就是你不用关心释放资源,由于源码中已经帮你释放

 */

@Override

protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

System.out.println("=================channelRead0============================");

recvMessage(ctx, msg);

}

/*

 * Channel上的某个读操做完成时被调用

 */

@Override

public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

ctx.flush();

System.out.println("=================channelReadComplete============================");

super.channelReadComplete(ctx);

} 

//这是用来接收数据的方法

private void recvMessage(ChannelHandlerContext ctx, Object msg) {

System.out.println("=================recvMessage============================");

SocketChannel socketchannel = (SocketChannel)ctx.channel();

String clientIp = socketchannel.remoteAddress().getAddress().getHostAddress();

logger.info("收到数据:" + clientIp);

try {

        ByteBuf byteBuf = (ByteBuf) msg;

        byteBuf.discardReadBytes();

        byte[] header = {0};

        header[0]= byteBuf.readByte();

        System.out.println("=================header[0]============================");

        switch (header[0]) {

        //硬件协议头

case com.nycent.iothub.entity.FrameType.ProtocolHead:

logger.debug("硬件链接");

mapContext.put(ctx, ClientType.ClientTypeThing); //记录硬件链接

recvFromThings(ctx, byteBuf);

break;

//默认软件控制系统

case com.nycent.iothub.entity.FrameType.ProtoApiTypeMobile:

mapContext.put(ctx, ClientType.ClientTypeMobile); //记录手机移动端

recvFromSystem(ctx, byteBuf);

break;

case com.nycent.iothub.entity.FrameType.ProtoApiTypeServer:

mapContext.put(ctx, ClientType.ClientTypeWeb); //记录web

recvFromSystem(ctx, byteBuf);

break;

default:

ctx.close();

break;

}

} catch (Exception e) {

logger.error("->data exception:" + e.getMessage());

logger.error("->ip:" + clientIp + "->msg:" + msg);

ctx.close();

} finally {

//ReferenceCountUtil.release(msg);

}

} 

/**

 * 收到硬件设备操做心跳检测,数据处理

 */

private void recvFromThings(ChannelHandlerContext ctx, ByteBuf byteBuf) {

System.out.println("=================recvFromThings============================");

try {

byte cbBuf[] = new byte[byteBuf.readableBytes()];

byteBuf.readBytes(cbBuf);

//打印日志

if(logger.isDebugEnabled()){

logger.debug("接收到硬件数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(cbBuf));

}

//处理接收到的数据

int length = cbBuf.length-2;

int val_sum = 0;

for(int i=0; i<=length; i++){

val_sum += (0x0ff & cbBuf[i]);

}

if((0x0ff & cbBuf[0]) == length && val_sum ==(0x0ff & cbBuf[length+1])){

logger.debug("校验成功");

//数组拷贝

byte buf[] = new byte[length+1];

buf[0] = cbBuf[2];

System.arraycopy(cbBuf, 1, buf, 1, length);

if(logger.isDebugEnabled()){

logger.debug("发送第三方数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(buf));

}

sendToSystem(buf);

}

} catch (Exception e) {

logger.error(e.getMessage());

ctx.close();

}

} 

/**

 * 转发数据到web服务器

 * @param buf

 */

public void sendToSystem(byte[] buf) {

System.out.println("=================sendToSystem============================");

for (Map.Entry<ChannelHandlerContext, Integer> entry : mapContext.entrySet()) {

if (entry.getValue() == ClientType.ClientTypeMobile) {

ChannelHandlerContext ctx = entry.getKey();

ByteBuf msg = ctx.alloc().buffer(buf.length + 1);

msg.writeByte(com.nycent.iothub.entity.FrameType.ProtoApiTypeMobile);

msg.writeBytes(buf);

msg.writeBytes(IotHubChannelInitializer.Delimiter.getBytes());

ctx.writeAndFlush(msg);

}else if(entry.getValue() == ClientType.ClientTypeWeb){

ChannelHandlerContext ctx = entry.getKey();

ByteBuf msg = ctx.alloc().buffer(buf.length + 1);

msg.writeByte(com.nycent.iothub.entity.FrameType.ProtoApiTypeServer);

msg.writeBytes(buf);

msg.writeBytes(IotHubChannelInitializer.Delimiter.getBytes());

ctx.writeAndFlush(msg);

}

}

} 

/**

 * 转发数据到硬件

 * @param cbBuf

 */

public void sendToGateway(byte[] buf) {

System.out.println("=================sendToGateway============================");

for (Map.Entry<ChannelHandlerContext, Integer> entry : mapContext.entrySet()) {

if (entry.getValue() == ClientType.ClientTypeThing) {

ChannelHandlerContext ctx = entry.getKey();

ByteBuf msg = ctx.alloc().buffer(buf.length + 1);

msg.writeByte(com.nycent.iothub.entity.FrameType.ProtocolHead);

msg.writeBytes(buf);    

//msg.writeBytes(IotHubChannelInitializer.Delimiter.getBytes());

//如下四行代码是测试msg的信息

 ByteBuf byteBuf = (ByteBuf) msg;

        byte[] req = new byte[byteBuf.readableBytes()];

        for (byte b : req) {

System.out.println("sendToGatewaymsg的信息为"+b);

}        

ctx.writeAndFlush(msg);

}

}

}

/**

 * 收到硬件设备操做

 */

private void recvFromThings(ChannelHandlerContext ctx, ByteBuf byteBuf) {

System.out.println("=================recvFromThings============================");

try {

byte cbBuf[] = new byte[byteBuf.readableBytes()];

byteBuf.readBytes(cbBuf);

//打印日志

if(logger.isDebugEnabled()){

logger.debug("接收到硬件数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(cbBuf));

}

//处理接收到的数据

int length = cbBuf.length-2;

int val_sum = 0;

for(int i=0; i<=length; i++){

val_sum += (0x0ff & cbBuf[i]);

} 

if((0x0ff & cbBuf[0]) == length && val_sum ==(0x0ff & cbBuf[length+1])){

logger.debug("校验成功");

//数组拷贝

byte buf[] = new byte[length+1];

buf[0] = cbBuf[2];

System.arraycopy(cbBuf, 1, buf, 1, length);

if(logger.isDebugEnabled()){

logger.debug("发送第三方数据:" + com.nycent.iothub.utils.HexUtil.Bytes2HexString(buf));

}

sendToSystem(buf);

}

} catch (Exception e) {

logger.error(e.getMessage());

ctx.close();

}

}

 

public class HexUtil {

 

public HexUtil() {

}

 

/**

 * 将指定byte数组以16进制的形式打印到控制台

 */

public static void printHexString(String hint, byte[] b) {

System.out.print("=========》》》"+hint);

System.out.print("======printHexString===》》》");

for (int i = 0; i < b.length; i++) {

String hex = Integer.toHexString(b[i] & 0xFF);

if (hex.length() == 1) {

hex = '0' + hex;

}

System.out.print(hex.toUpperCase() + " ");

}

System.out.println("");

}

 

/**

 *

 */

public static String Bytes2HexString(byte[] b) {

System.out.print("======Bytes2HexString===");

String ret = "";

for (int i = 0; i < b.length; i++) {

String hex = Integer.toHexString(b[i] & 0xFF);

if (hex.length() == 1) {

hex = '0' + hex;

}

ret += hex.toUpperCase() + "  ";

}

return ret;

}

 

/**

 * 将两个ASCII字符合成一个字节; 如:"EF"> 0xEF

 */

public static byte uniteBytes(byte src0, byte src1) {

byte _b0 = Byte.decode("0x" + new String(new byte[] { src0 }))

.byteValue();

_b0 = (byte) (_b0 << 4);

byte _b1 = Byte.decode("0x" + new String(new byte[] { src1 }))

.byteValue();

byte ret = (byte) (_b0 ^ _b1);

return ret;

}

/**

 * 将指定字符串src,以每两个字符分割转换为16进制形式 如:"2B44EFD9" > byte[]{0x2B, 0x44, 0xEF,

 * 0xD9}

 */

public static byte[] HexString2Bytes(String src) {

int nLen = src.length();

byte[] ret = new byte[nLen/2];

byte[] tmp = src.getBytes();

for (int i = 0; i < nLen/2; i++) {

ret[i] = uniteBytes(tmp[i * 2], tmp[i * 2 + 1]);

}

return ret;

}

}

/*

 * 计算Map对象的个数

 */

public class MapSize<K, V> {

/*

 * 计算值重复个数

 */

public int mapRepeatSize(Map<K, V> map, V value) {

if(map.size() < 1){

return 0;

}

int count = 0;

for(Map.Entry<K, V> entry : map.entrySet()){

if(entry.getValue() == value)count++;

}

return count;

}

}

 整个项目的核心代码所有在这里了,因为整个项目无法所有放置在这里。若是有须要源码的朋友能够给我留言,我双手奉上。

创做不易,如需转载请注明出处。