基于Apache Mina实现的TCP长链接和短链接实例

一、前言

Apache MINA是Apache组织的一个优秀的项目。MINA是Multipurpose Infrastructure for NetworkApplications的缩写。它是一个网络应用程序框架,用来帮助用户很是方便地开发高性能和高可靠性的网络应用程序。在本文中介绍了如何经过Apache Mina2.0来实现TCP协议长链接和短链接应用。java

二、系统介绍

2.1系统框架

整个系统由两个服务端程序和两个客户端程序组成。分别实现TCP长链接和短链接通讯。express

系统业务逻辑是一个客户端与服务端创建长链接,一个客户端与服务端创建短链接。数据从短链接客户端通过服务端发送到长链接客户端,并从长链接客户端接收响应数据。当收到响应数据后断开链接。数组

系统架构图以下:网络

2.2处理流程

系统处理流程以下:session

1)       启动服务端程序,监听8001和8002端口。架构

2)       长链接客户端向服务端8002端口创建链接,服务端将链接对象保存到共享内存中。因为采用长链接方式,链接对象是惟一的。框架

3)       短链接客户端向服务端8001端口创建链接。创建链接后建立一个链接对象。socket

4)       短链接客户端链接成功后发送数据。服务端接收到数据后从共享内存中获得长链接方式的链接对象,使用此对象向长链接客户端发送数据。发送前将短链接对象设为长链接对象的属性值。ide

5)       长链接客户端接收到数据后返回响应数据。服务端从长链接对象的属性中取得短链接对象,经过此对象将响应数据发送给短链接客户端。性能

6)       短链接客户端收到响应数据后,关闭链接。

三、服务端程序

3.1长链接服务端

服务启动

public class MinaLongConnServer {

private static final int PORT = 8002;

   

    public void start()throws IOException{

       IoAcceptor acceptor = new NioSocketAcceptor();

 

       acceptor.getFilterChain().addLast("logger", new LoggingFilter());

       acceptor.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

 

       acceptor.setHandler(new MinaLongConnServerHandler());

       acceptor.getSessionConfig().setReadBufferSize(2048);

       acceptor.bind(new InetSocketAddress(PORT));

       System.out.println("Listeningon port " + PORT);

    }

}

消息处理

public class MinaLongConnServerHandler extends IoHandlerAdapter {

    private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());

 

    @Override

    public void sessionOpened(IoSession session) {

       InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();

       String clientIp = remoteAddress.getAddress().getHostAddress();

       logger.info("LongConnect Server opened Session ID ="+String.valueOf(session.getId()));

       logger.info("接收来自客户端 :" + clientIp + "的链接.");

       Initialization init = Initialization.getInstance();

       HashMap<String, IoSession> clientMap =init.getClientMap();

       clientMap.put(clientIp, session);

    }

 

    @Override

    public void messageReceived(IoSession session, Object message) {

       logger.info("Messagereceived in the long connect server..");

       String expression = message.toString();

       logger.info("Message is:" + expression);

       IoSession shortConnSession =(IoSession) session.getAttribute("shortConnSession");

       logger.info("ShortConnect Server Session ID ="+String.valueOf(shortConnSession.getId()));

       shortConnSession.write(expression);

    }

 

    @Override

    public void sessionIdle(IoSession session, IdleStatus status) {

       logger.info("Disconnectingthe idle.");

       // disconnect an idle client

       session.close(true);

    }

 

    @Override

    public void exceptionCaught(IoSession session, Throwable cause) {

       // close the connection onexceptional situation

       logger.warn(cause.getMessage(), cause);

       session.close(true);

    }

}

3.2短链接服务端

服务启动

public class MinaShortConnServer {

    private static final int PORT = 8001;

   

    public void start()throws IOException{

       IoAcceptor acceptor = new NioSocketAcceptor();

 

       acceptor.getFilterChain().addLast("logger", new LoggingFilter());

       acceptor.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

 

       acceptor.setHandler(new MinaShortConnServerHandler());

       acceptor.getSessionConfig().setReadBufferSize(2048);

       acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 3);

       acceptor.bind(new InetSocketAddress(PORT));

       System.out.println("Listeningon port " + PORT);

    }

}

消息处理

public class MinaShortConnServerHandler extends IoHandlerAdapter {

    private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());

 

    @Override

    public void sessionOpened(IoSession session) {

       InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();

       logger.info(remoteAddress.getAddress().getHostAddress());

       logger.info(String.valueOf(session.getId()));

    }

 

    @Override

    public void messageReceived(IoSession session, Object message) {

       logger.info("Messagereceived in the short connect server...");

       String expression = message.toString();

       Initialization init = Initialization.getInstance();

       HashMap<String, IoSession> clientMap =init.getClientMap();

       if (clientMap == null || clientMap.size() == 0) {

           session.write("error");

       } else {

           IoSession longConnSession = null;

           Iterator<String> iterator =clientMap.keySet().iterator();

           String key = "";

           while (iterator.hasNext()) {

              key = iterator.next();

              longConnSession = clientMap.get(key);

           }

           logger.info("ShortConnect Server Session ID :"+String.valueOf(session.getId()));

           logger.info("LongConnect Server Session ID :"+String.valueOf(longConnSession.getId()));

           longConnSession.setAttribute("shortConnSession",session);

           longConnSession.write(expression);

       }

    }

 

    @Override

    public void sessionIdle(IoSession session, IdleStatus status) {

       logger.info("Disconnectingthe idle.");

       // disconnect an idle client

       session.close(true);

    }

 

    @Override

    public void exceptionCaught(IoSession session, Throwable cause) {

       // close the connection onexceptional situation

       logger.warn(cause.getMessage(), cause);

       session.close(true);

    }

}

 

四、客户端程序

4.1长链接客户端

使用Java.NET.Socket来实现向服务端创建链接。Socket创建后一直保持链接,从服务端接收到数据包后直接将原文返回。

public class TcpKeepAliveClient {

    private String ip;

    private int port;

    private static Socket socket = null;

    private static int timeout = 50 * 1000;

 

    public TcpKeepAliveClient(String ip, int port) {

       this.ip = ip;

       this.port = port;

    }

 

    public void receiveAndSend() throws IOException {

       InputStream input = null;

       OutputStream output = null;

 

       try {

           if (socket == null ||socket.isClosed() || !socket.isConnected()) {

              socket = new Socket();

              InetSocketAddress addr = new InetSocketAddress(ip, port);

              socket.connect(addr, timeout);

              socket.setSoTimeout(timeout);

              System.out.println("TcpKeepAliveClientnew ");

           }

 

           input = socket.getInputStream();

           output = socket.getOutputStream();

 

           // read body

           byte[] receiveBytes = {};// 收到的包字节数组

           while (true) {

              if (input.available() > 0) {

                  receiveBytes = new byte[input.available()];

                  input.read(receiveBytes);

 

                  // send

                  System.out.println("TcpKeepAliveClientsend date :" +new String(receiveBytes));

                  output.write(receiveBytes, 0, receiveBytes.length);

                  output.flush();

              }

           }

 

       } catch (Exception e) {

           e.printStackTrace();

           System.out.println("TcpClientnew socket error");

       }

    }

 

    public static void main(String[] args) throws Exception {

       TcpKeepAliveClient client = new TcpKeepAliveClient("127.0.0.1", 8002);

       client.receiveAndSend();

    }

 

}

4.2短链接客户端

服务启动

public class MinaShortClient {

    private static final int PORT = 8001;

 

    public static void main(String[] args) throws IOException,InterruptedException {

       IoConnector connector = new NioSocketConnector();

       connector.getSessionConfig().setReadBufferSize(2048);

 

       connector.getFilterChain().addLast("logger", new LoggingFilter());

       connector.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

 

       connector.setHandler(new MinaShortClientHandler());

       for (int i = 1; i <= 10; i++) {

           ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1",PORT));

           future.awaitUninterruptibly();

           IoSession session =future.getSession();

           session.write(i);

           session.getCloseFuture().awaitUninterruptibly();

 

           System.out.println("result=" + session.getAttribute("result"));

       }

       connector.dispose();

 

    }

}

消息处理

public class MinaShortClientHandler extends IoHandlerAdapter{

    private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());

 

    public MinaShortClientHandler() {

      

    }

 

    @Override

    public void sessionOpened(IoSession session) {

    }

 

    @Override

    public void messageReceived(IoSession session, Object message) {

       logger.info("Messagereceived in the client..");

       logger.info("Message is:" + message.toString());

       session.setAttribute("result", message.toString());

       session.close(true);

    }

 

    @Override

    public void exceptionCaught(IoSession session, Throwable cause) {

       session.close(true);

    }

}

五、总结

经过本文中的例子,Apache Mina在服务端可实现TCP协议长链接和短链接。在客户端只实现了短链接模式,长链接模式也是能够实现的(在本文中仍是采用传统的Java Socket方式)。两个服务端之间经过共享内存的方式来传递链接对象也许有更好的实现方式。

 

工程下载地址: http://download.csdn.net/detail/peterwanghao/5306972

相关文章
相关标签/搜索