Mina实现Socket通讯完整过程


title: Mina服务端客户端通讯
date: 2018-09-30 09:00:30
tags:编程

- [mina]
- [tcp]

categories:segmentfault

- [编程]

permalink: zxh

[TOC]缓存

前两章节已经完整的介绍了理论部分,今天咱们就利用这些理论来实现tcp协议的c/s 通讯。首先咱们简单回顾下以前的介绍,
在mina中咱们的客户端和服务端简直就是如出一辙,只是咱们用不一样适配器。可是他的数据处理流程是同样的。今天咱们就重点看看如何创建服务端、客户端
而且处理二者之间的消息通讯处理微信

服务端

服务端和客户端不一样的就是咱们建立的监听对象不一样而已,客户端发送消息到服务端,服务端须要经历过滤器的处理才能到达消息中心,可是在过滤器中咱们就须要将消息进行解码,而后才会到消息接收的地方处理咱们的业务。正常状况下咱们处理完消息须要对客户端进行回应。回应的时候也会经历过滤器中的编码逻辑,进行数据编码而后发送。信息发送到客户端咱们能够当作服务端的方向。也是须要进行编解码的。下面看看服务端的建立代码session

//建立监听对象
IoAcceptor acceptor = new NioSocketAcceptor();
TextLineCodecFactory textLineCodecFactory =
        new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(),
                LineDelimiter.WINDOWS.getValue());
//添加过滤器
acceptor.getFilterChain().addLast("logger",new LoggingFilter());
acceptor.getFilterChain().addLast("protocal",new ProtocolCodecFilter(
        textLineCodecFactory
));
//设置时间处理的handler
acceptor.setHandler(new ServerMessageHandler());
//设置读取数据缓存区的大小
acceptor.getSessionConfig().setReadBufferSize(Constaint.READSIZE);
//设置多久没有消息就进入空闲状态
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME);
//绑定端口
try {
    acceptor.bind(new InetSocketAddress(Constaint.REMOTE_PORT));
} catch (IOException e) {
    logger.error(String.format("bind %s error",Constaint.REMOTE_PORT));
    e.printStackTrace();
}
logger.info(String.format("bind %s success",Constaint.REMOTE_PORT));

客户端

//建立监听对象
IoConnector connector = new NioSocketConnector();
TextLineCodecFactory textLineCodecFactory =
        new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(),
                LineDelimiter.WINDOWS.getValue());
//添加过滤器
//日志过滤器 。  sltf日志设置
connector.getFilterChain().addLast("logger",new LoggingFilter());
//在这个过滤器中提供了编解码,这里的编码是以信息中已\r\n结尾算是一条信息 
connector.getFilterChain().addLast("protocal",new ProtocolCodecFilter(
        new SocketFactory()
));
//设置时间处理的handler , 提供session生命周期的监听函数,消息接受,发送的函数
connector.setHandler(new ClientMessageHandler());
//设置读取数据缓存区的大小
connector.getSessionConfig().setReadBufferSize(Constaint.READSIZE);
//设置多久没有消息就进入空闲状态
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME);
ConnectFuture future = connector.connect(new InetSocketAddress(Constaint.REMOTE_IP,Constaint.REMOTE_PORT));
//是异步处理,这里不会形成阻塞
future.addListener(new IoFutureListener<IoFuture>() {
    @Override
    public void operationComplete(IoFuture ioFuture) {
        logger.info("链接准备完成");
        IoSession session = ioFuture.getSession();

    }
});

通讯

  • 其实上面服务端,客户端两边建立好就应经在通讯了,在上面建立的时候咱们发现。建立的时候须要指定消息处理器(IoHandlerAdapter) , 这个在IoService中会排在IoFilter以后执行。在过滤器执行以后咱们就会调用咱们的消息处理器。
private static Logger logger = LogManager.getLogger(ServerMessageHandler.class);
    public void sessionCreated(IoSession session) throws Exception {
        super.sessionCreated(session);
        logger.info("sessionCreated");
    }

    public void sessionOpened(IoSession session) throws Exception {
        super.sessionOpened(session);
        try {
            IoBuffer buffer = IoBuffer.allocate(30);
            buffer.clear();
            buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder());
            buffer.flip();
            session.write(buffer);
        } catch (Exception e) {
            logger.error(e.toString());
        }
        logger.info("sessionOpened");
    }

    public void sessionClosed(IoSession session) throws Exception {
        super.sessionClosed(session);
        logger.info("sessionClosed");
    }

    public void sessionIdle(IoSession session, IdleStatus idleStatus) throws Exception {
        super.sessionIdle(session,idleStatus);
        try {
            IoBuffer buffer = IoBuffer.allocate(30);
            buffer.clear();
            buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder());
            buffer.flip();
            session.write(buffer);
        } catch (Exception e) {
            logger.error(e.toString());
        }
        //        logger.info("sessionIdle");
    }

    public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception {
        logger.info("exceptionCaught");
        throwable.printStackTrace();
    }

    public void messageReceived(IoSession session, Object message) throws Exception {
        super.messageReceived(session, message);
        String info = message.toString();
        Date date = new Date(System.currentTimeMillis());
        SimpleDateFormat sdf = new  SimpleDateFormat("yy-MM-dd HH:mm:ss");
        String time = sdf.format(date);
        session.write(time);
        System.out.println("接收到的消息:"+info);
    }

    public void messageSent(IoSession session, Object message) throws Exception {
        super.messageSent(session, message);
        logger.info("messageSent");
    }
  • 这里消息处理器,提供了几个时刻能够控制,好比session建立、销毁的时候执行的地方。消息接收的地方,消息发送成功的地方。这些控制力度能够根据咱们的须要进行适度的复写。

自定义工厂编解码

  • 工厂是提供编解码的方法。这个工厂是加载在ProtocolCodecFilter这个过滤器中的。咱们也能够自定义过滤器,在自定义的过滤器中咱们也能够加载咱们自定义的工厂,实现编解码。咱们在编解码的地方,就能够加入咱们的业务代码。好比解码经过约定的协议方式读取到内容后经过ProtocolDecoderOutput 将消息写出去就能够在咱们的IoHandlerAdapter的messageReceived方法中获取到消息。而后业务书写。这样作到代码的解耦。
public class SocketFactory  implements ProtocolCodecFactory {
    private MessageDecoder decoder;
    private MessageEncoder encoder;

    public SocketFactory() {
        decoder = new MessageDecoder();
        encoder = new MessageEncoder();
    }

    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return this.decoder;
    }

    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return this.encoder;
    }
}

解码器

  • 上面的工厂就是提供编解码的。和咱们生活中同样工厂提供功能,可是实际并非工厂作的,工厂可能只代理功能,仅仅是个加工厂而已。mina通讯也是如此。真正编解码的并非工厂执行的,本节将揭露解码者CumulativeProtocolDecoder
  • 解码器写好以后只须要在上面自定义工厂中建立就行了。至于自定义编码器只须要继承CumulativeProtocolDecoder这个类就行了。并且复写doDecode方法就行了。这个方法的返回值是boolean类型。返回值不一样表明意义不一。这里须要重点理清楚异步

    • true: 返回true表示你已经从CumulativeProtocolDecoder的消息中消费了信息,在编码器中返回true以前也应该调用ProtocolDecoderOutput 的wirte将消息发布到IoHandAdaptor中进行业务处理。可是这里会出现其余状况,应为咱们服务端客户端是长链接因此在咱们消息中消息是不断发过来的,咱们缓存中的消息多是完整一条消息,也可能不够一整条消息,也多是一整条多了一点,
      一、若是不是一条完整(半包)的那么咱们返回falsed等待客户端继续发送
      二、若是正好是一整条,那么咱们接受到以后返回true的时候咱们缓存中就没有数据了,在CumulativeProtocolDecoder会中止对解码中doDecode的调用了,这种状况不会出现意外
      三、数据比一条完整信息(粘包)多,那么咱们处理到一条信息后也须要返回true,可是CumulativeProtocolDecoder会将剩余的缓存继续拼装,剩余消息就至关于内部进行了第二次解码。若是不过那么至关于上面第一种状况

      记住三种状况 半包 、 正常 、 粘包tcp

    • false: 返回false就是缓存中的数据不够咱们一整条消息,须要继续等待客户端的消息。CumulativeProtocolDecoder中的缓存机制会不断的将客户端发过来的数据拼接到缓存中
public class MessageDecoder extends CumulativeProtocolDecoder {
    /**
     * 此方法return true : 表示父类中CumulativeProtocolDecoder会不断的调用此方法进行消息的消费
     *       return false: 表示消息已经消费彻底了,缓存中就算有数据也不会再消费了。等待再次客户端
     *       发送消息时会触发消息发送接口,此时会将新旧消息拼接再一块儿进行处理
     * @param ioSession
     * @param ioBuffer
     * @param protocolDecoderOutput
     * @return
     * @throws Exception
     */
    @Override
    protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
        IoBuffer buffer = IoBuffer.allocate(10);
        while (ioBuffer.hasRemaining()) {
            if (ioBuffer.remaining()<3) {
                //继续接受
                return false;
            }
            //获取三个字节
            int oldLimit = ioBuffer.limit();
            ioBuffer.limit(ioBuffer.position()+3);
            String text = ioBuffer.getString(Charset.forName("UTF-8").newDecoder());
            protocolDecoderOutput.write(text);
            ioBuffer.limit(oldLimit);
            if (ioBuffer.hasRemaining()) {
                return true;
            }
        }
        return false;
    }
}

编码器

  • 编码器相对解码器简单不少,编码器就是加入咱们的协议,正常状况就是咱们业务代码中消息是一个Java实体,咱们须要作的是将Java实体按照协议转换成IoBuffer进行发送。可是咱们mina中发送消息是经过IoSession中write方法发送的。咱们查看源码发如今IoSession.write(Object o),发送的若是是IoBuffer那么就不通过咱们的编码器,不然会通过咱们编码器进行编码最终将转换后的IoBuffer发送出去。

public class MessageEncoder extends ProtocolEncoderAdapter {
    @Override
    public void encode(IoSession ioSession, Object o, ProtocolEncoderOutput protocolEncoderOutput) throws Exception {
        //TODO  根据协议编码
        //组装好以后  ioSession.write(IoBuffer)写出
        System.out.println(o);
    }
}

总结

加入战队ide

<span id="addMe">加入战队</span>

微信公众号

微信公众号

相关文章
相关标签/搜索