一.基本概念java
1.channel:channel是tcp连接的抽象,进行一些读写操做。git
2.buffer:其实就是一块内存区域,channel从buffer中读数据,或者往里面写数据。github
3.selector:selector的做用是一个线程来操做多个channel,在运用时须要将channel注册到selector中。数组
4.Bootstrap:它是Netty 提供的一个便利的工厂类, 咱们能够经过它来完成 Netty 的客户端或服务器端的 Netty 初始化.promise
5.EventLoopGroup:是一个线程组。服务器
6.EventLoop:能够理解成一个抽象的线程。app
7.channelpipeline:是一个处理链,这里面体现的是责任链模式。在这个链里面咱们能够把咱们的处理逻辑加进去从而实现咱们的业务逻辑。socket
8.unsafe:是channel中底层Soceket操做封装成的一个对象。tcp
二.流程分析ide
1.客户端:
这里分析的是项目中的示例中的EchoClient。
客户端代码以下:
public final class EchoClient { static final boolean SSL = System.getProperty("ssl") != null; static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { // Configure SSL.git final SslContext sslCtx; if (SSL) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT)); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(HOST, PORT).sync(); // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } } }
首先分析初始化操做:
1.NioSocketChannel初始化:
初始化须要一个channelfactory,其中客户端代码中b.group().channel()就返回了一个niochannelfactory代码以下:
b.group(group) .channel(NioSocketChannel.class) public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); } @SuppressWarnings({ "unchecked", "deprecation" }) public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) { return channelFactory((ChannelFactory<C>) channelFactory); } //最终在AbstractBootstrap类中设置了一个channelfactory @Deprecated public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }
该接口中只有一个方法
@Deprecated public interface ChannelFactory<T extends Channel> { /** * Creates a new channel. */ T newChannel(); }
这个方法就是生成channel的方法,下一步找到调用处。
ChannelFuture f = b.connect(HOST, PORT).sync();
这段代码会生成一个channel,调用链以下:
public ChannelFuture connect(String inetHost, int inetPort) { return connect(InetSocketAddress.createUnresolved(inetHost, inetPort)); } public ChannelFuture connect(SocketAddress remoteAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validate(); return doResolveAndConnect(remoteAddress, config.localAddress()); } private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { //此处生成了channel final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.isDone()) { if (!regFuture.isSuccess()) { return regFuture; } return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure. Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } }); return promise; } }
final ChannelFuture regFuture = initAndRegister(); final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
经过newChannel方法生成一个niochannel,因此会调用NioSocketChannel的默认构造器。流程以下:
//第一步: private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); public NioSocketChannel() { this(DEFAULT_SELECTOR_PROVIDER); } public NioSocketChannel(SelectorProvider provider) { this(newSocket(provider)); } private static SocketChannel newSocket(SelectorProvider provider) { try { return provider.openSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } } public NioSocketChannel(SocketChannel socket) { this(null, socket); } public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } //第二步:调用父类 protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); } //第三步:再调用父类 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } //第四步:接着调用父类 protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
这里一个channel的初始化就已经所有完成。
总结:
经过NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 产生一个新的Java Nio SocketChannel。
AbstractChannel:
parent属性为空
unsafe经过newUnsafe() 实例化一个 unsafe 对象
pipeline是经过new DefaultChannelPipeline(this)建立的实例,这里也告诉了咱们channel与channelpipeline是一一对应的关系。
AbstractNioChannel:
SelectableChannel 是咱们产生的实例
readInterestOp是SelectionKey.OP_READ
SelectableChannel被设置成非阻塞的。
NioSocketChannel:
SocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket())
unsafe初始化:
@Override protected AbstractNioUnsafe newUnsafe() { return new NioSocketChannelUnsafe(); }
pipeline初始化:
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
pipeline实际上是一个双向链表,这里面开始有一个头和尾,你本身的逻辑handler就是放入到这个链表里面,而后进行处理的。须要注意的是,header 是一个 outboundHandler, 而 tail 是一个inboundHandler。
2.EventLoop初始化:
EventLoop初始化最终调用的是
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
初始化时:
EventLoopGroup group = new NioEventLoopGroup();
当没有传初始化个数时,且io.netty.eventLoopThreads属性值没有设置,则取处理器核心数*2。
这里的逻辑是:首先建立一个大小为nThreads 的数组,再调用newCild方法来初始化这个数组。其实netty里面的group就是基于这个的,这里面维护了一个EventExecutor数组,当netty须要一个EventLoop时就调用next()方法获取一个EventLoop。(其实EventLoopGroup能够理解为MultithreadEventExecutorGroup)。在EventLoop中第一次执行execute方法时,会调用startThread方法,接着调用doStartThread方法,这个方法会把当前线程赋值给SingleThreadEventExecutor里面的thread属性,这样每一个eventloop就有了一个跟它绑定的线程。而且调用下方法
SingleThreadEventExecutor.this.run();
时期一直处在运行状态中,处理任务。
3.channel注册过程:
调用链以下:
initAndRegister(); ChannelFuture regFuture = config().group().register(channel); @Override public ChannelFuture register(Channel channel) { return next().register(channel); } @Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
在unsafe类中:
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { //将得到到的eventloop赋值给channel中的eventloop值 AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
最终调用的是AbstractNioChannel.doRegister这个方法,把channel与eventLoop中的selector关联起来。这里也作了另一件事就是将当前 Channel 做为 attachment。
注册过程:AbstractBootstrap.initAndRegister->group().register(channel)->MultithreadEventLoopGroup.register->SingleThreadEventLoop.register->channel.unsafe().register(this, promise)-> AbstractUnsafe.register->register0->AbstractNioChannel.doRegister->javaChannel().register(eventLoop().selector, 0, this)。
注册结束后调用
pipeline.fireChannelRegistered();
从而把咱们本身定义的业务handler加载到pipeline中。
连接成功后调用
pipeline().fireChannelActive();
4.handler添加过程:
Bootstrap.handler方法就是实现了handler的添加功能。
调用方法链为:
register
register0
pipeline.fireChannelRegistered();
AbstractChannelHandlerContext.invokeChannelRegistered(head);
next.invokeChannelRegistered();
((ChannelInboundHandler) handler()).channelRegistered(this);
initChannel(ctx)
initChannel((C) ctx.channel());
到了这里就会调用咱们重写的handler加载逻辑,最后再调用
remove(ctx);
remove(ctx)调用是由于initAndRegister()方法中调用了init(channel)方法,从而把咱们重写的ChannelInitializer加载到了pipeline中。
netty的事件传播机制:例子
public class MyInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("Connected!"); ctx.fireChannelActive(); } }
当调用了channelActive时,若是但愿继续传播下去,须要调用ctx.fireChannelActive()。