Netty学习笔记(三)- Reactor模型

在学习 Netty 的 EventLoop 线程模型以前,须要先了解 Java 的 Reactor 模式。java

1、Reactor 模型简介

  在网络编程过程当中,服务器端最原始的方式就是经过一个循环来不断的监听端口是否有新的 socket 连接,若是有就直接处理,好比读取 socket 输入,写入输出等等,完成以后再开始下一次监听。这种方式有一个最大的问题,当前请求没有处理完以前,下一个请求只能阻塞着,直到当前请求处理完成,服务器吞吐量低。在并发量大的状况下,效率很低。
  天然的,咱们会想到使用线程来处理。当有新的 socket 连接时,建立一个线程,将后面的处理逻辑都交给前程处理,即一个请求(socket 连接)用一个线程处理,这样就不会产生阻塞了。虽然这样解决了吞吐量的问题,可是又带来新的问题,即在并发量大的状况下,会不断的产生新的线程。反复的线程建立会消耗大量的资源,而多个线程的上下文切换也会影响效率。
  采用事件驱动的方式能够很好的解决这些问题。当有事件发生,好比链接创建、数据可读或可写时,调用独立的处理器进行处理,避免阻塞主流程,同时也可使用有限的线程来处理多个事件,避免了线程过多消耗大量资源。
  Reactor 模型是一种事件驱动机制。通常的,应用程序经过主动调用某些 API 来完成处理,而 Reactor 却相反,应用程序提供相应的接口(回调函数)注册到 Reactor 上,若是有相应的事件发生,Reactor 将主动调用以前注册的接口。Reactor 能够同时接收多个请求,而后将它们分发到对应的处理器上。
  Reactor 模型有如下几个主要部分:react

  - Selector:事件通知器
  - Handler:事件处理器
  - SelectionKey:事件标识编程

  接下来使用以前的 demo,经过 Java 相关 API 实现 Server 端来学习 Reactor 模型的几种形态。
  复用 demo 中 Netty 的 Client 端,链接创建时发送消息到 Server,接收 Server 返回的消息打印在控制台,如下是 Client 端的代码示例:bootstrap

package com.niklai.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

public class Client {
    private static final Logger logger = LoggerFactory.getLogger(Client.class.getSimpleName());

    public static void init() {
        try {
            Bootstrap bootstrap = new Bootstrap();
            NioEventLoopGroup group = new NioEventLoopGroup();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("localhost", 9999))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect().sync();点
            future.channel().closeFuture().sync();
            group.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }

    static class ClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String msg = "Client message!";
            ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            logger.info("read message: {}....", buf.toString(CharsetUtil.UTF_8));
        }
    }
}

2、单线程的 Reactor 模型

所谓单线程的 Reactor,就是只有一个线程来通知和处理事件。数组

package com.niklai.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

public class ReactorServer {
    private static final Logger logger = LoggerFactory.getLogger(ReactorServer.class.getSimpleName());
    private Selector selector;
    private boolean loop = true;
    private ServerSocketChannel serverChannel;

    public ReactorServer() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void init() {
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket socket = serverChannel.socket();
            socket.bind(new InetSocketAddress("localhost", 9999));
            SelectionKey selectionKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);       // 注册链接接受事件
            selectionKey.attach(new Accept(serverChannel, selector));       // 绑定链接接受事件的处理器
            while (loop) {
                int select = selector.select();     // 阻塞获取当前是否有事件触发
                if (select != 0) {
                    Set<SelectionKey> readKeys = selector.selectedKeys();       // 获取触发的事件
                    Iterator<SelectionKey> iterator = readKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        Runnable runnable = (Runnable) key.attachment();        // 获取事件绑定的处理器并执行
                        runnable.run();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 链接创建时的事件处理器
    static class Accept implements Runnable {
        private ServerSocketChannel channel;
        private Selector selector;

        public Accept(ServerSocketChannel channel, Selector selector) {
            this.channel = channel;
            this.selector = selector;
        }

        @Override
        public void run() {
            try {
                SocketChannel client = channel.accept();
                new Handler(selector, client);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // 读、写事件处理器
    static class Handler implements Runnable {
        private Selector selector;
        private SocketChannel socket;
        private SelectionKey selectionKey;
        private HandleState state;

        public Handler(Selector selector, SocketChannel socket) {
            this.selector = selector;
            this.socket = socket;
            this.state = HandleState.READING;
            try {
                this.socket.configureBlocking(false);
                this.selectionKey = this.socket.register(this.selector, 0);
                this.selectionKey.interestOps(SelectionKey.OP_READ);        // 注册读事件
                this.selectionKey.attach(this);     // 当前类就是读写处理器
                this.selector.wakeup();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            switch (state) {
                case READING:
                    read();
                    break;
                case WRITING:
                    write();
                    break;
            }
        }

        private void read() {
            StringBuffer sb = new StringBuffer();
            ByteBuffer buf = ByteBuffer.allocate(1024);
            try {
                while (true) {
                    buf.clear();
                    int read = socket.read(buf);
                    sb.append(Charset.forName("utf-8").newDecoder().decode(buf.asReadOnlyBuffer()).toString());
                    if (read == 0) {
                        logger.info("receive message: {}.....", sb.toString());
                        Thread.sleep(2000);         // 模拟读取处理数据逻辑比较耗时
                        selectionKey.interestOps(SelectionKey.OP_WRITE);        // 注册写事件
                        state = HandleState.WRITING;
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private void write() {
            try {
                ByteBuffer output = ByteBuffer.wrap("Reactor server answer!".getBytes());
                socket.write(output.duplicate());
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                selectionKey.cancel();      // 整个读写流程处理完,取消事件注册,避免再次触发。
            }
        }

        private enum HandleState {
            READING, WRITING
        }
    }
}

经过 Selector.open()方法建立一个 Selector 事件通知器,ServerSocketChannel 注册到 Selector 上监听 ACCEPT 事件,并绑定对应的 Accept 事件处理器。当链接创建后触发对应事件,建立 SocketChannel 并再次注册到同一个 Selector 上,并监听读写事件,同时绑定对应的 Handler 事件处理器。等待 READ 事件触发读取数据完成业务处理后触发写事件完成写数据。整个过程当中,全部事件都由同一个 Selector 触发和处理。
运行单元测试,并行初始化 6 个 Client 链接到 Server 上,查看控制台服务器

@Test
public void test() throws InterruptedException {
    new Thread(() -> {
        // 服务端
        new ReactorServer().init();
    }).start();
    Thread.sleep(1000);

    // 并行初始化多个Client,模拟并发效果
    IntStream.range(1, 7).parallel().forEach(item -> {
        logger.info("Client No.{} init...", item);
        // 客户端
        Client.init();
    });
}


从控制台日志中能够看到,虽然 Client 是并行初始化链接到 Server 的,可是在 Server 端倒是同一个线程依次处理的,每次处理耗时 2ms。这就是单线程 Reactor 的特色,资源利用率不高,在高并发的状况下效率会很是的低,甚至会由于某些处理逻辑耗时太长致使后面的链接被拒绝。网络

3、多线程的 Reactor 模型

为了解决上面的问题,咱们能够考虑将耗时的操做放在线程里执行,这样能够避免 Selector 被阻塞。将 demo 中的 ReactorServer 改造一下多线程

// 省略部分代码

static class Handler implements Runnable {
    private Selector selector;
    private SocketChannel socket;
    private SelectionKey selectionKey;
    private HandleState state;
    private ExecutorService pool;

    public Handler(Selector selector, SocketChannel socket) {
        this.selector = selector;
        this.socket = socket;
        this.state = HandleState.READING;
        this.pool = Executors.newFixedThreadPool(4);        // 增长一个线程池
        try {
            this.socket.configureBlocking(false);
            this.selectionKey = this.socket.register(this.selector, 0);
            this.selectionKey.interestOps(SelectionKey.OP_READ);
            this.selectionKey.attach(this);
            this.selector.wakeup();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        switch (state) {
            case READING:
                state = HandleState.WORKING;        // 改变当前handler状态
                pool.execute(() -> {                // 将耗时操做放在线程池中执行
                    read();
                });
                break;
            case WRITING:
                write();
                break;
        }
    }

    private void read() {
        StringBuffer sb = new StringBuffer();
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            while (true) {
                buf.clear();
                int read = socket.read(buf);
                sb.append(Charset.forName("utf-8").newDecoder().decode(buf.asReadOnlyBuffer()).toString());
                if (read == 0) {
                    logger.info("receive message: {}.....", sb.toString());
                    Thread.sleep(2000);
                    selectionKey.interestOps(SelectionKey.OP_WRITE);
                    state = HandleState.WRITING;
                    selector.wakeup();              // 唤醒Selector,让当前阻塞的selector.select()方法返回
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void write() {
        try {
            ByteBuffer output = ByteBuffer.wrap("Reactor server answer!".getBytes());
            socket.write(output.duplicate());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            selectionKey.cancel();
        }
    }

    private enum HandleState {
        WORKING,
        READING,
        WRITING
    }
}

运行上面的单元测试,查看控制台

与以前运行的结果不一样,耗时的 Read 方法都使用线程池的线程执行,整个流程相比以前总耗时要小不少。并发

4、主从 Reactor 多线程模型

从前面的例子能够看到,即便是使用了线程池,从头到尾都是一个 Selector 在负责事件分发和处理。当在分发以前的逻辑存在耗时长的状况时,会影响到其余事件的触发。这样咱们能够将其分红两部分:监听并创建链接能够由一个独立的 Selector 负责,而读写和业务操做能够由另一个 Selector 负责,而且前一个 Selector 将创建好的链接分派给后一个 Selector。继续修改 demo,实现上述过程。app

package com.niklai.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReactorServer {
    private static final Logger logger = LoggerFactory.getLogger(ReactorServer.class.getSimpleName());
    private Selector mainSelector;      // 主Selector
    private Selector[] subSelectors;    // 从Selector
    private int next = 0;
    private int count = 2;
    private boolean loop = true;
    private ServerSocketChannel serverChannel;

    public ReactorServer() {
        try {
            subSelectors = new Selector[count];         // 初始化多个从Selector,复用
            for (int i = 0; i < count; i++) {
                subSelectors[i] = Selector.open();
            }

            mainSelector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void init() {
        try {
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket socket = serverChannel.socket();
            socket.bind(new InetSocketAddress("localhost", 9999));
            SelectionKey selectionKey = serverChannel.register(mainSelector, SelectionKey.OP_ACCEPT);
            selectionKey.attach((Runnable) () -> {
                try {
                    SocketChannel client = serverChannel.accept();
                    new Handler(subSelectors[next], client);        // 每创建一个链接,就选择一个从Selector绑定
                    next++;
                    if (next == count) {
                        next = 0;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            // 每一个从Selector使用独立的线程监听事件,避免相互阻塞
            for (int i = 0; i < count; i++) {
                int finalI = i;
                new Thread(() -> {
                    new HandlerLoop(subSelectors[finalI]).run();
                }).start();
            }

            while (loop) {
                int select = mainSelector.select();
                if (select != 0) {
                    Set<SelectionKey> readKeys = mainSelector.selectedKeys();
                    Iterator<SelectionKey> iterator = readKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        Runnable runnable = (Runnable) key.attachment();
                        runnable.run();
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static class HandlerLoop implements Runnable {
        private Selector selector;

        public HandlerLoop(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    int select = selector.select();
                    if (select != 0) {
                        Set<SelectionKey> readKeys = selector.selectedKeys();
                        Iterator<SelectionKey> iterator = readKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey key = iterator.next();
                            Runnable runnable = (Runnable) key.attachment();
                            runnable.run();
                            iterator.remove();
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class Handler implements Runnable {
        // 不变,省略代码
    }
}

运行上面的单元测试,查看控制台

在这里只有两个从 Selector,却同时处理了六个 Socket 链接,能够达到使用较少的线程来处理大量的请求。

进一步的优化

在上面的例子里,每 accept 一个 SocketChannel,就会建立一个 handler,虽然多个 SocketChannel 共享了一个 Selector,可是每一个 SocketChannel 对应的 handler 里都有一个独立的线程池。在高并发下,会建立大量的线程池,消耗大量的资源。为了解决这个问题,咱们能够给每个 Selector 分配一个线程池,用同一个线程池的线程来处理同一个 Selector 关联下的多个 SocketChannel。

public class ReactorServer {

    // 省略代码

    private ExecutorService[] pools;        // 线程池数组

    public ReactorServer() {
        try {
            subSelectors = new Selector[count];
            pools = new ExecutorService[count];                     // 初始化线程池组,数组大小与从Selector数组大小相同
            for (int i = 0; i < count; i++) {
                subSelectors[i] = Selector.open();
                pools[i] = Executors.newFixedThreadPool(10);        // 初始化线程池
            }

            mainSelector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void init() {
        try {

            // 省略代码

            selectionKey.attach((Runnable) () -> {
                try {
                    SocketChannel client = serverChannel.accept();
                    new Handler(subSelectors[next], client, pools[next]);       // 选择一个线程池给Handler
                    next++;
                    if (next == count) {
                        next = 0;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

            // 省略代码

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static class Handler implements Runnable {

        // 省略代码

        public Handler(Selector selector, SocketChannel socket) {
            this(selector, socket, Executors.newFixedThreadPool(4));
        }

        public Handler(Selector selector, SocketChannel socket, ExecutorService pool) {
            this.selector = selector;
            this.socket = socket;
            this.state = HandleState.READING;
            this.pool = pool;
            try {
                this.socket.configureBlocking(false);
                this.selectionKey = this.socket.register(this.selector, 0);
                this.selectionKey.interestOps(SelectionKey.OP_READ);
                this.selectionKey.attach(this);
                this.selector.wakeup();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // 省略代码
    }
}
相关文章
相关标签/搜索