NIO socket是非阻塞的通信模式,与IO阻塞式的通信不一样点在于NIO的数据要经过channel放到一个缓存池ByteBuffer中,而后再从这个缓存池中读出数据,而IO的模式是直接从inputstream中read。因此对于NIO,因为存在缓存池的大小限制和网速的不均匀会形成一次读的操做放入缓存池中的数据不完整,便造成了断包问题。同理,若是一次性读入两个及两个以上的数据,则没法分辨两个数据包的界限问题,也就形成了粘包。对于NIO的SocketChannel每次触发OP_READ事件时,发送端不必定仅仅写入了一次,同理,发送端若是一次发送数据包过大,那么发送端的一次写入也可能会被拆分红两次OP_READ事件,因此OP_READ事件和发送端的OP_WRITE事件并非一一对应的。缓存
第一个问题:对于粘包问题的解决socket
粘包问题主要是因为数据包界限不清,因此这个问题比较好解决,最好的解决办法就是在发送数据包前事先发送一个int型数据,该数据表明将要发送的数据包的大小,这样接收端能够每次触发OP_READ的时候先接受一个int大小的数据段到缓存池中,而后,紧接着读出后续完整的大小的包,这样就会处理掉粘包问题。由于channel.read()方法不能给读取数据的大小的参数,因此没法手动指定读取数据段的大小。但每次调用channel.read()返回的是他实际读取的大小,这样,思路就有了:首先调整缓存池的大小固定为要读出数据段的大小,这样保证不会过量读出。因为OP_READ和OP_WRITE不是一一对应的,因此一次OP_READ能够While循环调用channel.read()不停读取channel中的数据到缓存池,并捕获其返回值,当返回值累计达到要读取数据段大小时break掉循环,这样保证数据读取充足。因此这样就完美解决粘包问题。ide
第二个问题:对于断包问题的解决函数
断包问题主要是因为数据包过量读入时,缓存池结尾处只有半个数据包,channel里还有半个数据包,这样形成了这个包没法处理的问题。这个问题的解决思路是保证每次不过量读入,这样也就不存在断包了。仍是由于channel.read()的读取不可控的缘由,因此没法从read函数中控制读取大小,仍是从缓存池入手。方法是调整缓存池的大小为要读数据的大小,这样就不会断包。spa
下附某次开发过程的源代码参考:线程
发送端:指针
private void sendIntoChannel() { Runnable run = new Runnable() { @Override public void run() { try { ByteArrayOutputStream bOut; ObjectOutputStream out; CBaseDataBean cbdb; ByteBuffer bb = ByteBuffer.allocate(MemCache); while (true) { cbdb = CloudServer.cdsq.read();//Blocking Method //处理自我命令:断开链接 退出线程 if (cbdb.getDataType() == CMsgTypeBean.MSG_TYPE_CUTDOWN) { break; } bOut = new ByteArrayOutputStream(); out = new ObjectOutputStream(bOut); out.writeObject(cbdb); out.flush(); //构造发送数据:整型数据头+有效数据段 byte[] arr = bOut.toByteArray(); final int ObjLength = arr.length; //获取有效数据段长度 bb.clear(); bb.limit(IntLength + ObjLength); //调整缓存池大小 bb.putInt(ObjLength); bb.put(arr); bb.position(0); //调整重置读写指针 SocketChannel channel = (SocketChannel) key.channel(); channel.write(bb); out.close(); bOut.close(); } } catch (IOException ex) { } } }; CloudServer.cstp.putNewThread(run); }
/** * 开辟线程分发消息 */ private void Dispatcher() { Runnable run = new Runnable() { @Override public void run() { try { while (true) { selector.selectNow(); Thread.sleep(100); Iterator<SelectionKey> itor = selector.selectedKeys().iterator(); while (itor.hasNext()) { SelectionKey selKey = itor.next(); itor.remove(); if (selKey.isValid() && selKey.isAcceptable()) { finshAccept(selKey); } if (selKey.isValid() && selKey.isReadable()) { //消息分发 Processer(); } } } } catch (IOException | InterruptedException ex) { System.out.println(ex.toString()); } } }; CloudServer.cstp.putNewThread(run); } /** * 消息处理器 */ private void Processer() { ByteBuffer bbInt = ByteBuffer.allocate(IntLength); //读取INT头信息的缓存池 ByteBuffer bbObj = ByteBuffer.allocate(MemCache); //读取OBJ有效数据的缓存池 SocketChannel channel = (SocketChannel)key.channel(); ByteArrayInputStream bIn; ObjectInputStream in; CBaseDataBean cbdb; //有效数据长度 int ObjLength; //从NIO信道中读出的数据长度 int readObj; try { //读出INT数据头 while (channel.read(bbInt) == IntLength) { //获取INT头中标示的有效数据长度信息并清空INT缓存池 ObjLength = bbInt.getInt(0); bbInt.clear(); //清空有效数据缓存池设置有效缓存池的大小 bbObj.clear(); bbObj.limit(ObjLength); //循环读满缓存池以保证数据完整性 readObj = channel.read(bbObj); while (readObj != ObjLength) { readObj += channel.read(bbObj); } bIn = new ByteArrayInputStream(bbObj.array()); in = new ObjectInputStream(bIn); cbdb = (CBaseDataBean) in.readObject(); switch (cbdb.getDataType()) { case CMsgTypeBean.MSG_TYPE_COMMAND: rcv_msg_command(cbdb); break; case CMsgTypeBean.MSG_TYPE_CUTDOWN: rcv_msg_cutdown(); break; case CMsgTypeBean.MSG_TYPE_VERIFYFILE: rcv_msg_verifyfile(cbdb); break; case CMsgTypeBean.MSG_TYPE_SENDFILE: rcv_msg_sendfile(cbdb); break; case CMsgTypeBean.MSG_TYPE_DISPATCHTASK: rcv_msg_dispchtask(cbdb); break; } in.close(); } } catch (ClassNotFoundException | IOException ex) { } }