Netty's client-side bootstrap class is Bootstrap. Let's look at how the client side of Spark's RPC layer initializes it.
Only the Bootstrap initialization code is reproduced here:
// Client-side bootstrap object
Bootstrap bootstrap = new Bootstrap();
// Configure the bootstrap
bootstrap.group(workerGroup)
  .channel(socketChannelClass)
  // Disable Nagle's Algorithm since we don't want packets to wait
  .option(ChannelOption.TCP_NODELAY, true)
  .option(ChannelOption.SO_KEEPALIVE, true)
  .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
  .option(ChannelOption.ALLOCATOR, pooledAllocator);

// Socket receive buffer
if (conf.receiveBuf() > 0) {
  bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
}

// Socket send buffer
// The receive and send buffer sizes should be computed as:
//   latency * bandwidth
// For example, with 1 ms latency and 10 Gbps bandwidth the buffer should be about 1.25 MB
if (conf.sendBuf() > 0) {
  bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
}

final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
final AtomicReference<Channel> channelRef = new AtomicReference<>();

// Set the handler
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  @Override
  public void initChannel(SocketChannel ch) {
    TransportChannelHandler clientHandler = context.initializePipeline(ch);
    clientRef.set(clientHandler.getClient());
    channelRef.set(ch);
  }
});

// Connect to the remote server
long preConnect = System.nanoTime();
// Connect to the server; this call kicks off the whole process
ChannelFuture cf = bootstrap.connect(address);
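The sizing rule quoted in the comments above (buffer size = latency * bandwidth) is easy to check with a few lines of arithmetic. The helper below is a hypothetical illustration, not part of Spark or Netty; the class and method names are made up.

```java
// Hypothetical helper for illustration only; not a Spark or Netty API.
final class BufferSizing {
    /** Bandwidth-delay product in bytes: latency (s) * bandwidth (bit/s) / 8. */
    static long bandwidthDelayBytes(double latencyMillis, double bandwidthBitsPerSecond) {
        double bitsInFlight = (latencyMillis / 1000.0) * bandwidthBitsPerSecond;
        return Math.round(bitsInFlight / 8.0);
    }

    public static void main(String[] args) {
        // 1 ms latency on a 10 Gbps link, the example from the comment
        long bytes = bandwidthDelayBytes(1.0, 10e9);
        System.out.println(bytes + " bytes"); // 1250000 bytes, i.e. about 1.25 MB
    }
}
```

A value computed this way could be passed to whatever configuration feeds conf.receiveBuf() and conf.sendBuf().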
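It breaks down into a few main steps: create the Bootstrap, set its options, set the handler, and call connect.
Next, we follow two threads to analyze how Bootstrap starts up: the constructor and the connect method. Setting options merely assigns values to internal member fields, so it needs no detailed discussion.
Bootstrap extends AbstractBootstrap, and the no-arg constructors of both are empty, so we can skip this step entirely. Feels like we're flying already, doesn't it? ^_^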
public ChannelFuture connect(SocketAddress remoteAddress) {
    // Null check
    ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
    // More null checks on member fields: mainly the EventLoopGroup, ChannelFactory and handler
    validate();
    return doResolveAndConnect(remoteAddress, config.localAddress());
}
This mostly performs null checks. Note how the ChannelFactory gets set: when Spark configured the Bootstrap earlier, it called .channel(socketChannelClass), which looks like this:
public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}
This creates a ReflectiveChannelFactory and assigns it to the internal channelFactory field. The factory uses reflection to create a Channel instance from the Class object passed in.
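To make the idea of a reflective factory concrete, here is a minimal JDK-only sketch. It is not Netty's ReflectiveChannelFactory; the names ReflectiveFactory and DummyChannel are hypothetical, and the only assumption is that the target class has a public no-arg constructor.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactoryDemo {
    /** Hypothetical stand-in for a channel class with a public no-arg constructor. */
    public static class DummyChannel {
        public DummyChannel() {
        }
    }

    /** Looks the constructor up once, then creates a new instance on every call. */
    static final class ReflectiveFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> clazz) {
            try {
                this.constructor = clazz.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(clazz + " has no public no-arg constructor", e);
            }
        }

        T newInstance() {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create an instance", e);
            }
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<DummyChannel> factory = new ReflectiveFactory<>(DummyChannel.class);
        // Each call yields a fresh object, just as each connect needs a fresh channel
        System.out.println(factory.newInstance() != factory.newInstance());
    }
}
```

The benefit of this design is that the bootstrap only needs a Class object; it does not have to know which concrete channel type it is building.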
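The logic of doResolveAndConnect shows that creating a connection consists of two main steps: initializing a Channel and registering it with an EventLoop (initAndRegister), then resolving the address and connecting (doResolveAndConnect0).
Note that initAndRegister returns a Future, a type commonly used to implement asynchronous mechanisms. Here, if registration has not completed immediately, a listener is added to the returned future so that the TCP connection is established once registration succeeds.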
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // Initialize a Channel and register it with an EventLoop
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        // If registration failed, return the failed future directly
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration is still in progress: add a listener to the future so that work can continue
        // once registration succeeds. A listener is simply a callback object.
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    // After registration succeeds, still call doResolveAndConnect0 to establish the connection
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
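The "continue right away if the future is done, otherwise continue from a listener" pattern can be sketched with the JDK's CompletableFuture. This is only an analogy under stated assumptions: CompletableFuture stands in for Netty's ChannelFuture, and connect() is a hypothetical placeholder for the real connect step.

```java
import java.util.concurrent.CompletableFuture;

final class RegisterThenConnectDemo {
    /** Hypothetical placeholder for the real connect step. */
    static String connect() {
        return "connected";
    }

    static CompletableFuture<String> registerThenConnect(CompletableFuture<Void> registration) {
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Fast path: registration already succeeded, continue on the calling thread
            return CompletableFuture.completedFuture(connect());
        }
        // Slow path: hand back a promise now and complete it from a callback later.
        // A registration that has already failed also lands here; the callback then runs at once.
        CompletableFuture<String> promise = new CompletableFuture<>();
        registration.whenComplete((ignored, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause);
            } else {
                promise.complete(connect());
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<String> result = registerThenConnect(registration);
        System.out.println("done before registration: " + result.isDone());
        registration.complete(null);
        System.out.println("after registration: " + result.join());
    }
}
```

Either way the caller gets a future back immediately and never blocks on registration.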
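This again consists of two steps: creating and initializing the Channel, then registering it with the EventLoopGroup.
Pay attention to the comment near the end of the method; it helps a lot in understanding Netty's threading model. Roughly, it says that once we get here it is safe to call bind() or connect(): either registration was attempted from the event loop and has already completed, or it was attempted from another thread and the registration task has been queued on the event loop, and because register(), bind() and connect() are all bound to the same thread, bind() or connect() will run after the registration task.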
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Create a channel through the channel factory
        channel = channelFactory.newChannel();
        // Call init to apply some initial settings to the channel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register with the EventLoopGroup
    ChannelFuture regFuture = config().group().register(channel);
    // If registration failed, close the channel that was created
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
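The ordering guarantee described in that source comment boils down to a property of any single-threaded executor: tasks run in the order they were submitted. The JDK-only sketch below illustrates this, with a plain single-thread ExecutorService standing in for an EventLoop and two strings standing in for the register and connect tasks.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SingleThreadOrderingDemo {
    /** Submits "register" and then "connect" from the caller thread and returns the execution order. */
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Stand-in for an EventLoop: one thread, one FIFO task queue
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        eventLoop.execute(() -> log.add("register"));
        eventLoop.execute(() -> log.add("connect"));
        eventLoop.shutdown();
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the connect task is queued after the register task on the same thread, it cannot observe an unregistered channel, which is why no extra locking is needed.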
DEFAULT_SELECTOR_PROVIDER is the default SelectorProvider. This is a class defined in the JDK whose main job is to create Selector objects and channel objects.
public NioSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}
newSocket creates a SocketChannel by calling provider.openSocketChannel(); the default implementation is SocketChannelImpl.
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
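After a few more calls, the constructor below is finally invoked. It first calls the constructor of the parent class AbstractNioByteChannel,
then creates a SocketChannelConfig object. This class is somewhat like the facade pattern: it wraps the getters and setters for parameters of both the NioSocketChannel and the underlying Socket.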
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
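Next, let's look at the constructor of the parent class AbstractNioByteChannel.
It does no work of its own and simply calls its parent's constructor. Note the extra argument SelectionKey.OP_READ: it is the channel's initial interest op, meaning a freshly created channel is interested in read events.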
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
Again, it mainly calls the parent constructor:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    // Parent constructor
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // Switch to non-blocking mode
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            // If that fails, close the channel
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
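Stripped of the Netty wrappers, the constructor chain so far does two things with the JDK API: it opens a SocketChannel through a SelectorProvider and puts it into non-blocking mode. A minimal JDK-only sketch of those two calls (the class name is made up for illustration):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class NonBlockingChannelDemo {
    /** Opens a channel via the default provider, makes it non-blocking and reports its mode. */
    static boolean isBlockingAfterConfigure() {
        try (SocketChannel ch = SelectorProvider.provider().openSocketChannel()) {
            // A new SocketChannel starts in blocking mode; a selector needs non-blocking mode
            ch.configureBlocking(false);
            return ch.isBlocking();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking: " + isBlockingAfterConfigure());
    }
}
```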
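The most important initialization logic lives in this top-level base class. Two very important objects are created here: the Unsafe object, which wraps calls to the low-level JDK API, and the ChannelPipeline, the core class behind Netty's chained event handling.
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // Create a ChannelId that uniquely identifies this channel
    id = newId();
    // The Unsafe object wraps calls to the low-level JDK API
    unsafe = newUnsafe();
    // Create a DefaultChannelPipeline
    pipeline = newChannelPipeline();
}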
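The previous section briefly walked through the initialization of NioSocketChannel. The main logic is in the AbstractChannel constructor, where we saw two important objects being created.
Back in AbstractBootstrap.initAndRegister: once the NioSocketChannel constructor has been invoked via reflection and an instance created, the new Channel must be initialized. Here is how Bootstrap initializes it:
It adds the handler to the pipeline, sets the channel options, and sets the attributes.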
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    // Add a handler to the ChannelPipeline: the one we configured earlier
    p.addLast(config.handler());

    final Map<ChannelOption<?>, Object> options = options0();
    // Set the options; this ends up calling the setters on SocketChannelConfig
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    // Set the attributes
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
}
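Once the channel has been created and initialized, it must be registered with an EventLoop. NioEventLoopGroup extends MultithreadEventLoopGroup, whose register method delegates to SingleThreadEventLoop.register to complete the registration: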
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
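As you can see, next() picks one of the EventLoops to register with. MultithreadEventLoopGroup wraps several EventLoops that do the real work (each EventLoop is itself an EventLoopGroup, since the EventLoop interface extends EventLoopGroup), and each of them runs on a single thread,
so next we should look at the register method of a single EventLoop.
Here a DefaultChannelPromise is created to serve as the return value.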
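How next() chooses a child is not shown in the text. A common strategy, and to my understanding the one Netty uses by default, is round-robin. The sketch below is a generic JDK-only round-robin chooser with hypothetical names; it is not Netty's chooser implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinChooser<T> {
    private final List<T> children;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinChooser(List<T> children) {
        if (children.isEmpty()) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = new ArrayList<>(children);
    }

    /** Returns the children in turn; floorMod keeps the index valid after int overflow. */
    T next() {
        return children.get(Math.floorMod(index.getAndIncrement(), children.size()));
    }

    public static void main(String[] args) {
        // The strings are stand-ins for the event loops of a group
        RoundRobinChooser<String> group = new RoundRobinChooser<>(Arrays.asList("loop-0", "loop-1", "loop-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(group.next());
        }
    }
}
```

Spreading channels evenly this way keeps any single thread from handling a disproportionate share of connections.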
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
This ends up calling Unsafe.register, which binds the channel to the current EventLoop.
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
It calls register0 to complete the registration:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // Null check
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    // On repeated registration, report the error through the future.
    // A channel can be registered with only one EventLoop.
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    // Check that the channel type and the EventLoop type are compatible
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    // Point the channel's eventLoop field at the given object,
    // i.e. bind this channel to the given eventLoop
    AbstractChannel.this.eventLoop = eventLoop;

    // If we are already on the eventLoop's thread, run the code directly;
    // otherwise add the registration task to the eventLoop's task queue
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
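The inEventLoop() check is a recurring Netty idiom: run inline when already on the loop thread, otherwise enqueue. The toy event loop below sketches that idiom with JDK classes only. MiniEventLoop and runInLoop are hypothetical names; a real EventLoop does far more than this.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class MiniEventLoop {
    private final ExecutorService executor;
    private volatile Thread loopThread;

    MiniEventLoop() {
        // Remember the single worker thread so that inEventLoop() can compare against it
        executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread t = new Thread(runnable, "mini-event-loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Runs the task inline when called on the loop thread, otherwise queues it. */
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    static String demo() {
        MiniEventLoop loop = new MiniEventLoop();
        CompletableFuture<String> result = new CompletableFuture<>();
        // Called from the caller thread: this task is queued
        loop.runInLoop(() -> {
            StringBuilder trace = new StringBuilder("outer-start;");
            // Called from the loop thread: this nested task runs inline, before "outer-end"
            loop.runInLoop(() -> trace.append("inner;"));
            trace.append("outer-end");
            result.complete(trace.toString());
        });
        try {
            return result.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            loop.executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Confining all state changes of a channel to one thread in this way is what lets the registration code run without locks.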
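This method implements the actual registration logic.
If this is the first registration and the channel is already active, it also fires a channel-active event so that every handler can react.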
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        // Mark the ChannelPromise uncancellable and check that the channel is still open,
        // which is determined from the underlying JDK channel
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        // Is this the first registration?
        // TODO: under what circumstances is a channel registered more than once?
        boolean firstRegistration = neverRegistered;
        // Perform the actual registration, i.e. the low-level API call.
        // For a JDK NIO channel this is SelectableChannel.register(Selector sel, int ops, Object att)
        doRegister();
        // Update the flags
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        // Invoke ChannelHandler.handlerAdded on every handler node added so far
        pipeline.invokeHandlerAddedIfNeeded();

        // Signal through the future that registration succeeded
        safeSetSuccess(promise);
        // Fire a channel-registered event, which propagates through the pipeline;
        // every registered handler receives it in turn and reacts accordingly
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                // On first registration, also fire a channel-active event so that all handlers can react
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                // Start accepting read events.
                // For NIO channels this registers read as an interest op through the JDK API
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
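The JDK call mentioned in the comments, SelectableChannel.register(Selector sel, int ops, Object att), can be tried in isolation. The sketch below registers a channel with an empty interest set and only afterwards enables OP_READ, which mirrors the split between doRegister() and beginRead(). Passing 0 as the initial ops is my recollection of what Netty 4.1 does, so treat that detail as an assumption; the string attachment is a stand-in for the owning channel object.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class SelectorRegistrationDemo {
    /** Returns {interest ops right after register, interest ops after enabling reads}. */
    static int[] registerThenEnableRead() {
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);
            Object attachment = "owner"; // stand-in for the object that owns the JDK channel
            // Step 1: register with no interest ops at all
            SelectionKey key = ch.register(selector, 0, attachment);
            int before = key.interestOps();
            // Step 2: later, declare interest in read events
            key.interestOps(before | SelectionKey.OP_READ);
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableRead();
        System.out.println("after register: " + ops[0] + ", after enabling read: " + ops[1]);
    }
}
```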
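That completes the analysis of how a channel is created, initialized and registered with an EventLoop. The process itself is not really complicated; it is just that the code is deeply nested and the inheritance hierarchy is complex, so even a simple feature may take several layers of digging before you reach the real implementation. It takes patience and familiarity. Here I distill the main logic once more, leaving out all the side details, to give an overall picture:
Next, we return to Bootstrap.doResolveAndConnect and finish the analysis of how the connection is established.
The connection is established in doResolveAndConnect0:
The main job of this method is to resolve the remote address, for example resolving a domain name through a DNS server,
and then to establish the connection using the resolved address by calling doConnect.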
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                           final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        final EventLoop eventLoop = channel.eventLoop();
        // Obtain an address resolver
        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

        // If the resolver does not support the address, or the address is already resolved, connect right away
        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
            // Resolver has no idea about what to do with the specified remote address or it's resolved already.
            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }

        // Resolve the remote address
        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {
            final Throwable resolveFailureCause = resolveFuture.cause();

            if (resolveFailureCause != null) {
                // Failed to resolve immediately
                channel.close();
                promise.setFailure(resolveFailureCause);
            } else {
                // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                // Connect once resolution has succeeded
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }

        // Wait until the name resolution is finished.
        // Add a callback to the future so that the connect happens asynchronously
        resolveFuture.addListener(new FutureListener<SocketAddress>() {
            @Override
            public void operationComplete(Future<SocketAddress> future) throws Exception {
                if (future.cause() != null) {
                    channel.close();
                    promise.setFailure(future.cause());
                } else {
                    doConnect(future.getNow(), localAddress, promise);
                }
            }
        });
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}
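The distinction between an address that still needs resolving and one that is already resolved exists in the JDK as well, on InetSocketAddress. The sketch below shows both cases without touching DNS. It assumes, without having checked the resolver source here, that the resolver's isResolved check for socket addresses amounts to something like !isUnresolved(); the host name and port are arbitrary examples.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;

final class AddressResolutionDemo {
    /** True when the address only carries a host name and still has to be looked up. */
    static boolean needsResolution(InetSocketAddress address) {
        return address.isUnresolved();
    }

    public static void main(String[] args) {
        // Created from a host name without any lookup: still unresolved
        InetSocketAddress byName = InetSocketAddress.createUnresolved("example.invalid", 7077);
        // Created from an IP address object: already resolved
        InetSocketAddress byIp = new InetSocketAddress(InetAddress.getLoopbackAddress(), 7077);

        System.out.println(byName + " needs resolution: " + needsResolution(byName));
        System.out.println(byIp + " needs resolution: " + needsResolution(byIp));
    }
}
```

An address of the second kind would take the early-return branch and go straight to doConnect.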
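It calls the channel's connect method to complete the connection.
Maybe it is because I had gotten used to reading Scala, but coming back to Java the code feels verbose: a big pile of code to express a small bit of logic, with low information density. Many people now think Java will gradually decline, and among the languages most likely to replace it, Scala is certainly a strong contender. You only feel the pain once you compare: next to Java, Scala is far more concise and expresses the intended logic precisely and directly in a few lines. It seems to move a step closer to declarative programming.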
private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                // Call channel.connect to perform the connection
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}
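A brief note here: tail is the tail node of the whole chain. If you are familiar with Netty, you know that it handles I/O events with the chain-of-responsibility pattern: users can install multiple handlers, which form a chain; I/O events propagate along the chain and are processed by particular handlers. Two special handlers, head and tail, sit at the two ends of the chain, and they exist mainly to implement some special logic.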
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}
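After a few intermediate calls, the method below is finally reached. The key line here is findContextOutbound(MASK_CONNECT). I won't paste its code; I'll just describe roughly what it does and leave the detailed mechanism for a later analysis of ChannelPipeline. The method walks the handler chain from back to front until it finds a handler that can handle the connect event. Whether a handler can handle a given event type is determined by bit flags: each AbstractChannelHandlerContext holds an int whose bits mark the event types it handles. Usually the connect event is handled by the head node, i.e. the DefaultChannelPipeline.HeadContext class, so we go straight to DefaultChannelPipeline.HeadContext.connect.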
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    // Find the next context that can perform connect; bit flags mark the different operation types
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // Calls AbstractChannelHandlerContext.invokeConnect
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) {
    unsafe.connect(remoteAddress, localAddress, promise);
}
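The backward walk with a bit-mask test can be sketched in a few lines. This is a simplified model, not Netty's code: the Ctx class, the mask values and the handler names are all hypothetical, and the head node is assumed to have every bit set so that the loop always terminates.

```java
final class MaskedPipelineDemo {
    // Hypothetical bit values; the real constants are internal to Netty
    static final int MASK_CONNECT = 1;
    static final int MASK_WRITE = 1 << 1;

    /** One node of the chain: a name, the operations it handles, and a link to its predecessor. */
    static final class Ctx {
        final String name;
        final int executionMask;
        Ctx prev;

        Ctx(String name, int executionMask) {
            this.name = name;
            this.executionMask = executionMask;
        }
    }

    /** Walks from the given node toward the head until a node whose mask contains the bit is found. */
    static Ctx findContextOutbound(Ctx from, int mask) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }

    /** Builds head <- encoder <- decoder <- tail and reports who handles the given operation. */
    static String handlerFor(int mask) {
        Ctx head = new Ctx("head", MASK_CONNECT | MASK_WRITE);
        Ctx encoder = new Ctx("encoder", MASK_WRITE);
        Ctx decoder = new Ctx("decoder", 0);
        Ctx tail = new Ctx("tail", 0);
        encoder.prev = head;
        decoder.prev = encoder;
        tail.prev = decoder;
        return findContextOutbound(tail, mask).name;
    }

    public static void main(String[] args) {
        System.out.println("connect handled by " + handlerFor(MASK_CONNECT));
        System.out.println("write handled by " + handlerFor(MASK_WRITE));
    }
}
```

Testing a bit is cheaper than a virtual call into a handler that would only forward the operation, which is presumably why the mask exists.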
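The unsafe field is assigned here:
HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, HeadContext.class);
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
}
So we go straight to unsafe.connect.
The main logic: check the promise and channel state, guard against a connect that is already in progress, call doConnect, and, if the connection is not established immediately, schedule a timeout check and add a listener that closes the channel if the caller cancels the promise.
So the core method for establishing a connection is doConnect, which is abstract. We look at NioSocketChannel, i.e. how a TCP connection is established; the other AbstractNioChannel implementations cover protocols such as UDP and SCTP.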
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // Check the promise state and that the channel is still open
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    try {
        // Guard against connecting twice
        if (connectPromise != null) {
            // Already a connect in process.
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();
        // Call doConnect to connect
        if (doConnect(remoteAddress, localAddress)) {
            // If the connection succeeded immediately, mark the future as successful
            fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            // If the timeout is greater than 0, check once it elapses whether the connection succeeded
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        // If connectPromise can still be marked as failed, the connection has not
                        // succeeded yet, i.e. it timed out, and the channel must be closed
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            // Add a callback to the future that closes the channel if the caller cancels the future
            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}
It calls SocketUtils.connect to establish the connection:
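The timeout handling relies on one trick: a scheduled task tries to fail the promise, and that attempt only succeeds if nothing has completed the promise in the meantime. Below is a JDK-only sketch of that trick. CompletableFuture.completeExceptionally plays the role of tryFailure, and the 100 ms timeout and all names are arbitrary assumptions for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ConnectTimeoutDemo {
    /** Schedules a task that fails the promise after the timeout unless it is already complete. */
    static ScheduledFuture<?> scheduleTimeout(ScheduledExecutorService loop,
                                              CompletableFuture<Void> connectPromise,
                                              long timeoutMillis) {
        return loop.schedule(() -> {
            // Has no effect (returns false) if the promise was completed before the timeout fired
            connectPromise.completeExceptionally(new TimeoutException("connection timed out"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns true if the simulated connect attempt ended with a timeout. */
    static boolean timesOut(boolean connectInTime) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<Void> promise = new CompletableFuture<>();
            ScheduledFuture<?> timeout = scheduleTimeout(loop, promise, 100);
            if (connectInTime) {
                promise.complete(null);   // the connection was established first
                timeout.cancel(false);    // so the timeout task is no longer needed
            }
            try {
                promise.get(5, TimeUnit.SECONDS);
                return false;
            } catch (ExecutionException e) {
                return e.getCause() instanceof TimeoutException;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (TimeoutException e) {
                throw new IllegalStateException("promise was never completed", e);
            }
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("never connects -> timed out: " + timesOut(false));
        System.out.println("connects in time -> timed out: " + timesOut(true));
    }
}
```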
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    // Bind to the given local address
    if (localAddress != null) {
        doBind0(localAddress);
    }

    // Tracks whether the connect attempt was initiated successfully;
    // a successfully initiated attempt does not mean the connection is already established
    boolean success = false;
    try {
        // The statement that actually connects
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        // Return whether the connection is already established
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}
As you can see, the connection is ultimately established through the JDK API, namely SocketChannel.connect.
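public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
        throws IOException {
    try {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws IOException {
                // Call the JDK API to connect: SocketChannel.connect
                return socketChannel.connect(remoteAddress);
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}
In a word, this code runs deep! It is very indirect. On a first read, without a diagram of the code structure at hand, it is easy to get lost in the layers of inheritance. Calls pass through layer after layer and the useful logic is buried deep, so reading this kind of code takes patience, perseverance, and the determination to get to the bottom of things. The benefit of such a complex structure is just as obvious, though: good extensibility. You can extend it at any level.
To summarize the connection process, I think it comes down to three main aspects: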
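For reference, here is what a non-blocking connect looks like with the JDK API alone: connect may return false, in which case the caller registers OP_CONNECT and calls finishConnect once the selector reports the key as ready. The sketch connects to a throwaway server socket on the loopback interface, so it assumes loopback networking is available; the 5-second wait is an arbitrary choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingConnectDemo {
    /** Connects a non-blocking client to a local server and reports whether it ended up connected. */
    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 lets the operating system pick a free port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            // In non-blocking mode connect() may return false: the attempt has only been started
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                // Ask the selector to report when the connect attempt can be finished
                client.register(selector, SelectionKey.OP_CONNECT);
                if (selector.select(5000) == 0) {
                    throw new IOException("connect did not complete in time");
                }
                connected = client.finishConnect();
            }
            return connected && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectToLocalServer());
    }
}
```

This is the same split seen in doConnect: the boolean result says whether the connection is already up, and a false result means the rest happens later through OP_CONNECT.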