基于Mina的配置中心(三)

基于Mina的配置中心(三)

在第二章里咱们已经自定义了包MessagePack。接下来咱们要定义编码器和解码器。java

  • 编码器: 把 java对象转为二进制编码,由于在网络中传输的是二进制数据。
  • 解码器:把二进制数据转为 java对象,也就是编码的逆向过程。

编码解码器工厂 MessageProtocolCodecFactory

首先咱们要自定义一个编码器工厂,就像TextLineCodecFactory同样。因为这个写法是固定的,因此就不放代码了,具体能够到GitHub查看源代码。git

编码器 MessageProtocolEncoder

这个东西的写法也是固定的。惟一不一样的地方就是放置数据。github

@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {  MessagePack pack = (MessagePack) message;  //设置缓冲区大小,并自动增加  IoBuffer ioBuffer = IoBuffer.allocate(pack.getLen()).setAutoExpand(true);  log.info("MessageProtocolEncoder_encode_length:{}", pack.getLen());  //放置长度  ioBuffer.putInt(pack.getLen());  //放置模块代码  ioBuffer.putInt(pack.getModule());  if (StringUtils.isNotBlank(pack.getBody())) {  log.info("MessageProtocolEncoder_encode_length:{}", pack.getBody().getBytes().length);  //放置字节数组  ioBuffer.putString(pack.getBody(), charset.newEncoder());  }  ioBuffer.flip();  out.write(ioBuffer); } 复制代码

咱们把从MessagePack中取出的数据,按照顺序放到IoBuffer中,而后使用out.write(ioBuffer);把消息写出去,实际上是把二进制数据存储到了一个队列中Queue<Object> messageQueueweb

实际上是在这里把消息发出去的org.apache.mina.filter.codec.ProtocolCodecFilter#filterWriteapache

解码器 MessageProtocolDecoder

其实解码器会麻烦一点,咱们脑壳里要有这个包的模型,包的开头是length(总长度),而后是module(模块代码),最后是Json字符串(Message),在解码时,还要判断一下是不是完整的包。json

像下面这样:数组

@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  // 包头的长度  // 拆包时,若是可读数据的长度小于包头的长度,就不进行读取  if (in.remaining() < MessagePack.PACK_HEAD_LEN) {  return false;  } else {  //标记当前position,以便后继的reset操做能恢复position位置  in.mark();  // 获取总长度  int length = in.getInt();  //获取模块代码  int module = in.getInt();  log.info("CustomProtocolDecoder_doDecode_length:{}, module:{}", length, module);  // 若是可读取数据的长度 小于 总长度 - 包头的长度 ,则结束拆包,等待下一次  if (in.remaining() < (length - MessagePack.PACK_HEAD_LEN)) {  in.reset();  return false;  } else {  //重置回复position位置到操做前 并读取一条完整记录  in.reset();  byte[] bytes = new byte[length];  // 获取长度4个字节、模块4个字节、内容,即获取完整消息  in.get(bytes, 0, length);  String content = new String(bytes, MessagePack.PACK_HEAD_LEN, length - MessagePack.PACK_HEAD_LEN, charset);  // 封装为自定义的java对象  MessagePack pack = new MessagePack(module, content);  out.write(pack);  // 若是读取一条记录后,还存在数据(粘包),则再次进行调用  return in.remaining() > 0;  }  } } 复制代码

MessageProtocolCodecFactory中,引入这两个类。编码解码器完成,如今须要把它配置到Mina的配置类中。服务器

MinaServerConfig 中添加下面代码。网络

/**  * 编解码器filter  */ @Bean public ProtocolCodecFilter protocolCodecFilter() {  return new ProtocolCodecFilter(new MessageProtocolCodecFactory()); } 复制代码

心跳检测

心跳检测简单点说就是客户端每隔一段时间,向服务器发送一个消息,也就是心跳包,让服务器知道链接状态没有问题,客户端正常在线。若是没有发送,在必定时间内超过指定次数,服务器会认为客户端掉线了,为了节省资源,会关闭链接。session

心跳检测通常有下面几种类型:

  1. 活跃型: 小心跳请求包被接受到后,当即发出心跳反馈。
  2. 半活跃型:发送心跳请求,不在意有没有心跳反馈。可是接收到心跳请求后,也会当即发出心跳反馈。
  3. 聋子型:主动发送心跳请求,不想发送任何心跳反馈,可是接收到心跳请求后,也会当即发出心跳反馈。
  4. 持续监听型:既不想发送心跳请求也不想发送心跳反馈。

这里咱们使用被动型,服务器接受客户端心跳请求,当在规定时间内没有收到时 将客户端链接关闭。

package com.lww.mina.filter;
 import com.alibaba.fastjson.JSONObject; import com.lww.mina.protocol.MessagePack; import com.lww.mina.util.Const; import lombok.extern.slf4j.Slf4j; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;  /**  * 被动型心跳机制  *  * @author lww  * @date 2020-07-06 22:40  */ @Slf4j public class ServerKeepAliveFactoryImpl implements KeepAliveMessageFactory {   /**  * 用来判断接收到的消息是否是一个心跳请求包,是就返回true[接收端使用]  */  @Override  public boolean isRequest(IoSession session, Object message) {  if (message instanceof MessagePack) {  MessagePack pack = (MessagePack) message;  if (pack.getModule() == Const.HEART_BEAT) {  log.info("收到 心跳请求 ServerKeepAliveFactoryImpl_isRequest_pack:{}", JSONObject.toJSONString(message));  return true;  }  }  return false;  }   /**  * 用来判断接收到的消息是否是一个心跳回复包,是就返回true[发送端使用]  */  @Override  public boolean isResponse(IoSession session, Object message) {  return false;  }   /**  * 在须要发送心跳时,用来获取一个心跳请求包[发送端使用]  */  @Override  public Object getRequest(IoSession session) {  return null;  }   /**  * 在须要回复心跳时,用来获取一个心跳回复包[接收端使用]  */  @Override  public Object getResponse(IoSession session, Object message) {  MessagePack pack = (MessagePack) message;  // 将超时次数置为0  session.setAttribute(Const.TIME_OUT_KEY, 0);  log.info("响应 心跳请求 ServerKeepAliveFactoryImpl_getResponse_request:{}", JSONObject.toJSONString(message));  return new MessagePack(Const.HEART_BEAT, "heart");  } } 复制代码

而后一样把心跳检测也配置到 MinaServerConfig 中。

/**  * 心跳检测  */ @Bean public ServerKeepAliveFactoryImpl keepAliveFactoryImpl() {  return new ServerKeepAliveFactoryImpl(); }  /**  * 心跳filter  */ @Bean public KeepAliveFilter keepAliveFilter(ServerKeepAliveFactoryImpl keepAliveFactory) {  // 注入心跳工厂,读写空闲  KeepAliveFilter filter = new KeepAliveFilter(keepAliveFactory, IdleStatus.BOTH_IDLE);  // 设置是否forward到下一个filter  filter.setForwardEvent(true);  // 设置心跳频率 5秒一次  filter.setRequestInterval(Const.HEART_BEAT_RATE);  return filter; } 复制代码

最后将过滤器注入到mina的链式管理器中,还有开启minaserver服务,并设置对应的参数。

/**  * 将过滤器注入到mina的链式管理器中  */ @Bean public DefaultIoFilterChainBuilder defaultIoFilterChainBuilder(ExecutorFilter executorFilter,  LoggingFilter loggingFilter, ProtocolCodecFilter protocolCodecFilter, KeepAliveFilter keepAliveFilter) {  DefaultIoFilterChainBuilder chainBuilder = new DefaultIoFilterChainBuilder();  Map<String, IoFilter> filters = new LinkedHashMap<>();  //多线程过滤器  filters.put("executor", executorFilter);  //日志  filters.put("logger", loggingFilter);  //编码 解码  filters.put("codec", protocolCodecFilter);  //心跳  filters.put("keepAliveFilter", keepAliveFilter);  chainBuilder.setFilters(filters);  return chainBuilder; }  /**  * 开启mina的server服务,并设置对应的参数  */ @Bean public IoAcceptor ioAcceptor(DefaultIoFilterChainBuilder filterChainBuilder) throws IOException {  IoAcceptor acceptor = new NioSocketAcceptor();  //设置缓冲区大小  acceptor.getSessionConfig().setReadBufferSize(config.getReadBufferSize());  //设置空闲状态时间,10秒没操做就进入空闲状态  acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, config.getIdelTimeOut());  //过滤器链  acceptor.setFilterChainBuilder(filterChainBuilder);  //处理器 这个 handler 处理全部的链接事件  acceptor.setHandler(new MinaServerHandler());  //绑定地址  acceptor.bind(new InetSocketAddress(config.getAddress(), config.getPort()));  return acceptor; } 复制代码

这里有一个问题:

//设置空闲状态时间,10秒没操做就进入空闲状态
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, config.getIdelTimeOut()); 复制代码

这个空闲时间是没有生效的,由于使用了心跳检测,空闲状态时间就是心跳检测的时间,因此也就是5秒。

配置类已经基本完成了,还有一个MinaServerHandler,这个handler就是处理客户端消息的处理器。
能够先建立出来,这样配置类不会报错。

package com.lww.mina.handler;
 import org.apache.mina.core.service.IoHandlerAdapter;  /**  * 处理客户端发送的消息  *  * @author lww  * @date 2020-07-06 22:53  */ public class MinaServerHandler extends IoHandlerAdapter {  } 复制代码

最后结构图:

总结

配置类基本完成了,咱们这样配置了编码解码、心跳检测后,Mina会自动调用,是否是简单了不少?固然还剩下一个handler,已经粘了太多代码了,第四章再继续吧。

第四章会完成handler,还有Session管理,还有当配置更新时,推到客户端。完成了这些,基本上Server端就差很少完成了,而后会写Client端,Client才是含金量更高的东西。敬请期待!

本次的代码没有所有粘出来,有兴趣的能够去Github查看。

项目源码

欢迎你们关注个人公众号,共同窗习,一块儿进步。加油🤣

本文使用 mdnice 排版

相关文章
相关标签/搜索