本文为该系列的第三篇文章,设计需求为:服务端程序和众多客户端程序经过 TCP 协议进行通讯,通讯双方需通讯的消息种类众多。上一篇文章以一个具体的需求为例,探讨了指定的 Java 消息对象与其相应的二进制数据帧相互转换的方法。本文仍以该实例为例,探讨该自定义通讯协议的具体工做流程,以及如何以注册的形式灵活插拔通讯消息对象。java
经过该系列的第二篇文章可知,各个消息对象的编解码器类均拥有一个静态工厂方法,用于手动传入功能位及功能文字描述,进而生成包含这些参数的编解码器。如此设计,使得全部消息的功能位和文字描述均可以统一管理,下降维护成本。缓存
根据上述需求,可经过 Map 容器管理全部的编解码器,有以下优势:并发
通讯消息对象注册方法以下所示:工具
/** * 消息对象的注册 * * @param toolkit 消息对象编解码器容器的工具类 */
private void initialMsg() {
saveNormalMsgCodec(new MsgCodecDeviceUnlock(0x10, 0x11, "客户端解锁"));
saveNormalMsgCodec(new MsgCodecDeviceClear(0x10, 0x13, "客户端初始化"));
saveNormalMsgCodec(new MsgCodecDeviceId(0x10, 0x1B, "客户端ID设置"));
saveNormalMsgCodec(new MsgCodecEmployeeName(0x10, 0x1C, "客户端别名设置"));
... ...
}
/** * 将普通消息对象及其回复消息对象的编解码器均保存到 HashMap 中 * * @param baseMsgCodec 特定的消息对象编解码器 */
private void saveNormalMsgCodec(BaseMsgCodec baseMsgCodec) {
saveSpecialMsgCodec(baseMsgCodec);
baseMsgCodec = new MsgCodecReplyNormal(baseMsgCodec.getMajorMsgId() + 0x10, baseMsgCodec.getSubMsgId(), baseMsgCodec.getDetail());
saveSpecialMsgCodec(baseMsgCodec);
}
/** * 将消息对象的编解码器保存到 HashMap 中 * * @param baseMsgCodec 特定的编解码器 */
private void saveSpecialMsgCodec(BaseMsgCodec baseMsgCodec) {
HASH_MAP.put(figureFrameId(baseMsgCodec.getMajorMsgId(), baseMsgCodec.getSubMsgId()), baseMsgCodec);
}
复制代码
上述代码代表,若是有新的业务需求,须要增删「插拔」业务消息对象,只需在 initialMsg()
方法中,对相应编解码器的注册语句进行增删便可。优化
saveNormalMsgCodec(BaseMsgCodec)
方法能够同时注册特定业务消息对象及其通用回复消息对象,操做方法清晰、简洁。this
因此,在启动该 Java 程序时,只须要在启动过程当中,执行上述 initialMsg()
方法,便可完成全部业务消息对象的注册。编码
由该系列的第一篇文章可知,若是某二进制数据帧所要传输的数据体部份内容不多,致使一个帧的大部分容量均被帧头占据,致使有效数据的占比很小,这就产生了巨大的浪费,故数据帧的数据体部分由子帧组成,同一类子帧都可被组装进同一个数据帧。如此作法,整个通讯链路的数据量会明显减小,IO 负担也会所以减轻。spa
该需求的实现原理以下所示:线程
/** * 启动一个Channel的定时任务,用于间隔指定的时间对消息队列进行轮询,并发送指定数据帧 * * @param deque 指定的消息发送队列 * @param channelId 指定 Channel 的序号 */
private void startMessageQueueTask(LinkedBlockingDeque<BaseMsg> deque, Integer channelId) {
executorService.scheduleWithFixedDelay(() -> {
try {
BaseMsg baseMsg = deque.take(); // 从队列中取出一个消息对象,队列为空时阻塞
Thread.sleep(AWAKE_TO_PROCESS_INTERVAL);// 等待极短的时间,保证队列中缓存尽量多的对象
Channel channel = touchChannel(channelId); // 获取指定的待发送的 Channel
List<ByteBuf> dataList = new ArrayList<>();// 子帧容器
ByteBuf data = baseMsg.subFrameEncode(channel.alloc().buffer());// 编码一个子帧
dataList.add(data);
touchNeedReplyMsg(baseMsg); // 对该子帧设置检错重发任务
int length = data.readableBytes();
int flag = baseMsg.combineFrameFlag(); // 获取消息对象标识
while (true) {
BaseMsg subMsg = deque.peek(); // 查看队列中的第一个消息对象
if (subMsg == null || subMsg.combineFrameFlag() != flag) {
break; // 消息对象标识不一样,即欲生成的主帧帧头不一样,不能组合进同一主帧
}
data = subMsg.subFrameEncode(channel.alloc().buffer());
if (length + data.readableBytes() > FrameSetting.MAX_DATA_LENGTH) {
break;
}
length += data.readableBytes();
dataList.add(data); // 组合进了同一主帧
deque.poll(); // 从队列中移除该消息对象
touchNeedReplyMsg(subMsg);
}
FrameMajorHeader frameHeader = new FrameMajorHeader(
baseMsg.getMajorMsgId(),
baseMsg.getGroupId(),
baseMsg.getDeviceId(),
length); // 生成主帧帧头消息对象
channel.writeAndFlush(new SendableMsgContainer(frameHeader, dataList)); // 送入Channel进行发送
} catch (InterruptedException e) {
logger.warn("消息队列定时发送任务被中断");
}
}, channelId, CommSetting.FRAME_SEND_INTERVAL, TimeUnit.MILLISECONDS);
}
复制代码
由代码可知,待发送的消息对象均被送入指定的发送队列进行缓存,某客户端相应的线程对队列进行操做,取出消息对象并进行编码、组装、发送等。固然,当客户端数量较多时,上述的线程实现方式可采用 Netty 的 NIO 方式进行优化,以下降系统开销。设计
由上述描述可知,欲发送一个消息对象,只需将该消息对象送入相应的发送队列便可。
因为每一个 Java 消息对象均内含相应编解码器的引用,故可直接对该消息对象进行编码操做,代码以下:
public abstract class BaseMsg implements Cloneable {
private final BaseMsgCodec msgCodec;
... ...
/** * 将 java 消息对象编码为 TCP 子帧 * * @param buffer 空白的 TCP 子帧的容器 * @return 保存有 TCP 子帧的容器 */
public ByteBuf subFrameEncode(ByteBuf buffer) {
return msgCodec.code(this, buffer);
}
}
复制代码
首先根据数据帧的帧头,便可解析出 FrameMajorHeader
对象,而后便可调用以下方法完成子帧的解析工做。实现原理文章开头已指出。
/** * TCP 帧解码为 Java 消息对象 * * @param head 主帧头 * @param subMsgId 子帧功能位 * @param data 子帧数据 * @return 已解码的 Java 对象 */
public BaseMsg decode(FrameMajorHeader head, int subMsgId, byte[] data) {
BaseMsgCodec msgCodec = MsgCodecToolkit.getMsgCodec(head.getMsgId(), subMsgId);
return msgCodec.decode(head.getGroupId(), head.getDeviceId(), data);
}
复制代码