使用Netty也有一段时间了,对Netty也有个大概的了解。回想起刚刚使用Netty的时候踩了不少坑,不少Netty的组件也不会使用,或者说用得不够好,不能称之为"最佳实践"。此文的目的即是带领你们使用Netty构建出一个完整的项目,将本身在实际开发经验中整理出的一些最佳实践分享出来,固然这些最佳实践不必定就是真正的最佳实践,只是本身在开发中整理的,或者参考其余优秀的代码一块儿整理出的,你们若是有什么不一样意见或者更好的实践,欢迎你们在评论区分享,你们一块儿学习一块儿进步!php
先奉上完整版代码 zpsw/jt808-nettygit
开发环境:IDEA+JDK1.8+Mavengithub
使用框架: Netty + Spring Boot + Spring Data JPA数据库
其余工具: lombok(没用过的同窗建议了解一下,很方便)bash
3.1.认识JT808协议
3.2.构建编/解码器
3.3.构建业务Handler
3.4.Channel的高效管理方式
3.5.一些改进
复制代码
下面简单介绍一下JT808协议的格式说明,彻底版在JT808协议技术规范.pdf网络
其中消息体属性中咱们先只关注消息体长度,不关注其余,分包状况先不考虑。数据结构
根据消息头和消息体咱们能够抽象出一个最基本的数据结构并发
@Data
public class DataPacket {
protected Header header = new Header(); //消息头
protected ByteBuf byteBuf; //消息流
@Data
public static class Header {
private short msgId;// 消息ID 2字节
private short msgBodyProps;//消息体属性 2字节
private String terminalPhone; // 终端手机号 6字节
private short flowId;// 流水号 2字节
//获取包体长度
public short getMsgBodyLength() {
return (short) (msgBodyProps & 0x3ff);
}
//获取加密类型 3bits
public byte getEncryptionType() {
return (byte) ((msgBodyProps & 0x1c00) >> 10);
}
//是否分包
public boolean hasSubPackage() {
return ((msgBodyProps & 0x2000) >> 13) == 1;
}
}
}
复制代码
咱们能够先将Header解析出来,而后由子类本身解析包体框架
public void parse() {
try{
this.parseHead();
//验证包体长度
if (this.header.getMsgBodyLength() != this.byteBuf.readableBytes()) {
throw new RuntimeException("包体长度有误");
}
this.parseBody();//由子类重写
}finally {
ReferenceCountUtil.safeRelease(this.byteBuf);//注意释放
}
}
protected void parseHead() {
header.setMsgId(byteBuf.readShort());
header.setMsgBodyProps(byteBuf.readShort());
header.setTerminalPhone(BCD.BCDtoString(readBytes(6)));
header.setFlowId(byteBuf.readShort());
}
protected void parseBody() {
}
复制代码
其中readByte(int length)方法是对ByteBuf.readBytes(byte[] dst)的一个简单封装ide
public byte[] readBytes(int length) {
byte[] bytes = new byte[length];
this.byteBuf.readBytes(bytes);
return bytes;
}
复制代码
由于没有在Netty官方的Api中找到相似的方法,因此本身定义了一个
另外定义一个方法用于响应重写。
响应重写:
public ByteBuf toByteBufMsg() {
ByteBuf bb = ByteBufAllocator.DEFAULT.heapBuffer();
bb.writeInt(0);//先占4字节用来写msgId和msgBodyProps
bb.writeBytes(BCD.toBcdBytes(StringUtils.leftPad(this.header.getTerminalPhone(), 12, "0")));
bb.writeShort(this.header.getFlowId());
return bb;
}
**
"最佳实践":尽可能使用内存池分配ByteBuf,效率相比非池化Unpooled.buffer()高不少,可是得注意释放,不然会内存泄漏
在ChannelPipeLine中咱们可使用ctx.alloc()或者channel.alloc()获取Netty默认内存分配器,
其余地方不必定要创建独有的内存分配器,能够经过ByteBufAllocator.DEFAULT获取,跟前面获取的是同一个(不特别配置的话)。
**
复制代码
这里当咱们将响应转化为ByteBuf写出去的时候,此时并不知道消息体的具体长度,全部此时咱们先占住位置,回头再来写。
全部的消息都继承自DataPacket,咱们挑出一个字段相对较多的-》 位置上报消息
而后咱们创建位置上报消息的数据结构,先看位置消息的格式
创建结构以下:
@Data
public class LocationMsg extends DataPacket {
private int alarm; //告警信息 4字节
private int statusField;//状态 4字节
private float latitude;//纬度 4字节
private float longitude;//经度 4字节
private short elevation;//海拔高度 2字节
private short speed; //速度 2字节
private short direction; //方向 2字节
private String time; //时间 6字节BCD
public LocationMsg(ByteBuf byteBuf) {
super(byteBuf);
}
@Override
public void parseBody() {
ByteBuf bb = this.byteBuf;
this.setAlarm(bb.readInt());
this.setStatusField(bb.readInt());
this.setLatitude(bb.readUnsignedInt() * 1.0F / 1000000);
this.setLongitude(bb.readUnsignedInt() * 1.0F / 1000000);
this.setElevation(bb.readShort());
this.setSpeed(bb.readShort());
this.setDirection(bb.readShort());
this.setTime(BCD.toBcdTimeString(readBytes(6)));
}
}
复制代码
全部的消息若是没有本身的应答的话,须要默认应答,默认应答格式以下
@Data
public class CommonResp extends DataPacket {
private short replyFlowId; //应答流水号 2字节
private short replyId; //应答 ID 2字节
private byte result; //结果 1字节
public CommonResp() {
this.getHeader().setMsgId(JT808Const.SERVER_RESP_COMMON);
}
@Override
public ByteBuf toByteBufMsg() {
ByteBuf bb = super.toByteBufMsg();
bb.writeShort(replyFlowId);
bb.writeShort(replyId);
bb.writeByte(result);
return bb;
}
}
复制代码
前面协议能够看到,标识位为0x7e,因此咱们第一个解码器能够用Netty自带的DelimiterBasedFrameDecoder,其中的delimiters天然就是0x7e了。(Netty有不少自带的编解码器,建议先确认Netty自带的不能知足需求,再本身自定义)
通过DelimiterBasedFrameDecoder帮咱们截断以后,信息就到了咱们本身的解码器中了,咱们的目的是将ByteBuf转化为咱们前面定义的数据结构。 定义解码器
public class JT808Decoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
}
}
复制代码
第一步:转义还原,转义规则以下
0x7d 0x01 -> 0x7d
0x7d 0x02 -> 0x7e
public ByteBuf revert(byte[] raw) {
int len = raw.length;
ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(len);//DataPacket parse方法回收
for (int i = 0; i < len; i++) {
if (raw[i] == 0x7d && raw[i + 1] == 0x01) {
buf.writeByte(0x7d);
i++;
} else if (raw[i] == 0x7d && raw[i + 1] == 0x02) {
buf.writeByte(0x7e);
i++;
} else {
buf.writeByte(raw[i]);
}
}
return buf;
}
复制代码
第二步:校验
byte pkgCheckSum = escape.getByte(escape.writerIndex() - 1);
escape.writerIndex(escape.writerIndex() - 1);//排除校验码
byte calCheckSum = JT808Util.XorSumBytes(escape);
if (pkgCheckSum != calCheckSum) {
log.warn("校验码错误,pkgCheckSum:{},calCheckSum:{}", pkgCheckSum, calCheckSum);
ReferenceCountUtil.safeRelease(escape);//必定不要漏了释放
return null;
}
复制代码
第三步:解码
public DataPacket parse(ByteBuf bb) {
DataPacket packet = null;
short msgId = bb.getShort(bb.readerIndex());
switch (msgId) {
case TERNIMAL_MSG_HEARTBEAT:
packet = new HeartBeatMsg(bb);
break;
case TERNIMAL_MSG_LOCATION:
packet = new LocationMsg(bb);
break;
case TERNIMAL_MSG_REGISTER:
packet = new RegisterMsg(bb);
break;
case TERNIMAL_MSG_AUTH:
packet = new AuthMsg(bb);
break;
case TERNIMAL_MSG_LOGOUT:
packet = new LogOutMsg(bb);
break;
default:
packet = new DataPacket(bb);
break;
}
packet.parse();
return packet;
}
复制代码
switch里咱们尽可能将收到频率高的放在前面,避免过多的if判断
而后咱们将消息out.add(msg)就可让消息到咱们的业务Handler中了。
编码器须要讲咱们的DataPacket转化为ByteBuf,而后再转义发送出去。 定义编码器
public class JT808Encoder extends MessageToByteEncoder<DataPacket> {
protected void encode(ChannelHandlerContext ctx, DataPacket msg, ByteBuf out) throws Exception {
}
}
复制代码
第一步:转换
ByteBuf bb = msg.toByteBufMsg();
复制代码
还记得咱们DataPacket转换header时占用了4个字节等到后面覆盖吗
bb.markWriterIndex();//标记一下,先到前面去写覆盖的,而后回到标记写校验码
short bodyLen = (short) (bb.readableBytes() - 12);
short bodyProps = createDefaultMsgBodyProperty(bodyLen);
//覆盖占用的4字节
bb.writerIndex(0);
bb.writeShort(msg.getHeader().getMsgId());
bb.writeShort(bodyProps);
bb.resetWriterIndex();
bb.writeByte(JT808Util.XorSumBytes(bb));
复制代码
第二步:转义
public ByteBuf escape(ByteBuf raw) {
int len = raw.readableBytes();
ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer(len + 12);//假设最多有12个须要转义
buf.writeByte(JT808Const.PKG_DELIMITER);
while (len > 0) {
byte b = raw.readByte();
if (b == 0x7e) {
buf.writeByte(0x7d);
buf.writeByte(0x02);
} else if (b == 0x7d) {
buf.writeByte(0x7d);
buf.writeByte(0x01);
} else {
buf.writeByte(b);
}
len--;
}
ReferenceCountUtil.safeRelease(raw);
buf.writeByte(JT808Const.PKG_DELIMITER);
return buf;
}
**
"最佳实践":咱们这里返回ByteBuf是写出去的,因此采用directBuffer效率更高
**
复制代码
转义完成,就直接发送出去了,固然不能忘了释放。
ByteBuf escape = escape(bb);
out.writeBytes(escape);
ReferenceCountUtil.safeRelease(escape);
复制代码
解码器中咱们返回的是DataPacket对象,因此编写Handler此时咱们有两种选择:
一种是定义一个Handler接收DataPacket而后判断具体类型,以下图
@Component
@ChannelHandler.Sharable
public class JT808ServerHandler extends SimpleChannelInboundHandler<DataPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, DataPacket msg) throws Exception {
log.debug(msg.toString());
if (msg instanceof AuthMsg || msg instanceof HeartBeatMsg || msg instanceof LocationMsg || msg instanceof LogOutMsg) {
CommonResp resp = CommonResp.success(msg, getFlowId(ctx));
ctx.writeAndFlush(resp);
} else if (msg instanceof RegisterMsg) {
RegisterResp resp = RegisterResp.success(msg, getFlowId(ctx));
ctx.writeAndFlush(resp);
}
}
}
复制代码
另外一种是每一个DataPacket的子类型都定义一个Handler,以下图
public class LocationMsgHandler extends SimpleChannelInboundHandler<LocationMsg>
public class HeartBeatMsgHandler extends SimpleChannelInboundHandler<HeartBeatMsg>
public class RegisterMsgHandler extends SimpleChannelInboundHandler<LogOutMsg>
复制代码
这里我选择第二种,一个缘由是由于代码风格好,另外一个缘由后面会具体说明。
这里列举一个LocationMsgHandler的详细代码,将位置保存到数据库而后回复设备
@Slf4j
@Component
@ChannelHandler.Sharable
public class LocationMsgHandler extends BaseHandler<LocationMsg> {
@Autowired
private LocationRepository locationRespository;
@Override
protected void channelRead0(ChannelHandlerContext ctx, LocationMsg msg) throws Exception {
log.debug(msg.toString());
locationRespository.save(LocationEntity.parseFromLocationMsg(msg));
CommonResp resp = CommonResp.success(msg, getSerialNumber(ctx.channel()));
write(ctx, resp);
}
}
复制代码
BaseHandler继承SimpleChannelInboundHandler ,里面定义了一些通用的方法,例如getSerialNumber()获取应答的流水号
private static final AttributeKey<Short> SERIAL_NUMBER = AttributeKey.newInstance("serialNumber");
public short getSerialNumber(Channel channel){
Attribute<Short> flowIdAttr = channel.attr(SERIAL_NUMBER);
Short flowId = flowIdAttr.get();
if (flowId == null) {
flowId = 0;
} else {
flowId++;
}
flowIdAttr.set(flowId);
return flowId;
}
复制代码
咱们将流水号存入Channel内部,方便维护。
假设如今出现了一个需求,咱们须要找到一个特定的链接发送一条消息,在咱们这个项目里,特定指的是根据header中的手机号找到链接并发送消息。咱们能够本身维护一个Map用来存放全部Channel,可是这样就浪费了Netty自带的DefaultChannelGroup提供的一系列方法了。因此咱们改进一下,定义一个ChannelManager,内部采用DefaultChannelGroup维护Channel,本身维护手机号->ChannelId的映射关系。
@Component
public class ChannelManager {
private static final AttributeKey<String> TERMINAL_PHONE = AttributeKey.newInstance("terminalPhone");
private ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private Map<String, ChannelId> channelIdMap = new ConcurrentHashMap<>();
private ChannelFutureListener remover = future ->
channelIdMap.remove(future.channel().attr(TERMINAL_PHONE).get());
public boolean add(String terminalPhone, Channel channel) {
boolean added = channelGroup.add(channel);
if (added) {
channel.attr(TERMINAL_PHONE).set(terminalPhone);
channel.closeFuture().addListener(remover);
channelIdMap.put(terminalPhone, channel.id());
}
return added;
}
public boolean remove(String terminalPhone) {
return channelGroup.remove(channelIdMap.remove(terminalPhone));
}
public Channel get(String terminalPhone) {
return channelGroup.find(channelIdMap.get(terminalPhone));
}
public ChannelGroup getChannelGroup() {
return channelGroup;
}
}
复制代码
咱们定义了一个ChannelFutureListener,当channel关闭时,会执行这个回调,帮助咱们维护本身的channelIdMap不至于太过臃肿,提高效率,DefaultChannelGroup中也是如此,因此没必要担忧Channel都不存在了 还占用着内存这种状况。另外咱们能够将DefaultChannelGroup提供出去,以便某些时候进行广播。
1.咱们的LocationMsgHandler中出现了数据库操做
locationRespository.save(LocationEntity.parseFromLocationMsg(msg));
复制代码
然而在Netty中,默认状况下Handler由Reactor线程驱动,一旦阻塞就会大大下降并发能力,因此咱们定义一个专门的EventExecutorGroup(不认识的话能够先理解为线程池),用来驱动耗时的Handler,只要在初始化Channel时指定便可。前面所说的每一个DataPacket子类型定义一个Handler的另外一个好处就体如今这里,咱们可让那些耗时的Handler用专门的业务线程池去驱动,而不耗时的Handler由默认的Reactor线程驱动,增长了灵活性。
pipeline.addLast(heartBeatMsgHandler);
pipeline.addLast(businessGroup,locationMsgHandler);//由于locationMsgHandler中涉及到数据库操做,因此放入businessGroup
pipeline.addLast(authMsgHandler);
pipeline.addLast(registerMsgHandler);
pipeline.addLast(logOutMsgHandler);
复制代码
另外如解码器parse()中的switch里的case顺序同样,咱们这里也能够利用增长Handler的顺序,节省一些if判断。
2.接上面的,如今咱们LocationMsgHandler由businessGroup驱动了,然而写响应的时候仍是会移交给Reactor线程,因此为了减小一些判断提高略微的性能,咱们能够将write(ctx, resp);改成
workerGroup.execute(() -> write(ctx, resp));
复制代码
其中的workerGroup正是启动引导中的,咱们借助Spring把它单独定义成了bean,用的时候直接注解引入便可
serverBootstrap.group(bossGroup, workerGroup)
复制代码
3.借助Spring的力量咱们能够将几乎全部的组件定义成单例,提高了略微的性能,除了编码器和解码器,由于他们有一些属性须要维护,不能定义为单例。
一直看到这里的朋友感谢大家的耐心,这是我第一次写文章,有错误的地方还请多多包涵。
另外将彻底版代码奉上 zpsw/jt808-netty
这也是我的开源的第一个项目,若是对你有帮助,给个Star将不胜感激。
附上一些其余的Netty最佳实践(转自best practice in netty):
还有一些英文的就不贴过来了
另外给新手安利一个网络调试工具NetAssist网络调试助手
再见