Apache MINA是Apache组织的一个优秀的项目。MINA是Multipurpose Infrastructure for NetworkApplications的缩写。它是一个网络应用程序框架,用来帮助用户很是方便地开发高性能和高可靠性的网络应用程序。在本文中介绍了如何经过Apache Mina2.0来实现TCP协议长链接和短链接应用。java
整个系统由两个服务端程序和两个客户端程序组成。分别实现TCP长链接和短链接通讯。express
系统业务逻辑是一个客户端与服务端创建长链接,一个客户端与服务端创建短链接。数据从短链接客户端通过服务端发送到长链接客户端,并从长链接客户端接收响应数据。当收到响应数据后断开链接。数组
系统架构图以下:网络
系统处理流程以下:session
1) 启动服务端程序,监听8001和8002端口。架构
2) 长链接客户端向服务端8002端口创建链接,服务端将链接对象保存到共享内存中。因为采用长链接方式,链接对象是惟一的。框架
3) 短链接客户端向服务端8001端口创建链接。创建链接后建立一个链接对象。socket
4) 短链接客户端链接成功后发送数据。服务端接收到数据后从共享内存中获得长链接方式的链接对象,使用此对象向长链接客户端发送数据。发送前将短链接对象设为长链接对象的属性值。ide
5) 长链接客户端接收到数据后返回响应数据。服务端从长链接对象的属性中取得短链接对象,经过此对象将响应数据发送给短链接客户端。性能
6) 短链接客户端收到响应数据后,关闭链接。
服务启动
public class MinaLongConnServer {
private static final int PORT = 8002;
public void start()throws IOException{
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
acceptor.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
acceptor.setHandler(new MinaLongConnServerHandler());
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.bind(new InetSocketAddress(PORT));
System.out.println("Listeningon port " + PORT);
}
}
消息处理
public class MinaLongConnServerHandler extends IoHandlerAdapter {
private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());
public void sessionOpened(IoSession session) {
InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();
String clientIp = remoteAddress.getAddress().getHostAddress();
logger.info("LongConnect Server opened Session ID ="+String.valueOf(session.getId()));
logger.info("接收来自客户端 :" + clientIp + "的链接.");
Initialization init = Initialization.getInstance();
HashMap<String, IoSession> clientMap =init.getClientMap();
clientMap.put(clientIp, session);
}
public void messageReceived(IoSession session, Object message) {
logger.info("Messagereceived in the long connect server..");
String expression = message.toString();
logger.info("Message is:" + expression);
IoSession shortConnSession =(IoSession) session.getAttribute("shortConnSession");
logger.info("ShortConnect Server Session ID ="+String.valueOf(shortConnSession.getId()));
shortConnSession.write(expression);
}
public void sessionIdle(IoSession session, IdleStatus status) {
logger.info("Disconnectingthe idle.");
// disconnect an idle client
session.close(true);
}
public void exceptionCaught(IoSession session, Throwable cause) {
// close the connection onexceptional situation
logger.warn(cause.getMessage(), cause);
session.close(true);
}
}
服务启动
public class MinaShortConnServer {
private static final int PORT = 8001;
public void start()throws IOException{
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
acceptor.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
acceptor.setHandler(new MinaShortConnServerHandler());
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 3);
acceptor.bind(new InetSocketAddress(PORT));
System.out.println("Listeningon port " + PORT);
}
}
消息处理
public class MinaShortConnServerHandler extends IoHandlerAdapter {
private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());
public void sessionOpened(IoSession session) {
InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();
logger.info(remoteAddress.getAddress().getHostAddress());
logger.info(String.valueOf(session.getId()));
}
@Override
public void messageReceived(IoSession session, Object message) {
logger.info("Messagereceived in the short connect server...");
String expression = message.toString();
Initialization init = Initialization.getInstance();
HashMap<String, IoSession> clientMap =init.getClientMap();
if (clientMap == null || clientMap.size() == 0) {
session.write("error");
} else {
IoSession longConnSession = null;
Iterator<String> iterator =clientMap.keySet().iterator();
String key = "";
while (iterator.hasNext()) {
key = iterator.next();
longConnSession = clientMap.get(key);
}
logger.info("ShortConnect Server Session ID :"+String.valueOf(session.getId()));
logger.info("LongConnect Server Session ID :"+String.valueOf(longConnSession.getId()));
longConnSession.setAttribute("shortConnSession",session);
longConnSession.write(expression);
}
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
logger.info("Disconnectingthe idle.");
// disconnect an idle client
session.close(true);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
// close the connection onexceptional situation
logger.warn(cause.getMessage(), cause);
session.close(true);
}
}
使用Java.NET.Socket来实现向服务端创建链接。Socket创建后一直保持链接,从服务端接收到数据包后直接将原文返回。
public class TcpKeepAliveClient {
private String ip;
private int port;
private static Socket socket = null;
private static int timeout = 50 * 1000;
public TcpKeepAliveClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void receiveAndSend() throws IOException {
InputStream input = null;
OutputStream output = null;
try {
if (socket == null ||socket.isClosed() || !socket.isConnected()) {
socket = new Socket();
InetSocketAddress addr = new InetSocketAddress(ip, port);
socket.connect(addr, timeout);
socket.setSoTimeout(timeout);
System.out.println("TcpKeepAliveClientnew ");
}
input = socket.getInputStream();
output = socket.getOutputStream();
// read body
byte[] receiveBytes = {};// 收到的包字节数组
while (true) {
if (input.available() > 0) {
receiveBytes = new byte[input.available()];
input.read(receiveBytes);
// send
System.out.println("TcpKeepAliveClientsend date :" +new String(receiveBytes));
output.write(receiveBytes, 0, receiveBytes.length);
output.flush();
}
}
} catch (Exception e) {
e.printStackTrace();
System.out.println("TcpClientnew socket error");
}
}
public static void main(String[] args) throws Exception {
TcpKeepAliveClient client = new TcpKeepAliveClient("127.0.0.1", 8002);
client.receiveAndSend();
}
}
服务启动
public class MinaShortClient {
private static final int PORT = 8001;
public static void main(String[] args) throws IOException,InterruptedException {
IoConnector connector = new NioSocketConnector();
connector.getSessionConfig().setReadBufferSize(2048);
connector.getFilterChain().addLast("logger", new LoggingFilter());
connector.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
connector.setHandler(new MinaShortClientHandler());
for (int i = 1; i <= 10; i++) {
ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1",PORT));
future.awaitUninterruptibly();
IoSession session =future.getSession();
session.write(i);
session.getCloseFuture().awaitUninterruptibly();
System.out.println("result=" + session.getAttribute("result"));
}
connector.dispose();
}
}
消息处理
public class MinaShortClientHandler extends IoHandlerAdapter{
private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());
public MinaShortClientHandler() {
}
@Override
public void sessionOpened(IoSession session) {
}
@Override
public void messageReceived(IoSession session, Object message) {
logger.info("Messagereceived in the client..");
logger.info("Message is:" + message.toString());
session.setAttribute("result", message.toString());
session.close(true);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
session.close(true);
}
}
经过本文中的例子,Apache Mina在服务端可实现TCP协议长链接和短链接。在客户端只实现了短链接模式,长链接模式也是能够实现的(在本文中仍是采用传统的Java Socket方式)。两个服务端之间经过共享内存的方式来传递链接对象也许有更好的实现方式。
工程下载地址: http://download.csdn.net/detail/peterwanghao/5306972