Reactor pattern(反应器模式)是用于处理经过一个或多个输入同时传递给服务器的服务请求的事件处理模式。服务处理程序复用传入的请求,并将它们同步分派给关联的handler。 关键几点:java
负责响应事件,将事件分发绑定了该事件的Handler处理。对应netty 的NioEventLoop.run(),processSelectedKeys()。react
事件处理器,绑定了某类事件,负责执行对应事件的任务对事件进行处理。对应netty的IdleStateHandler等。promise
Acceptor属于handler中的一种,由于更加特殊,独立出来说,是reactor的事件接收类,负责初始化selector和接收缓冲队列。对应netty的ServerBootstrapAcceptor。bash
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup(4);
复制代码
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
children = new SingleThreadEventExecutor[nThreads];
...
for (int i = 0; i < nThreads; i ++) {
...
children[i] = newChild(threadFactory, args);
...
}
}
复制代码
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
复制代码
在这里建立了mainReactor和subReactor线程池,而且建立了eventLoop线程服务器
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
复制代码
每一个eventLoop线程都会有自身的selector,这里的eventLoop线程还未启动,后面启动以后,会执行run()里面的selector.select。socket
ChannelFuture regFuture = group().register(channel); 这里的group()是bossGroupide
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
复制代码
next()执行函数
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
复制代码
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[childIndex.getAndIncrement() & children.length - 1];
}
}
复制代码
从线程池中取出第一个eventLoopoop
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
...
channel.unsafe().register(this, promise);
return promise;
}
复制代码
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
复制代码
这里将mainReactor的eventLoop和服务器的NioServerSocketChannel绑定。 由于刚开始启动是main线程,执行eventLoop.execute,在这里mainReactor只启动了一个线程。ui
@Override
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
...
}
...
}
复制代码
在execute中执行startThread(),正式启动mainReactor线程循环,而且将register0(promise)这个Task加入taskQueue中,让mainReactor循环执行。
private void register0(ChannelPromise promise) {
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
}
复制代码
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
...
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
...
}
}
复制代码
这里将mainReactor 中eventLoop的selector注册一个为0的操做监听位,并将服务器的NioServerSocketChannel绑定到mainSubReactor线程上 在doBind()-->doBind0()-->channel.bind()-->…-->next.invokeBind()-->HeadContext. Bind()-->unsafe.bind()-->pipeline.fireChannelActive()-->channel.read()-->…-->doBeginRead()中修改成OP_ACCEPT(16)操做监听位。
@Override
protected void doBeginRead() throws Exception {
…
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
复制代码
自此,mainReactor的eventLoop从run开始循环执行selector.select。
注:readInterestOp的值来自于建立NioServerSocketChannel的构造函数
复制代码
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
复制代码
在收到客户端链接后,将会在ServerBootstrapAcceptor中把客户端的Channel注册在subReactor线程上,并将这个channel绑定到subReactor线程的selector上,监听客户端channel的OP_READ事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
复制代码
当监听到客户端链接,执行服务器AbstractNioUnsafe的read();
@Override
public void read() {
...
int localRead = doReadMessages(readBuf);
...
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
...
pipeline.fireChannelReadComplete();
...
}
复制代码
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
...
buf.add(new NioSocketChannel(this, ch));
...
}
复制代码
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
复制代码
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
复制代码
设置客户端channel监听位的值为OP_READ(1)
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
复制代码
ServerBootstrapAcceptor不只将subReactor绑定客户端channel,还为客户端channel进行一些参数的初始化
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
复制代码
和上面的register同样,只是将mainReactor线程池改成了subReactor线程池。 在这里将从subReactor线程池中取一个线程的selector与客户端channel绑定,并监听该客户端0事件。
@Override
public ChannelPipeline fireChannelReadComplete() {
head.fireChannelReadComplete();
if (channel.config().isAutoRead()) {
read();
}
return this;
}
复制代码
read()-->tail.read()-->next.invokeRead()-->HeadContext. read()-->…--> doBeginRead()
@Override
protected void doBeginRead() throws Exception {
...
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
复制代码
在这里将监听位改成OP_READ(1)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
...
}
复制代码
进到NioByteUnsafe的read()方法
@Override
public final void read() {
...
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
...
byteBuf = allocHandle.allocate(allocator);
...
pipeline.fireChannelRead(byteBuf);
...
}
复制代码
@Override
public ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg);
return this;
}
复制代码
private void invokeChannelRead(Object msg) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
复制代码
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerB: " + msg);
super.channelRead(ctx, msg);
}
}
复制代码
在这里将接收到客户端消息处理
Reactor线程池有多少个,就会建立多少个selector, mainReactor的eventLoop会与服务器的channel绑定,并只关注服务器channel的ACCEPT事件,subReactor的eventLoop会与客户端的channel绑定,并只关注客户端channel的READ事件。
mainReactor和subReactor循环各自的selector,mainReactor会循环ACCEPT事件的selector,subReactor会循环READ事件的selector,mainReactor接受到客户端链接后,会执行ServerBootstrapAcceptor的channelRead方法,将客户端链接与subReactor绑定。