在网络编程 - 初识Netty中,咱们看到了客户端Bootstrap调用channel(NioSocketChannel.class)
来传递所要初始化的Channel对象。
AbstractBootstrap类,能够看到传递的是ReflectiveChannelFactory,一个Channel的工厂类,赋值给Bootstrap的channelFactory。java
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
ReflectiveChannelFactory实现了ChannelFactory接口和newChannel方法,这个newChannel方法,会经过传过来的clazz建立一个Channel。这个方法的调用,在Bootstrap的connect()
方法中,后面会调用initAndRegister()
方法,Bootstrap先暂且略过,咱们看看Channel是怎么初始化的,初始化又作了哪些事情。编程
public T newChannel() { try { // 这边调用NioSocketChannel的构造方法 return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
NioSocketChannelsegmentfault
public NioSocketChannel() { // DEFAULT_SELECTOR_PROVIDER是SelectorProvider.provider() this(DEFAULT_SELECTOR_PROVIDER); } public NioSocketChannel(SelectorProvider provider) { // 经过SelectorProvider获取NIO的SocketChannel this(newSocket(provider)); } private static SocketChannel newSocket(SelectorProvider provider) { try { return provider.openSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } } public NioSocketChannel(SocketChannel socket) { this(null, socket); } public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); // NioSocketChannel的属性设置 config = new NioSocketChannelConfig(this, socket.socket()); }
AbstractNioByteChannelpromise
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { // 关注OP_READ事件 super(parent, ch, SelectionKey.OP_READ); }
AbstractNioChannel网络
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { // 设置非阻塞 ch.configureBlocking(false); } catch (IOException e) { // 异常处理,关闭channel并抛异常,代码略 } }
AbstractChannelapp
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); // 建立一个NioSocketChannel.NioSocketChannelUnsafe的内部类对象 unsafe = newUnsafe(); // 建立一个Pipeline,并把当前的Channel赋值给Pipeline pipeline = newChannelPipeline(); }
至此,Channel已经建立好了,在其中,他也建立了一个unsafe对象、Pipeline对象以及NioSocketChannelConfig对象。
unsafe类图以下,能够看出,Channel中与socket交互的,其实就是这个unsafe类。
ChannelPipeline这个类,后面再详细的讲。socket
Channel初始化完成后,开始注册到EventLoop。
在AbstractBootstrap的initAndRegister方法中,初始化Channel后,开始执行ChannelFuture regFuture = config().group().register(channel)
。
MultithreadEventLoopGroup,这个next,就是经过chooserFactory.newChooser
获得的chooser,根据不一样的策略来获取EventLoop。也就是说,这个register,其实就是注册到EventLoop中。ide
public ChannelFuture register(Channel channel) { return next().register(channel); }
SingleThreadEventLoop,先是建立DefaultChannelPromise,这边传递两个参数,一个是channel,一个是this,这个this,就是当前的EventLoop,因此Promise保持了他们两个的关系。再调用register方法时,调用unsafe的register方法。oop
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
AbstractChannel.AbstractUnsafe,判断当前调用线程是不是支撑 EventLoop 的线程,若是是直接调用register0,不然,加入到队列。这样的好处,就是减小了线程的上下文切换致使的性能消耗。eventLoop的execute方法参见以前的文章。性能
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } // 赋值eventLoop,register0方法会用到 AbstractChannel.this.eventLoop = eventLoop; // 若是当前调用线程正是支撑 EventLoop 的线程,那么直接调用register0,不然,加入到队列 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
最终的注册方法。javaChannel()返回的是初始化的provider.openSocketChannel()
,eventLoop()返回的是上面register方法赋值的eventLoop,经过SelectableChannel的register方法,把Channel的SocketChannel注册到eventLoop的Selector中。
private void register0(ChannelPromise promise) { // 部分代码略 doRegister(); // 部分代码略 } protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { // 部分代码略 } } }