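import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

// InitHandler and IOHandler are the author's own ChannelHandler classes (not shown here).
public class Server {
    private ServerBootstrap serverBootstrap;
    private NioEventLoopGroup bossGroup;
    private NioEventLoopGroup workGroup;

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Server starting");
        Server server = new Server();
        server.start();
    }

    private void start() throws InterruptedException {
        // Create the groups before the try block so the finally block never sees null.
        serverBootstrap = new ServerBootstrap();
        bossGroup = new NioEventLoopGroup();
        workGroup = new NioEventLoopGroup(4);
        try {
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    // option(...) applies to the listening server channel
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption(...) applies to every accepted connection
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new InitHandler())
                    .childHandler(new IOChannelInitialize());
            // Bind the port and wait until the bind has completed.
            ChannelFuture future = serverBootstrap.bind(8802).sync();
            // Block until the server channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    private class IOChannelInitialize extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("initChannel");
            // Reader idle timeout of 1000 seconds; writer and all-idle checks disabled.
            ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
            ch.pipeline().addLast(new IOHandler());
        }
    }
}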
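Step-by-step explanation
1.1 Create a ServerBootstrap instance. It is Netty's bootstrap helper class and provides a set of methods for configuring server startup parameters. It uses the facade pattern to abstract and encapsulate the underlying capabilities, so users rarely have to deal with low-level APIs, which lowers the barrier to development.
1.2 NioEventLoopGroup is Netty's Reactor thread pool. bossGroup listens for and accepts client connections, while workGroup handles I/O as well as encoding and decoding.
1.3 Specify NioServerSocketChannel as the server-side channel type.
1.4 Set options: SO_BACKLOG for the server channel, plus the pooled allocator and SO_KEEPALIVE for accepted channels.
1.5 Initialize the pipeline and attach handlers. The pipeline is a chain of responsibility that handles network events and manages and executes ChannelHandlers. Here it is given the built-in IdleStateHandler and a custom IOHandler.
1.6 serverBootstrap.bind(8802) is where the server actually starts and binds the port.
1.7 future.channel().closeFuture().sync() blocks until the server channel is closed.
1.8 Shut down gracefully.
NioEventLoopGroup is more than a set of I/O threads. Besides I/O reads and writes, it also runs system tasks and scheduled tasks.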
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
Continuing. The following code is abridged:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    ...
    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }
    children = new SingleThreadEventExecutor[nThreads];
    // Pick the round-robin strategy based on the number of children.
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }
    for (int i = 0; i < nThreads; i ++) {
        ...
        children[i] = newChild(threadFactory, args);
        ...
    }
    ...
}
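The constructor above chooses between two chooser implementations depending on whether the number of children is a power of two. A minimal sketch of that idea, using only the JDK, is given below. The class ChooserSketch and its method names are hypothetical and work on plain indices instead of real event executors; it assumes the group has at least one child.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the two chooser strategies; it returns indices
// instead of real event executors. Assumes length > 0.
class ChooserSketch {
    private final AtomicInteger childIndex = new AtomicInteger();
    private final int length;

    ChooserSketch(int length) {
        this.length = length;
    }

    // A positive int is a power of two when exactly one bit is set.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Round-robin index of the next child.
    int next() {
        int i = childIndex.getAndIncrement();
        if (isPowerOfTwo(length)) {
            return i & (length - 1);   // bit mask instead of modulo
        }
        return Math.abs(i % length);   // general case
    }

    public static void main(String[] args) {
        ChooserSketch four = new ChooserSketch(4);
        for (int n = 0; n < 6; n++) {
            System.out.print(four.next() + " ");
        }
        System.out.println();
    }
}
```

With four children, as in new NioEventLoopGroup(4), the mask branch is taken and the indices cycle through 0, 1, 2, 3.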
MultithreadEventExecutorGroup handles both thread creation and thread selection. Let's look at newChild (a method of NioEventLoopGroup), which instantiates each child event loop:
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
This creates a NioEventLoop:
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
Following super:
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
The code is abridged. Continuing:
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            SingleThreadEventExecutor.this.run();
        }
    });
}
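A thread is instantiated here, and its run method calls SingleThreadEventExecutor's run method. Where is this thread started? Let's keep reading.
Summary:
NioEventLoopGroup is in effect the Reactor thread pool. It schedules and executes client connection acceptance, network read/write event handling, user-defined tasks, and scheduled tasks.
ServerBootstrap is the server-side bootstrap helper class. Its parent class is AbstractBootstrap, and the corresponding client-side bootstrap helper class is Bootstrap.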
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    // The fields listed below use the type parameter C, so they appear to be the ones
    // declared in the parent class AbstractBootstrap<B, C>; they are shown here for reference.
    volatile EventLoopGroup group;
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private volatile ChannelHandler handler;
}
bossGroup is passed to the parent class, and workGroup is assigned to ServerBootstrap's childGroup:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
serverBootstrap.channel(NioServerSocketChannel.class)
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}
Next, follow new BootstrapChannelFactory:
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
BootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
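BootstrapChannelFactory is a nested class that implements the ChannelFactory interface. As the name suggests, it is a channel factory: its newChannel method creates the NioServerSocketChannel instance by reflection. Where it gets called is shown later.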
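The reflective factory idea can be tried out without Netty. The sketch below is a hypothetical ReflectiveFactory that stores a Class and instantiates it on demand. It assumes the class has an accessible no-argument constructor, and it uses getDeclaredConstructor().newInstance() because Class.newInstance() is deprecated in current JDKs.

```java
// Hypothetical factory mirroring the idea of BootstrapChannelFactory:
// remember a Class now, instantiate it reflectively later.
class ReflectiveFactory<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    // Creates a fresh instance through the no-argument constructor.
    T newInstance() {
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory = new ReflectiveFactory<>(StringBuilder.class);
        CharSequence first = factory.newInstance();
        CharSequence second = factory.newInstance();
        System.out.println(first.getClass().getSimpleName() + ", distinct=" + (first != second));
    }
}
```

Deferring instantiation this way is what lets channel(...) be called at configuration time while the channel object itself is only created during bind.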
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
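The option method here belongs to the parent class AbstractBootstrap. options is a LinkedHashMap, which keeps insertion order but is not thread-safe, so it is modified under a lock.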
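A small JDK-only sketch of the same pattern may help: an insertion-ordered map guarded by synchronized, where a null value removes the entry. OptionStore and its String keys are hypothetical simplifications; Netty uses ChannelOption keys.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical option holder: LinkedHashMap keeps insertion order,
// and every access is synchronized because the map is not thread-safe.
class OptionStore {
    private final Map<String, Object> options = new LinkedHashMap<>();

    // A null value removes the option, any other value sets it.
    OptionStore option(String name, Object value) {
        if (name == null) {
            throw new NullPointerException("name");
        }
        synchronized (options) {
            if (value == null) {
                options.remove(name);
            } else {
                options.put(name, value);
            }
        }
        return this;
    }

    // Snapshot of the option names in insertion order.
    List<String> names() {
        synchronized (options) {
            return new ArrayList<>(options.keySet());
        }
    }

    public static void main(String[] args) {
        OptionStore store = new OptionStore()
                .option("SO_BACKLOG", 128)
                .option("SO_KEEPALIVE", true)
                .option("SO_BACKLOG", null);
        System.out.println(store.names());
    }
}
```

Returning this from option(...) is what makes the fluent chaining style of the bootstrap possible.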
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
if (childOption == null) {
throw new NullPointerException("childOption");
}
if (value == null) {
synchronized (childOptions) {
childOptions.remove(childOption);
}
} else {
synchronized (childOptions) {
childOptions.put(childOption, value);
}
}
return this;
}
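childOption is a method of the subclass ServerBootstrap.
Difference between childOption and option:
option: sets options on the ServerChannel itself, that is, the listening NioServerSocketChannel served by the boss group.
childOption: sets options on the child channels that the ServerChannel accepts, which are served by the work group.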
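The split between the two kinds of options also exists in plain JDK NIO, where the listening channel and the connection channel accept different socket options. The sketch below is only an analogy using java.nio, not Netty code, and the class name SocketOptionScopes is hypothetical. It queries which options each channel type reports as supported.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical helper comparing the options of a listening channel
// with those of a connection channel, using JDK NIO only.
class SocketOptionScopes {
    // SO_RCVBUF is documented as supported by server-socket channels.
    static boolean serverSupportsReceiveBuffer() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            return server.supportedOptions().contains(StandardSocketOptions.SO_RCVBUF);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static boolean serverSupportsKeepAlive() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            return server.supportedOptions().contains(StandardSocketOptions.SO_KEEPALIVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // SO_KEEPALIVE is documented as supported by socket channels.
    static boolean childSupportsKeepAlive() {
        try (SocketChannel child = SocketChannel.open()) {
            return child.supportedOptions().contains(StandardSocketOptions.SO_KEEPALIVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("server SO_RCVBUF:    " + serverSupportsReceiveBuffer());
        System.out.println("server SO_KEEPALIVE: " + serverSupportsKeepAlive());
        System.out.println("child  SO_KEEPALIVE: " + childSupportsKeepAlive());
    }
}
```

Keep-alive is a property of an established connection, which is why the sample server sets SO_KEEPALIVE through childOption while SO_BACKLOG, which concerns the listening socket, goes through option.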
serverBootstrap.handler(new InitHandler())
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return (B) this;
}
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
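Difference between handler and childHandler:
handler belongs to the server-side NioServerSocketChannel and is added only once.
childHandler belongs to each newly created NioSocketChannel and is invoked every time a connection comes in.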
serverBootstrap.bind(8802).sync()
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
init(channel);
ChannelFuture regFuture = group().register(channel);
return regFuture;
}
channelFactory was created by the serverBootstrap.channel() call. Here it is invoked to create the NioServerSocketChannel instance by reflection.
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
options() returns the options map of AbstractBootstrap (a LinkedHashMap) that serverBootstrap.option() populated. Here options and attrs are applied to the channel.
p.addLast() adds a new handler to the NioServerSocketChannel's pipeline. The pipeline resembles a Servlet filter chain and manages all the handlers.
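To make the filter-chain comparison concrete, here is a deliberately simplified sketch. MiniPipeline is hypothetical: it models handlers as functions on a String event and always passes the result on, whereas a real Netty handler decides for itself whether to propagate an event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, heavily simplified pipeline: handlers run in the order
// in which they were added, each receiving the previous handler's output.
class MiniPipeline {
    private final List<UnaryOperator<String>> handlers = new ArrayList<>();

    // Appends a handler to the end of the chain, like ChannelPipeline.addLast.
    MiniPipeline addLast(UnaryOperator<String> handler) {
        handlers.add(handler);
        return this;
    }

    // Passes the event through every handler from first to last.
    String fire(String event) {
        String current = event;
        for (UnaryOperator<String> handler : handlers) {
            current = handler.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline()
                .addLast(String::trim)
                .addLast(String::toUpperCase);
        System.out.println(pipeline.fire("  accept "));
    }
}
```

The order of addLast calls matters, which is why the initializer above adds the user's handler before the ServerBootstrapAcceptor.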
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
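eventLoop.inEventLoop() checks whether the current thread is the event loop's own thread. If it is, the loop is already running and register0 is called directly. If it is not, there are two possibilities: the loop thread has not been started yet, or the call comes from a different thread. In both cases the registration is submitted to the loop as a task.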
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
thread.start();
}
}
}
The thread created in the SingleThreadEventExecutor constructor at the very beginning (section 2.1) is started by startThread(). Let's go back to that constructor from 2.1:
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            SingleThreadEventExecutor.this.run();
        }
    });
}
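The combination of a thread created in the constructor and started lazily on the first execute() call can be sketched with JDK classes only. LazyStartExecutor below is a hypothetical simplification: it uses an AtomicInteger where Netty uses a field updater, and it has no selector, shutdown handling, or task rejection.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-thread executor whose thread is created eagerly
// but started only when the first task is submitted from another thread.
class LazyStartExecutor {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final Thread thread;

    LazyStartExecutor() {
        thread = new Thread(this::runLoop, "lazy-loop");
        thread.setDaemon(true); // do not keep the JVM alive in this demo
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    boolean isStarted() {
        return state.get() == ST_STARTED;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread();
        }
        tasks.add(task);
    }

    // The compare-and-set guarantees that start() is called at most once.
    private void startThread() {
        if (state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            thread.start();
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the thread started lazily and ran the task itself.
    static boolean demo() {
        LazyStartExecutor executor = new LazyStartExecutor();
        boolean startedBefore = executor.isStarted();
        CountDownLatch done = new CountDownLatch(1);
        AtomicBoolean ranOnLoop = new AtomicBoolean();
        executor.execute(() -> {
            ranOnLoop.set(executor.inEventLoop());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !startedBefore && executor.isStarted() && ranOnLoop.get();
    }

    public static void main(String[] args) {
        System.out.println("lazy start worked: " + demo());
    }
}
```

In the walkthrough, the first task that triggers this lazy start is the channel registration submitted by register().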
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
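This loop runs on the event loop thread, asynchronously to the caller. It keeps polling the selector for ready events such as client accepts, and then runs the queued tasks with runAllTasks. When ioRatio is below 100, the task phase gets a time budget proportional to the time just spent on I/O.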
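The time budget passed to runAllTasks in the loop above is ioTime * (100 - ioRatio) / ioRatio. The helper below isolates that arithmetic; IoRatioBudget and taskBudgetNanos are hypothetical names, and the range check is an assumption of this sketch because the loop handles ioRatio == 100 on a separate branch.

```java
// Hypothetical helper that reproduces the task-time budget formula
// used in the event loop: ioTime * (100 - ioRatio) / ioRatio.
class IoRatioBudget {
    // ioRatio is the percentage of loop time intended for I/O (1..99 here).
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 8_000_000L; // 8 ms spent in processSelectedKeys
        System.out.println("ioRatio=50 -> " + taskBudgetNanos(ioTime, 50) + " ns");
        System.out.println("ioRatio=80 -> " + taskBudgetNanos(ioTime, 80) + " ns");
    }
}
```

With ioRatio at 50 the tasks get as much time as I/O took; at 80 they get a quarter of it, so 8 ms of I/O allows 2 ms of tasks.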
@Override
public ChannelFuture register(Channel channel) {
...
channel.unsafe().register(this, promise);
return promise;
}
The following is abridged code (in the AbstractUnsafe inner class of AbstractChannel):
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
...
}
private void register0(ChannelPromise promise) {
...
doRegister();
...
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
...
}
Continuing (in AbstractNioChannel):
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
...
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
...
}
}
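This registers the NioServerSocketChannel with the Selector of a NioEventLoop in the boss thread pool.
One would expect OP_ACCEPT (16) to be registered with the multiplexer here, but the interest set that is passed is 0.
Reasons for registering 0:
(1) The register method is polymorphic. NioServerSocketChannel can use it to listen for incoming client connections, and it can also register a SocketChannel to listen for network reads or writes.
(2) The listening bits can be changed easily later through SelectionKey's interestOps(int ops) method.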
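Both reasons can be observed with plain JDK NIO. The sketch below registers an unbound server channel with an interest set of 0 and enables OP_ACCEPT afterwards through interestOps(int). The class name DeferredInterestOps and the String attachment are hypothetical; Netty attaches the channel object itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo: register with no interest first, enable OP_ACCEPT later.
class DeferredInterestOps {
    // Returns the interest set right after registration and after the update.
    static int[] registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // required before register()
            Object attachment = "server-channel";
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();
            // Add OP_ACCEPT without disturbing bits that may already be set.
            key.interestOps(before | SelectionKey.OP_ACCEPT);
            int after = key.interestOps();
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableAccept();
        System.out.println("before=" + ops[0] + ", after=" + ops[1]);
    }
}
```

Registering with 0 gives the channel a SelectionKey and an attachment right away, while the decision about which events to listen for is postponed until the channel is ready.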
Now look at pipeline.fireChannelActive():
@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
@Override
public Channel read() {
pipeline.read();
return this;
}
@Override
public ChannelPipeline read() {
tail.read();
return this;
}
@Override
public ChannelHandlerContext read() {
...
next.invokeRead();
...
}
private void invokeRead() {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
This ends up in HeadContext's read:
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public final void beginRead() {
    ...
    doBeginRead();
    ...
}
@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
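This is where the selectionKey's interest set is finally changed to readInterestOp. For NioServerSocketChannel that value is OP_ACCEPT; for a NioSocketChannel it is OP_READ. Note that a server channel only becomes active once it is bound, so for the server this path is normally reached after the bind described next.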
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
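The bind is submitted to the task queue of the Reactor thread, that is, the channel's event loop. The task first checks whether registration succeeded and only then calls bind: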
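The pattern of waiting for registration and then running the bind on the loop thread can be approximated with CompletableFuture. This is only an analogy: BindAfterRegister, bindWhenRegistered, and the "bound:" result string are hypothetical, and Netty uses its own ChannelFuture and ChannelPromise types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical analogy for doBind/doBind0: once registration completes,
// hand the bind step to the loop executor; propagate a failure otherwise.
class BindAfterRegister {
    static CompletableFuture<String> bindWhenRegistered(
            CompletableFuture<Void> registration, Executor loop, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        // Runs immediately if registration is already done, later otherwise.
        registration.whenComplete((ignored, cause) ->
                loop.execute(() -> {
                    if (cause == null) {
                        promise.complete("bound:" + port); // stand-in for channel.bind(...)
                    } else {
                        promise.completeExceptionally(cause);
                    }
                }));
        return promise;
    }

    public static void main(String[] args) {
        Executor sameThread = Runnable::run; // stand-in for the event loop
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<String> bound = bindWhenRegistered(registration, sameThread, 8802);
        System.out.println("done before registration: " + bound.isDone());
        registration.complete(null);
        System.out.println(bound.join());
    }
}
```

Using one callback for both the already-done and the still-pending case plays the role of the two branches in doBind, which either binds at once or adds a listener to regFuture.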
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
...
next.invokeBind(localAddress, promise);
...
}
Because bind is an outbound event, the pipeline looks for the next outbound handler and calls its invokeBind() method:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
doBind(localAddress);
...
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
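After descending through several layers of bind, we can see that the call finally lands on the JDK's NIO API to bind the socket.
That completes the walkthrough of the server startup flow. To summarize:
① The ServerBootstrap helper class is configured with the Reactor thread pools, the server Channel type, some options, and the handler for accepted client connections.
② The values held by ServerBootstrap are applied to the new channel, and the channel is registered with the multiplexer. The interest set starts at 0 and OP_ACCEPT is enabled once the channel becomes active.
③ The Reactor thread pool is started and loops continuously, listening for connections and running tasks.
④ The port is bound.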