NIO socket 的简单连接池

     在最近的项目中,需要写一个通过 socket 与底层服务器通讯的模块。在设计中,请求对象被封装为 xxxRequest,返回消息被封装为 xxxResponse。因为 socket 编程开发经验少,一开始我使用了短连接的方式:每一个请求创建一个 socket 进行通讯。由于每一个 socket 只进行一次读写,这大大浪费了系统资源。

      于是考虑使用长连接:系统共用一个 client socket,并对 send 操作进行加锁。结果在处理并发的时候,各种慢,各种等待。没有办法,只好考虑使用连接池:预先建立多个 client socket 放入连接池,需要发送请求时从连接池获取一个 socket,完成请求后再放回连接池中。下面是一个简单的实现。

       

        private  static String IP=GlobalNames.industryIP; // Server host, taken from GlobalNames.industryIP
 // Server port, parsed from the GlobalNames.industryPort string
 private  static int PORT =Integer.parseInt(GlobalNames.industryPort);
 
 // Number of pool slots; starts at 10 and is incremented by getConnection() when no idle slot is found
 private static  int CONNECTION_POOL_SIZE = 10;
 // Pool instance created by init(); null until init() has run
 private static NIOConnectionPool self = null;
 private Hashtable<Integer, SocketChannel> socketPool = null; // Connection pool, keyed by slot index
 private boolean[] socketStatusArray = null; // Per-slot connection status (true = in use, false = idle)
 // Assigned by allocateSocketChannel() on every call and returned by getSelector()
 private static Selector selector  = null;
 // Rebuilt from IP and PORT on every allocateSocketChannel() call
 private static InetSocketAddress SERVER_ADDRESS = null;
 
 /**
  * 初始化链接池,最大TCP链接的数量为10
  *
  * @throws IOException
  */android 自定义SeekBarPreference 实现
 public static synchronized void init() throws Exception {
  self = new NIOConnectionPool();
  self.socketPool = new Hashtable<Integer, SocketChannel>();
  self.socketStatusArray = new boolean[CONNECTION_POOL_SIZE];
  buildConnectionPool();
 }android

 /**
  * 创建链接池
  */
 public synchronized static void buildConnectionPool() throws Exception {
  if (self == null) {
   init();
  }
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   SocketChannel client = allocateSocketChannel();
   self.socketPool.put(new Integer(i), client);
   self.socketStatusArray[i] = false;
  }
 }编程

 /**
  * 从链接池中获取一个空闲的Socket
  * 商帐追收
  * @return 获取的TCP链接
  */
 public static SocketChannel getConnection() throws Exception {
  if (self == null)
   init();
  int i = 0;
  for (i = 0; i < CONNECTION_POOL_SIZE; i++) {
   if (!self.socketStatusArray[i]) {
    self.socketStatusArray[i] = true;
    break;
   }
  }
  if (i < CONNECTION_POOL_SIZE) {
   return self.socketPool.get(new Integer(i));
   
  } else {服务器

  //目前链接池无可用链接时只是简单的新建一个链接
   SocketChannel newClient = allocateSocketChannel();
   CONNECTION_POOL_SIZE++;
   self.socketPool.put(CONNECTION_POOL_SIZE, newClient);
   return newClient;
  }
 }并发

 /**
  * 当得到的socket不可用时,从新得到一个空闲的socket。
  *
  * @param socket
  *            不可用的socket
  * @return 新获得的socket
  * @throws Exception
  */
 public static SocketChannel rebuildConnection(SocketChannel socket)
   throws Exception {
  if (self == null) {
   init();
  }
  SocketChannel newSocket = null;
  try {
   for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
    if (self.socketPool.get(new Integer(i)) == socket) {
     newSocket = allocateSocketChannel();
     self.socketPool.put(new Integer(i), newSocket);
     self.socketStatusArray[i] = true;
    }
   }socket

  } catch (Exception e) {
   System.out.println("重建链接失败!");
   throw new RuntimeException(e);
  }
  return newSocket;
 }ui


 /**
  * 将用完的socket放回池中,调整为空闲状态。此时链接并无断开。
  *
  * @param socket
  *            使用完的socket
  * @throws Exception
  */
 public static void releaseConnection(SocketChannel socket) throws Exception {
  if (self == null) {
   init();
  }
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   if (self.socketPool.get(new Integer(i)) == socket) {
    self.socketStatusArray[i] = false;
    break;
   }
  }
 }设计

 /**
  * 断开池中全部链接
  *
  * @throws Exception
  */
 public synchronized static void releaseAllConnection() throws Exception {
  if (self == null)
   init();htm

  // 关闭全部链接
  SocketChannel socket = null;
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   socket = self.socketPool.get(new Integer(i));
   try {
    socket.close();
    self.socketStatusArray[i] = false;
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
   
 
 public static SocketChannel allocateSocketChannel(){
  
   SERVER_ADDRESS = new InetSocketAddress(  
                IP, PORT);  
  SocketChannel socketChannel = null;
     SocketChannel client = null;
     try{
     socketChannel = SocketChannel.open();  
        socketChannel.configureBlocking(false);  
        selector = Selector.open();  
        socketChannel.register(selector, SelectionKey.OP_CONNECT);  
        socketChannel.connect(SERVER_ADDRESS);
        Set<SelectionKey> selectionKeys;  
        Iterator<SelectionKey> iterator;  
        SelectionKey selectionKey;
        selector.select();  
        selectionKeys = selector.selectedKeys();  
        iterator = selectionKeys.iterator();  
        while (iterator.hasNext()) {  
            selectionKey = iterator.next();  
            if (selectionKey.isConnectable()) {  
                client = (SocketChannel) selectionKey.channel();  
                if (client.isConnectionPending()) {  
                    client.finishConnect();
                    client.register(selector, SelectionKey.OP_WRITE);  
                    break;
                }
            }
        }
 }catch(Exception e){
  e.printStackTrace();
 }
 return client;
  }对象

 public static Selector getSelector() {
  return selector;
 }

 

使用连接池进行通讯:

 /*缓冲区大小*/       private static int BLOCK = 8*4096;        /*发送数据缓冲区*/       private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);            /*接受数据缓冲区*/      private static ByteBuffer protocalNum = ByteBuffer.allocate(4);      private static ByteBuffer functionNum = ByteBuffer.allocate(4);      private static ByteBuffer messageLen = ByteBuffer.allocate(4);      private static ByteBuffer receivebuffer = null;      private  SocketChannel client = null;      private Selector selector = null;           private boolean readable = true;      private boolean writable = true;                public NIOSocketBackUp() throws Exception{       client = NIOConnectionPool.getConnection();       selector = NIOConnectionPool.getSelector();      }           public String send(ServiceRequest request) throws Exception {                           Set<SelectionKey> selectionKeys;            Iterator<SelectionKey> iterator;            SelectionKey selectionKey;            int count=0;            boolean flag = true;          String receiveText="";             while (flag) {                selector.select();                //返回此选择器的已选择键集。                selectionKeys = selector.selectedKeys();                iterator = selectionKeys.iterator();                while (iterator.hasNext()) {                    selectionKey = iterator.next();                    if (selectionKey.isWritable() && (writable)) {                            sendbuffer.clear();                            sendbuffer.put(request.getProtocalNum());                          sendbuffer.put(request.getFunctionNum());                          sendbuffer.put(request.getMessageLen());                          sendbuffer.put(request.getXmlbytes());                          sendbuffer.flip();                            client.write(sendbuffer);                            client.register(selector, SelectionKey.OP_READ);                            writable = false;                  } else if 
(selectionKey.isReadable() && (readable) ) {                        protocalNum.clear();                      functionNum.clear();                      messageLen.clear();                                                                count=client.read(protocalNum);                        count=client.read(functionNum);                      count=client.read(messageLen);                      messageLen.rewind();                      int length = messageLen.asIntBuffer().get(0);                      receivebuffer = ByteBuffer.allocate(length-12);                      receivebuffer.clear();                                           //read方式居然不阻塞                      int total=0;                      while(total!=(length-12)){                        count=client.read(receivebuffer);                        total+=count;                      }                      client.register(selector, SelectionKey.OP_WRITE);                        receiveText = new String(receivebuffer.array(),"GBK");                      flag = false;                      readable = false;                      break;                  }              }            }              NIOConnectionPool.releaseConnection(client);          return receiveText.trim();      }  

相关文章
相关标签/搜索