由于NIO自己是非阻塞的,因此他的消息选择器Selector能够在单线程下链接多台客户端的访问。java
为了增强NIO的性能,咱们加入多线程的操做,固然NIO并不能简单的把Selector.select()放入Executor.execute(Runnable)的run方法中。bootstrap
为完成NIO的多线程,咱们应该有一个调度类,一个服务类。数组
调度类的目的是初始化必定数量的线程,以及线程交接。多线程
package com.netty.nionetty.pool; import com.netty.nionetty.NioServerBoss; import com.netty.nionetty.NioServerWorker; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; /** * Created by Administrator on 2018-05-17. */ public class NioSelectorRunnablePool { private final AtomicInteger bossIndex = new AtomicInteger(); //欢迎线程数组 private Boss[] bosses; private final AtomicInteger workerIndex = new AtomicInteger(); //工做线程数组 private Worker[] workers; public NioSelectorRunnablePool(Executor boss,Executor worker) { initBoss(boss,1); initWorker(worker,Runtime.getRuntime().availableProcessors() * 2); } //初始化1个欢迎线程 private void initBoss(Executor boss,int count) { this.bosses = new NioServerBoss[count]; for (int i = 0;i < bosses.length;i++) { bosses[i] = new NioServerBoss(boss,"boss thread " + (i + 1),this); } } //初始化2倍计算机核数的工做线程 private void initWorker(Executor worker,int count) { this.workers = new NioServerWorker[count]; for (int i = 0; i < workers.length;i++) { workers[i] = new NioServerWorker(worker,"worker thread" + (i + 1),this); } } //交接工做线程(从工做线程数组中挑出) public Worker nextWorker() { return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; } //交接欢迎线程(从欢迎线程数组中挑出) public Boss nextBoss() { return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; } }
另外带一个欢迎线程接口,一个工做线程接口socket
package com.netty.nionetty.pool; import java.nio.channels.ServerSocketChannel; /** * Created by Administrator on 2018-05-17. */ public interface Boss { void registerAcceptChannelTask(ServerSocketChannel serverChannel); }
package com.netty.nionetty.pool; import java.nio.channels.SocketChannel; /** * Created by Administrator on 2018-05-17. */ public interface Worker { void registerNewChannelTask(SocketChannel channel); }
有两种线程(欢迎线程和工做线程),因此咱们有一个抽象线程类ide
package com.netty.nionetty; import com.netty.nionetty.pool.NioSelectorRunnablePool; import java.io.IOException; import java.nio.channels.Selector; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; /** * Created by Administrator on 2018-05-17. */ public abstract class AbstractNioSelector implements Runnable { //线程池 private final Executor executor; //NIO消息选择器 protected Selector selector; protected final AtomicBoolean wakeUp = new AtomicBoolean(); //线程任务队列 private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>(); //线程名 private String threadName; //线程调度器 protected NioSelectorRunnablePool selectorRunnablePool; AbstractNioSelector(Executor executor,String threadName,NioSelectorRunnablePool selectorRunnablePool) { this.executor = executor; this.threadName = threadName; this.selectorRunnablePool = selectorRunnablePool; openSelector(); } private void openSelector() { try { this.selector = Selector.open(); //打开消息选择器 } catch (IOException e) { e.printStackTrace(); } //把线程放入线程池,开始执行run方法 executor.execute(this); } public void run() { Thread.currentThread().setName(this.threadName); while (true) { try { wakeUp.set(false); //把消息选择器的状态定为未唤醒状态 select(selector); //消息选择器选择消息方式 processTaskQueue(); //由于在主程序中绑定端口的时候已经注册了接收通道任务线程,因此这里是读出任务。 process(selector); //任务处理,欢迎线程跟工做线程各不相同 } catch (IOException e) { e.printStackTrace(); } } } //欢迎线程跟工做线程各自添加不一样的线程,再把消息选择器唤醒 protected final void registerTask(Runnable task) { taskQueue.add(task); Selector selector = this.selector; if (selector != null) { if (wakeUp.compareAndSet(false,true)) { selector.wakeup(); } }else { taskQueue.remove(task); } } public NioSelectorRunnablePool getSelectorRunnablePool() { return selectorRunnablePool; } private void processTaskQueue() { for (;;) { final Runnable task = taskQueue.poll(); if (task == null) { break; } task.run(); } } protected abstract int select(Selector selector) throws IOException; protected abstract void process(Selector selector) throws IOException; }
欢迎线程跟工做线程的具体实现性能
package com.netty.nionetty; import com.netty.nionetty.pool.Boss; import com.netty.nionetty.pool.NioSelectorRunnablePool; import com.netty.nionetty.pool.Worker; import java.io.IOException; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; /** * Created by Administrator on 2018-05-17. */ public class NioServerBoss extends AbstractNioSelector implements Boss { public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor,threadName,selectorRunnablePool); } //注册接收任务,会先调用抽象类,把任务线程先添加到任务队列,再注册接收消息类型 public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) { final Selector selector = this.selector; registerTask(new Runnable() { public void run() { try { serverChannel.register(selector,SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } @Override protected int select(Selector selector) throws IOException { return selector.select(); } //NIO操做,开始接收,接收后再启用工做线程,接收线程依然存在,并且工做线程也不断给到线程池未使用线程 //具体看初始化的时候初始了多少工做线程,可是是几个链接对应一个工做线程。 @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (Iterator<SelectionKey> i = selectedKeys.iterator();i.hasNext();) { SelectionKey key = i.next(); i.remove(); ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel channel = server.accept(); channel.configureBlocking(false); Worker nextWorker = getSelectorRunnablePool().nextWorker(); nextWorker.registerNewChannelTask(channel); System.out.println("新客户端链接"); } } }
package com.netty.nionetty; import com.netty.nionetty.pool.NioSelectorRunnablePool; import com.netty.nionetty.pool.Worker; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.Executor; /** * Created by Administrator on 2018-05-17. */ public class NioServerWorker extends AbstractNioSelector implements Worker { public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { super(executor,threadName,selectorRunnablePool); } public void registerNewChannelTask(final SocketChannel channel) { final Selector selector = this.selector; registerTask(new Runnable() { public void run() { try { channel.register(selector,SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } @Override protected int select(Selector selector) throws IOException { return selector.select(500); } @Override protected void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey)ite.next(); ite.remove(); SocketChannel channel = (SocketChannel)key.channel(); int ret = 0; boolean failure = true; ByteBuffer buffer = ByteBuffer.allocate(1024); try { ret = channel.read(buffer); failure = false; } catch (IOException e) { e.printStackTrace(); } if (ret <= 0 || failure) { key.cancel(); System.out.println("客户端断开链接"); }else { System.out.println("收到数据:" + new String(buffer.array())); ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes()); channel.write(outBuffer); } } } }
服务类this
package com.netty.nionetty; import com.netty.nionetty.pool.Boss; import com.netty.nionetty.pool.NioSelectorRunnablePool; import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.ServerSocketChannel; /** * Created by Administrator on 2018-05-17. */ public class ServerBootstap { private NioSelectorRunnablePool selectorRunnablePool; public ServerBootstap(NioSelectorRunnablePool selectorRunnablePool) { this.selectorRunnablePool = selectorRunnablePool; } public void bind(final SocketAddress localAddress) { try { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(localAddress); Boss nextBoss = selectorRunnablePool.nextBoss(); nextBoss.registerAcceptChannelTask(serverChannel); } catch (IOException e) { e.printStackTrace(); } } }
主程序atom
package com.netty.nionetty; import com.netty.nionetty.pool.NioSelectorRunnablePool; import java.net.InetSocketAddress; import java.util.concurrent.Executors; /** * Created by Administrator on 2018-05-17. */ public class Start { public static void main(String[] args) { NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()); ServerBootstap bootstrap = new ServerBootstap(nioSelectorRunnablePool); bootstrap.bind(new InetSocketAddress(10101)); System.out.println("start"); } }
其实最主要的就是在线程调度器中,各类线程已经被初始化存在于线程池内存中了,因此后面只是把这些线程拿出来,并注册消息类型,进行处理,这就是NIO的多线程处理了。spa