本系列为本人Java编程方法论 响应式解读系列的Webflux
部分,现分享出来,前置知识Rxjava2 ,Reactor的相关解读已经录制分享视频,并发布在b站,地址以下:java
Rxjava源码解读与分享:https://www.bilibili.com/video/av34537840react
Reactor源码解读与分享:https://www.bilibili.com/video/av35326911web
NIO源码解读相关视频分享: https://www.bilibili.com/video/av43230997编程
NIO源码解读视频相关配套文章:bootstrap
BIO到NIO源码的一些事儿之NIO 下 之 Selectoride
BIO到NIO源码的一些事儿之NIO 下 Buffer解读 上oop
BIO到NIO源码的一些事儿之NIO 下 Buffer解读 下
Java编程方法论-Spring WebFlux篇 01 为何须要Spring WebFlux 上
Java编程方法论-Spring WebFlux篇 01 为何须要Spring WebFlux 下
Java编程方法论-Spring WebFlux篇 Reactor-Netty下HttpServer 的封装
其中,Rxjava与Reactor做为本人书中内容将不对外开放,你们感兴趣能够花点时间来观看视频,本人对着两个库进行了全面完全细致的解读,包括其中的设计理念和相关的方法论,也但愿你们能够留言纠正我其中的错误。
本书主要针对Netty
服务器来说,因此读者应具有有关Netty
的基本知识和应用技能。接下来,咱们将对Reactor-netty
从设计到实现的细节一一探究,让你们真的从中学习到好的封装设计理念。本书在写时所参考的最新版本是Reactor-netty 0.7.8.Release
这个版本,但如今已有0.8
版本,并且0.7
与0.8
版本在源码细节有不小的变更,这点给你们提醒下。我会针对0.8
版本进行全新的解读并在将来出版的书中进行展现。
这里,咱们会首先解读Reactor Netty
是如何针对Netty
中Bootstrap
的ChildHandler
进行封装以及响应式拓展等一些细节探究。接着,咱们会涉及到HttpHandler
的引入,以此来对接咱们上层web服务。
由于这是咱们切入自定义逻辑的地方,因此,咱们首先来关注下与其相关的ChannelHandler
,以及前文并未提到的,服务器究竟是如何启动以及如何经过响应式来作到优雅的关闭,首先咱们会接触关闭服务器的设定。
咱们再回到reactor.ipc.netty.http.server.HttpServer#HttpServer
这个构造器中,由上一章咱们知道请求是HTTP
层面的(应用层),必须依赖于TCP
的链接实现,因此这里就要有一个TCPServer
的实现,其实就是Channel
上Pipeline
的操做。
//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
...
this.options = serverOptionsBuilder.build();
this.server = new TcpBridgeServer(this.options);
}
复制代码
这里的话在DiscardServer Demo
中,TCPServer
咱们主要针对childHandler
的内容的封装,也就是以下内容:
b.group(bossGroup, workerGroup)
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
...
复制代码
那childHandler
到底表明什么类型,咱们能够在io.netty.bootstrap.ServerBootstrap
找到其相关定义:
//io.netty.bootstrap.ServerBootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
public ServerBootstrap() { }
private ServerBootstrap(ServerBootstrap bootstrap) {
super(bootstrap);
childGroup = bootstrap.childGroup;
childHandler = bootstrap.childHandler;
synchronized (bootstrap.childOptions) {
childOptions.putAll(bootstrap.childOptions);
}
synchronized (bootstrap.childAttrs) {
childAttrs.putAll(bootstrap.childAttrs);
}
}
...
}
复制代码
由字段定义可知,childHandler
表明的是ChannelHandler
,顾名思义,是关于Channel
的一个处理类,这里经过查看其定义可知它是用来拦截处理Channel
中的I/O
事件,并经过Channel
下的ChannelPipeline
将处理后的事件转发到其下一个处理程序中。 那这里如何实现DiscardServer Demo
中的b.childHandler(xxx)
行为,经过DiscardServer Demo
咱们能够知道,咱们最关注的实际上是ch.pipeline().addLast(new DiscardServerHandler());
中的DiscardServerHandler
实现,可是咱们发现,这个核心语句是包含在ChannelInitializer
内,其继承了ChannelInboundHandlerAdapter
,它的最顶层的父类接口就是ChannelHandler
,也就对应了io.netty.bootstrap.ServerBootstrap
在执行b.childHandler(xxx)
方法时,其须要传入ChannelHandler
类型的设定。这里就能够分拆成两步来作,一个是b.childHandler(xxx)
行为包装,一个是此ChannelHandler
的定义拓展实现。 那么,为了API
的通用性,咱们先来看Netty的客户端的创建的一个Demo(摘自本人RPC项目的一段代码):
private Channel createNewConChannel() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<Channel>() {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new RpcDecoder(10 * 1024 * 1024))
.addLast(new RpcEncoder())
.addLast(new RpcClientHandler())
;
}
});
try {
final ChannelFuture f =
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.TCP_NODELAY, true)
.connect(ip, port).sync(); // <1>
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
LOGGER.info("Connect success {} ", f);
}
});
final Channel channel = f.channel();
channel.closeFuture().addListener((ChannelFutureListener) future -> LOGGER.info("Channel Close {} {}", ip, port));
return channel;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
复制代码
将Netty
的客户端与服务端的创建进行对比,咱们能够发现b.childHandler(xxx)
与相应的启动(Server
端的话是serverBootstrap.bind(port).sync();
,客户端的话是上述Demo中<1>
处的内容)均可以抽取出来做为一个接口来进行功能的聚合,而后和相应的Server
(如TcpServer
)或Client
(如TcpClient
)进行其特有的实现。在Reactor Netty
内的话,就是定义一个reactor.ipc.netty.NettyConnector
接口,除了作到上述的功能以外,为了适配响应式的理念,也进行了响应式的设计。即在netty
客户端与服务端在启动时,能够保存其状态,以及提供结束的对外接口方法,这种在响应式中能够很优雅的实现。接下来,咱们来看此reactor.ipc.netty.NettyConnector
的接口定义:
//reactor.ipc.netty.NettyConnector
public interface NettyConnector<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> {
Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> ioHandler);
...
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
BlockingNettyContext start(T handler) {
return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());
}
}
...
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
void startAndAwait(T handler, @Nullable Consumer<BlockingNettyContext> onStart) {
BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());
facade.installShutdownHook();
if (onStart != null) {
onStart.accept(facade);
}
facade.getContext()
.onClose()
.block();
}
复制代码
其中,newHandler
能够是咱们上层web处理,里面包含了INBOUND, OUTBOUND
,具体的话就是request,response
,后面会专门来涉及到这点。 接着就是提供了一个启动方法start
,其内建立了一个BlockingNettyContext
实例,而逻辑的核心就在其构造方法内,就是要将配置好的服务器启动,整个启动过程仍是放在newHandler(handler)
中,其返回的Mono<? extends NettyContext>
中的NettyContext
类型元素是管理io.netty.channel.Channel
上下文信息的一个对象,这个对象更多的是一些无状态的操做,并不会对此对象作什么样的改变,也是经过对此对象的一个Mono<? extends NettyContext>
包装而后经过block
产生订阅,来作到sync()
的效果,经过,经过block
产生订阅后返回的NettyContext
对象,可使中断关闭服务器的操做也能够作到更优雅:
public class BlockingNettyContext {
private static final Logger LOG = Loggers.getLogger(BlockingNettyContext.class);
private final NettyContext context;
private final String description;
private Duration lifecycleTimeout;
private Thread shutdownHook;
public BlockingNettyContext(Mono<? extends NettyContext> contextAsync, String description) {
this(contextAsync, description, Duration.ofSeconds(45));
}
public BlockingNettyContext(Mono<? extends NettyContext> contextAsync, String description, Duration lifecycleTimeout) {
this.description = description;
this.lifecycleTimeout = lifecycleTimeout;
this.context = contextAsync
.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be started within " + lifecycleTimeout.toMillis() + "ms")))
.doOnNext(ctx -> LOG.info("Started {} on {}", description, ctx.address()))
.block();
}
...
/** * Shut down the {@link NettyContext} and wait for its termination, up to the * {@link #setLifecycleTimeout(Duration) lifecycle timeout}. */
public void shutdown() {
if (context.isDisposed()) {
return;
}
removeShutdownHook(); //only applies if not called from the hook's thread
context.dispose();
context.onClose()
.doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e))
.doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address()))
.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
.block();
}
...
}
复制代码
这里,咱们来接触下在Reactor中并无深刻接触的blockXXX()
操做,其实整个逻辑仍是比较简单的,这里拿reactor.core.publisher.Mono#block()
来说,就是获取并返回这个下发的元素:
//reactor.core.publisher.Mono#block()
@Nullable
public T block() {
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet();
}
//reactor.core.publisher.BlockingMonoSubscriber
final class BlockingMonoSubscriber<T> extends BlockingSingleSubscriber<T> {
@Override
public void onNext(T t) {
if (value == null) {
value = t;
countDown();
}
}
@Override
public void onError(Throwable t) {
if (value == null) {
error = t;
}
countDown();
}
}
//reactor.core.publisher.BlockingSingleSubscriber
abstract class BlockingSingleSubscriber<T> extends CountDownLatch implements InnerConsumer<T>, Disposable {
T value;
Throwable error;
Subscription s;
volatile boolean cancelled;
BlockingSingleSubscriber() {
super(1);
}
...
@Nullable
final T blockingGet() {
if (Schedulers.isInNonBlockingThread()) {
throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
}
if (getCount() != 0) {
try {
await();
}
catch (InterruptedException ex) {
dispose();
throw Exceptions.propagate(ex);
}
}
Throwable e = error;
if (e != null) {
RuntimeException re = Exceptions.propagate(e);
//this is ok, as re is always a new non-singleton instance
re.addSuppressed(new Exception("#block terminated with an error"));
throw re;
}
return value;
}
...
@Override
public final void onComplete() {
countDown();
}
}
复制代码
能够看到,此处使用的CountDownLatch
的一个特性,在元素下发赋值以后,等待数值减1,这里恰好也就这一个限定(由super(1)
定义),解除所调用的blockingGet
中的等待,获得所需的值,这里,为了保证block()
的语义,其onComplete
方法也调用了countDown();
,即当上游为Mono<Void>
时,作到匹配。