ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池 ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(8088); while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新链接到来 Socket socket = serverSocket.accept();//blocking executor.submit(new ConnectIOnHandler(socket));//为新的链接建立新的线程 } class ConnectIOnHandler extends Thread{ private Socket socket; public ConnectIOnHandler(Socket socket){ this.socket = socket; } public void run(){ while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循环处理读写事件 String someThing = socket.read()....//读取数据(blocking) if(someThing!=null){ ......//处理数据 socket.write()....//写数据 } } } }
不足:java
一、线程的建立和销毁成本很高 二、线程的切换成本是很高 三、线程数量过多,使系统负载压力过大。 四、没有充分利用多核CPU
while (true) { //无事件到底阻塞 selector.select(); Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); keys.remove(); handler(key); } } /** * 处理不一样事件的请求 * @param key */ private void handler(SelectionKey key) throws IOException { if (key.isAcceptable()) { handleAccept(key); } else if (key.isReadable()) { handleRead(key); } } //处理链接请求 private void handleAccept(SelectionKey key){ ... } //处理读操做 private void handleRead(SelectionKey key){ ... }
NIO和 BIO的对比bootstrap
private void startServer() throws InterruptedException { //建立boss接收进来的链接 EventLoopGroup boss = new NioEventLoopGroup(); //建立worker处理已经接收的链接 EventLoopGroup worker = new NioEventLoopGroup(); try { //建立nio辅助启动类 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new EchoServerHandler()); } }); //绑定端口准备接收进来的链接 ChannelFuture future = bootstrap.bind(port).sync(); //等待服务器socket关闭 future.channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } }
Bootstrapping 有两种类型,一种是用于客户端的Bootstrap,一种是用于服务端的ServerBootstrap 服务器
Bootstrap如何引导客户端:网络
1.当 bind() 调用时,Bootstrap 将建立一个新的管道, 当 connect() 调用在 Channel 来创建链接 2.Bootstrap 将建立一个新的管道, 当 connect() 调用时 3.新的 Channel
ServerBootstrap如何引导服务端:多线程
1.当调用 bind() 后 ServerBootstrap 将建立一个新的管道,这个管道将会在绑定成功后接收子管道 2.接收新链接给每一个子管道 3.接收链接的 Channel
Netty 中EventLoopGroup是 Reactor 模型的一个实现并发
什么是Reactor呢?能够这样理解,Reactor就是一个执行while (true) { selector.select(); ...}循环的线程,会源源不断的产生新的事件,称做反应堆很贴切。 事件又分为链接事件、IO读和IO写事件,通常把链接事件单独放一线程里处理,即主Reactor(MainReactor),IO读和IO写事件放到另外的一组线程里处理,即从Reactor(SubReactor),从Reactor线程数量通常为2*(CPUs - 1)。 因此在运行时,MainReactor只处理Accept事件,链接到来,立刻按照策略转发给从Reactor之一,只处理链接,故开销很是小;每一个SubReactor管理多个链接,负责这些链接的读和写,属于IO密集型线程,读到完整的消息就丢给业务线程池处理业务,处理完比后,响应消息通常放到队列里,SubReactor会去处理队列,而后将消息写回。
Reactor单线程模型:app
所谓单线程, 即 acceptor 处理和 handler 处理都在一个线程中处理. 这个模型的坏处显而易见: 当其中某个 handler 阻塞时, 会致使其余全部的 client 的 handler 都得不到执行, 而且更严重的是, handler 的阻塞也会致使整个服务不能接收新的 client 请求(由于 acceptor 也被阻塞了). 由于有这么多的缺陷, 所以单线程Reactor 模型用的比较少.
Reactor多线程模型:框架
Reactor主从多线程模型dom
注意:异步
服务器端的ServerSocketChannel 只绑定到了bossGroup 中的一个线程, 所以在调用Java NIO 的 Selector.select 处理客户端的链接请求时, 其实是在一个线程中的, 因此对只有一个服务的应用来讲, bossGroup设置多个线程是没有什么做用的, 反而还会形成资源浪费.
NioEventLoopGroup 与 Reactor 线程模型的对应
//单线程模型 EventLoopGroup bossGroup = new NioEventLoopGroup(1); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup) .channel(NioServerSocketChannel.class) ... //多线程模型 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ... //主从多线程模型 EventLoopGroup bossGroup = new NioEventLoopGroup(4); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ...
NioEventLoop主要干两件事:
IO 事件的处理
//为了保证定时任务的执行不会由于过分挤占IO事件的处理,Netty提供了IO执行比例供用户设置,用户能够设置分 //配给IO的执行比例,防止由于海量定时任务的执行致使IO处理超时或者积压。默认是1:1 final int ioRatio = this.ioRatio;//默认为50 if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } }
hannelHandlerContext ctx = context;
ChannelPipeline pipeline = ctx.pipeline(); //1 pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
ChannelHandlerContext ctx = context; ctx.write(Unpooled.copiedBuffer("Netty in Action",CharsetUtil.UTF_8));
Netty的IO线程NioEventLoop因为聚合了多路复用器Selector,能够同时并发处理成百上千个客户端Channel,因为读写操做都是非阻塞的,这就能够充分提高IO线程的运行效率,避免因为频繁IO阻塞致使的线程挂起。
Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,直接使用堆外直接内存进行Socket读写。
ByteBuf header = ... ByteBuf body = ... CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); compositeByteBuf.addComponents(true, header, body);
虽然看起来 CompositeByteBuf 是由两个 ByteBuf 组合而成的, 不过在 CompositeByteBuf内部, 这两个 ByteBuf 都是单独存在的, CompositeByteBuf 只是逻辑上是一个总体. 以下:
ByteBuf byteBuf = ... ByteBuf header = byteBuf.slice(0, 5); ByteBuf body = byteBuf.slice(5, 10);
byte[] bytes = ... ByteBuf byteBuf = Unpooled.buffer(); byteBuf.writeBytes(bytes); //Netty byte[] bytes = ... ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
//传统io byte[] temp = new byte[1024]; FileInputStream in = new FileInputStream(srcFile); FileOutputStream out = new FileOutputStream(destFile); int length; while ((length = in.read(temp)) != -1) { out.write(temp, 0, length); } //netty RandomAccessFile srcFile = new RandomAccessFile(srcFileName, "r"); FileChannel srcFileChannel = srcFile.getChannel(); RandomAccessFile destFile = new RandomAccessFile(destFileName, "rw"); FileChannel destFileChannel = destFile.getChannel(); long position = 0; long count = srcFileChannel.size(); srcFileChannel.transferTo(position, count, destFileChannel);
PoolChunkList<T> qInit:存储内存利用率0-25%的chunk PoolChunkList<T> q000:存储内存利用率1-50%的chunk PoolChunkList<T> q025:存储内存利用率25-75%的chunk PoolChunkList<T> q050:存储内存利用率50-100%的chunk PoolChunkList<T> q075:存储内存利用率75-100%的chunk PoolChunkList<T> q100:存储内存利用率100%的chunk
PoolArena中申请内存:
为了尽量提高性能,Netty采用了串行无锁化设计,在IO线程内部进行串行操做,避免多线程竞争致使的性能降低。表面上看,串行化设计彷佛CPU利用率不高,并发程度不够。可是,经过调整NIO线程池的线程参数,能够同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工做线程模型性能更优
Netty的NioEventLoop读取到消息以后,直接调用ChannelPipeline的fireChannelRead(Object msg),只要用户不主动切换线程,一直会由NioEventLoop调用到用户的Handler,期间不进行线程切换,这种串行化处理方式避免了多线程操做致使的锁的竞争,从性能角度看是最优的。
Netty默认提供了对Google Protobuf的支持(Protobuf性能比较好),经过扩展Netty的编解码接口,用户能够实现其它的高性能序列化框架