不管服务端或客户端启动时都用到了
NioEventLoopGroup
,从名字就能够看出来它是NioEventLoop
的组合,是Netty多线程的基石。git
NioEventLoopGroup
继承自
MultithreadEventLoopGroup
,多提供了两个方法
setIoRatio
和
rebuildSelectors
,一个用于设置
NioEventLoop
用于IO处理的时间占比,另外一个是从新构建Selectors,来处理epoll空轮询致使CPU100%的bug。这个两个的用处在介绍
NioEventLoop
的时候在详细介绍。其它的方法都在接口中有定义,先看下
EventExecutorGroup
。
EventExecutorGroup
继承自ScheduledExecutorService
和Iterable
。这意味着EventExecutorGroup
拥有定时处理任务的能力,同时自己能够迭代。它提供的方法有:github
/**
* 是否全部事件执行器都处在关闭途中或关闭完成
*/
boolean isShuttingDown();
/**
* 优雅关闭
*/
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
/**
* 返回线程池终止时的异步结果
*/
Future<?> terminationFuture();
void shutdown();
List<Runnable> shutdownNow();
/**
* 返回一个事件执行器
*/
EventExecutor next();
复制代码
其中shutdown
和shutdownNow
被标记为过期,不建议使用。EventExecutorGroup
还重写ScheduledExecutorService
接口的方法,用于返回自定义的Future
。数组
EventLoopGroup
继承自EventExecutorGroup
,它和EventExecutorGroup
想比多了注册Channel
和ChannelPromise
,同时从新next方法返回EventLoop
。bash
建立NioEventLoopGroup
时,最终都会调用MultithreadEventExecutorGroup
的构造方法。多线程
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 线程数必须大于0
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 没指定Executor就建立新的Executor
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 建立EventExecutor数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i++) {
// 建立结果标识
boolean success = false;
try {
// 建立EventExecutor对象
children[i] = newChild(executor, args);
// 设置建立成功
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 建立失败,关闭全部已建立的EventExecutor
if (!success) {
// 关闭全部已建立的EventExecutor
for (int j = 0; j < i; j++) {
children[j].shutdownGracefully();
}
// 确保全部已建立的EventExecutor已关闭
for (int j = 0; j < i; j++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 建立EventExecutor选择器
chooser = chooserFactory.newChooser(children);
// 建立监听器,用于EventExecutor终止时的监听
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// 当EventExecutor所有关闭时
if (terminatedChildren.incrementAndGet() == children.length) {
// 设置结果,并通知监听器们。
terminationFuture.setSuccess(null);
}
}
};
// 给每一个EventExecutor添加上监听器
for (EventExecutor e : children) {
e.terminationFuture().addListener(terminationListener);
}
// 建立只读的EventExecutor集合
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
复制代码
整个构造方法作的就是EventExecutor
的建立,包括建立的异常处理,成功通知等。 AbstractEventExecutorGroup
、MultithreadEventLoopGroup
和NioEventLoopGroup
内部没有特殊之处,就不拓展了。异步
文中帖的代码注释全在:KAMIJYOUDOUMA, 有兴趣的童鞋能够关注一下。ide
本篇到此结束,若是读完以为有收获的话,欢迎点赞、关注、加公众号【贰级天災】,查阅更多精彩历史!!! oop