Reactor-Netty 版本:
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.8.10.RELEASE</version>
</dependency>
复制代码
示例程序:
/**
 * Minimal TCP server for the walkthrough: binds 127.0.0.1:8080, logs every inbound
 * payload as a string, then blocks the main thread on the server's dispose signal.
 */
public class TcpServerApplication {

    public static void main(String[] args) {
        TcpServer configured = TcpServer.create()
                .host("127.0.0.1")
                .port(8080)
                .handle((in, out) -> in.receive()
                        .asString()
                        .log()
                        .then());

        DisposableServer running = configured.bindNow();

        // Block the main thread until the server's onDispose() Mono terminates.
        running.onDispose().block();
    }
}
/**
 * Minimal TCP client for the walkthrough: connects to 127.0.0.1:8080 and sends the
 * string "Hello World!". The numbered comments name what each step produces.
 */
public class TcpClientApplication {

    public static void main(String[] args) throws InterruptedException {
        TcpClient base = TcpClient.create();              // 1 TcpClientConnect
        TcpClient withHost = base.host("127.0.0.1");      // 2 TcpClientBootstrap
        TcpClient withPort = withHost.port(8080);         // 3 TcpClientBootstrap
        TcpClient client = withPort.handle((in, out) ->   // 4 TcpClientDoOn
                out.sendString(Mono.just("Hello World!")).then());

        client.connectNow();                              // 5 Connection

        // NOTE(review): presumably gives the asynchronous send time to finish before main returns.
        Thread.sleep(3000);
    }
}
复制代码
TcpServerApplication 输出结果:
[ INFO] (reactor-tcp-nio-2) onSubscribe(FluxHandle.HandleSubscriber)
[ INFO] (reactor-tcp-nio-2) request(unbounded)
[ INFO] (reactor-tcp-nio-2) onNext(Hello World!)
[ INFO] (reactor-tcp-nio-2) cancel()
复制代码
基本逻辑是:Server 端绑定 8080 端口并监听请求;Client 端连接上该端口后发送字符串 Hello World!;Server 端收到请求后打印出来。
下面进行具体源码分析。
/**
 * Creates a TcpClient backed by the TcpResources returned from TcpResources.get().
 */
public static TcpClient create() {
    TcpResources sharedResources = TcpResources.get();
    return create(sharedResources);
}
/**
 * Creates a TcpClient on top of the given provider.
 * The concrete type returned is TcpClientConnect; as its single argument shows,
 * TcpClientConnect is concerned with connection management (ConnectionProvider).
 */
public static TcpClient create(ConnectionProvider provider) {
    TcpClient client = new TcpClientConnect(provider);
    return client;
}
// Holds a default ConnectionProvider and default LoopResources (excerpt: the class body continues beyond this snippet).
public class TcpResources implements ConnectionProvider, LoopResources {
final ConnectionProvider defaultProvider;
final LoopResources defaultLoops;
// Stores the two defaults as given.
protected TcpResources(LoopResources defaultLoops, ConnectionProvider defaultProvider) {
this.defaultLoops = defaultLoops;
this.defaultProvider = defaultProvider;
}
/**
 * Static accessor that ultimately returns a TcpResources, which bundles:
 *   ConnectionProvider: manages connections
 *   LoopResources: manages threads
 */
public static TcpResources get() {
// Create the TcpResources if it does not exist yet; otherwise return the existing one
return getOrCreate(tcpResources, null, null, ON_TCP_NEW, "tcp");
}
复制代码
/**
 * Sets the remote host.
 * <p>
 * 1. The value returned is a TcpClientBootstrap.
 * 2. TcpClientBootstrap carries a bootstrapMapper, a Function of the form
 *    b -> TcpUtils.updateHost(b, host). Two questions follow: b is a Bootstrap, so when
 *    is it created, and when is the Function's apply method executed? Both happen in
 *    TcpClientBootstrap.configure(), so it is enough to track when that method is called.
 */
public final TcpClient host(String host) {
    String target = Objects.requireNonNull(host, "host");
    return bootstrap(b -> TcpUtils.updateHost(b, target));
}
/**
 * Wraps this client in a TcpClientBootstrap that applies the given mapper.
 *
 * @param bootstrapMapper function applied to the Bootstrap in TcpClientBootstrap.configure()
 * @return the wrapping client
 */
public final TcpClient bootstrap(Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) {
    TcpClient decorated = new TcpClientBootstrap(this, bootstrapMapper);
    return decorated;
}
/**
 * TcpClient operator that applies a Bootstrap mapping function on top of the
 * Bootstrap produced by the wrapped (source) client.
 */
final class TcpClientBootstrap extends TcpClientOperator {

    final Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper;

    TcpClientBootstrap(TcpClient client,
            Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) {
        super(client);
        Objects.requireNonNull(bootstrapMapper, "bootstrapMapper");
        this.bootstrapMapper = bootstrapMapper;
    }

    /**
     * Obtains the source client's Bootstrap, runs it through the mapper and rejects
     * a null result.
     */
    @Override
    public Bootstrap configure() {
        Bootstrap upstream = source.configure();
        Bootstrap mapped = bootstrapMapper.apply(upstream);
        return Objects.requireNonNull(mapped, "bootstrapMapper");
    }
}
复制代码
/**
 * Sets the remote port; works the same way as host(String host).
 */
public final TcpClient port(int port) {
    TcpClient configured = bootstrap(b -> TcpUtils.updatePort(b, port));
    return configured;
}
复制代码
/**
 * Registers the I/O handler.
 * <p>
 * The value returned is a TcpClientDoOn. The handler is a BiFunction whose apply method
 * is invoked directly inside the doOnConnected callback, and the Publisher it returns is
 * subscribed to right away. So the only open question is when the Consumer passed to
 * doOnConnected gets called.
 */
public final TcpClient handle(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler) {
    Objects.requireNonNull(handler, "handler");
    return doOnConnected(connection -> {
        if (log.isDebugEnabled()) {
            log.debug(format(connection.channel(), "Handler is being applied: {}"), handler);
        }

        // The same object is passed as both the inbound and the outbound side.
        NettyInbound inbound = (NettyInbound) connection;
        NettyOutbound outbound = (NettyOutbound) connection;
        Publisher<Void> outcome = handler.apply(inbound, outbound);

        Mono.fromDirect(outcome)
            .subscribe(connection.disposeSubscriber());
    });
}
/**
 * Wraps this client in a TcpClientDoOn carrying only the "connected" callback;
 * the other two callbacks are passed as null.
 */
public final TcpClient doOnConnected(Consumer<? super Connection> doOnConnected) {
    Consumer<? super Connection> callback = Objects.requireNonNull(doOnConnected, "doOnConnected");
    return new TcpClientDoOn(this, null, callback, null);
}
/**
 * TcpClient operator holding up to three optional lifecycle callbacks. It registers
 * itself as a ConnectionObserver on the Bootstrap so that connection state changes
 * reach those callbacks.
 */
final class TcpClientDoOn extends TcpClientOperator implements ConnectionObserver {

    final Consumer<? super Bootstrap> onConnect;
    // onConnected is the Consumer that handle() passes to doOnConnected
    final Consumer<? super Connection> onConnected;
    final Consumer<? super Connection> onDisconnected;

    TcpClientDoOn(TcpClient client,
            @Nullable Consumer<? super Bootstrap> onConnect,
            @Nullable Consumer<? super Connection> onConnected,
            @Nullable Consumer<? super Connection> onDisconnected) {
        // Keep a reference to the previous TcpClient in the chain
        super(client);
        this.onConnect = onConnect;
        this.onConnected = onConnected;
        this.onDisconnected = onDisconnected;
    }

    @Override
    public Bootstrap configure() {
        Bootstrap bootstrap = source.configure();
        ConnectionObserver existing = BootstrapHandlers.connectionObserver(bootstrap);
        // Important: the ConnectionObserver is installed here; it is used later on
        ConnectionObserver chained = existing.then(this);
        BootstrapHandlers.connectionObserver(bootstrap, chained);
        return bootstrap;
    }

    @Override
    public Mono<? extends Connection> connect(Bootstrap b) {
        Mono<? extends Connection> upstream = source.connect(b);
        if (onConnect == null) {
            return upstream;
        }
        return upstream.doOnSubscribe(s -> onConnect.accept(b));
    }

    @Override
    public void onStateChange(Connection connection, State newState) {
        // onConnected is invoked here, i.e. when the connection state changes to CONFIGURED
        if (newState == State.CONFIGURED && onConnected != null) {
            onConnected.accept(connection);
            return;
        }
        if (onDisconnected == null) {
            return;
        }
        if (newState == State.DISCONNECTING) {
            connection.onDispose(() -> onDisconnected.accept(connection));
        }
        else if (newState == State.RELEASED) {
            onDisconnected.accept(connection);
        }
    }
}
复制代码
/**
 * Blocking connect using a timeout of 45 seconds.
 */
public final Connection connectNow() {
    Duration defaultTimeout = Duration.ofSeconds(45);
    return connectNow(defaultTimeout);
}
// Blocking connect: waits up to the given timeout for the Connection (catch body abridged in this excerpt).
public final Connection connectNow(Duration timeout) {
Objects.requireNonNull(timeout, "timeout");
try {
// connect() returns a Mono; block(timeout) is what waits for its result
return Objects.requireNonNull(connect().block(timeout), "aborted");
}
catch (IllegalStateException e) {
...
}
}
// Returns a Mono (body abridged in this excerpt)
public final Mono<? extends Connection> connect() {
...
return connect(b);
}
/**
 * Subscribes immediately and then waits, for at most the given timeout, for the result.
 */
public T block(Duration timeout) {
    BlockingMonoSubscriber<T> blocker = new BlockingMonoSubscriber<>();
    // The subscription starts right here, inside block()
    onLastAssembly(this).subscribe(Operators.toCoreSubscriber(blocker));
    long timeoutMillis = timeout.toMillis();
    return blocker.blockingGet(timeoutMillis, TimeUnit.MILLISECONDS);
}
// Waits for completion when the count is non-zero, giving up after the supplied timeout (abridged excerpt).
final T blockingGet(long timeout, TimeUnit unit) {
...
if (getCount() != 0) {
try {
if (!await(timeout, unit)) {
dispose(); // timed out: cancel the subscription
throw new IllegalStateException("Timeout on blocking read for " + timeout + " " + unit);
}
}
catch (InterruptedException ex) {
// interrupted while waiting: dispose, then rethrow wrapped with a suppressed marker exception
dispose();
RuntimeException re = Exceptions.propagate(ex);
//this is ok, as re is always a new non-singleton instance
re.addSuppressed(new Exception("#block has been interrupted"));
throw re;
}
}
...
}
复制代码
由以上分析可知,在最后的 connectNow() 方法中,才开始真正的订阅执行。下面继续分析 connect 方法。
/**
 * Builds the Bootstrap through configure() and hands it to connect(Bootstrap).
 * A failure while configuring is returned as an error Mono, unless
 * Exceptions.throwIfJvmFatal rethrows it first.
 */
public final Mono<? extends Connection> connect() {
    final Bootstrap bootstrap;
    try {
        // Step 1: obtain the default Bootstrap
        bootstrap = configure();
    }
    catch (Throwable failure) {
        Exceptions.throwIfJvmFatal(failure);
        return Mono.error(failure);
    }
    // Step 2: connect(b)
    return connect(bootstrap);
}
/**
 * Returns a fresh clone of DEFAULT_BOOTSTRAP.
 */
public Bootstrap configure() {
    Bootstrap copy = DEFAULT_BOOTSTRAP.clone();
    return copy;
}
// Template Bootstrap: AUTO_READ is set to false and the remote address is an unresolved
// address built from NetUtil.LOCALHOST and DEFAULT_PORT.
static final Bootstrap DEFAULT_BOOTSTRAP =
new Bootstrap().option(ChannelOption.AUTO_READ, false) .remoteAddress(InetSocketAddressUtil.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT));
复制代码
继续看 connect(Bootstrap b) 方法:
// Abstract method implemented by many subclasses. Following the earlier analysis, the
// first implementation invoked should be the one in TcpClientDoOn.
public abstract Mono<? extends Connection> connect(Bootstrap b);
// TcpClientDoOn class
public Mono<? extends Connection> connect(Bootstrap b) {
    // Delegate upwards: source is the previous TcpClient in the chain, so the call
    // eventually reaches the initial TcpClientConnect
    Mono<? extends Connection> upstream = source.connect(b);
    if (onConnect == null) {
        return upstream;
    }
    return upstream.doOnSubscribe(s -> onConnect.accept(b));
}
// TcpClientConnect class
final ConnectionProvider provider;

/**
 * Fills in the Bootstrap's missing settings when no group is configured yet, then
 * acquires the connection from the provider.
 */
public Mono<? extends Connection> connect(Bootstrap b) {
    boolean groupMissing = b.config().group() == null;
    if (groupMissing) {
        // Populate the attributes of b
        TcpClientRunOn.configure(b, LoopResources.DEFAULT_NATIVE, TcpResources.get(), maxConnections != -1);
    }
    // This is the call everything ends up in
    return provider.acquire(b);
}
复制代码
上面讲到 connect 方法最终调用的是 ConnectionProvider 类中的方法。ConnectionProvider 在之前的分析中出现过,即 TcpResources.get() 方法返回的 TcpResources 对象中包含这个属性。
/**
 * Creates the default TcpResources.
 * <p>
 * A null loops / provider argument is replaced by a newly created default when there is
 * no previous instance, or by the previous instance's value otherwise. The resolved pair
 * is handed to onNew.
 */
static <T extends TcpResources> T create(@Nullable T previous, @Nullable LoopResources loops, @Nullable ConnectionProvider provider, String name, BiFunction<LoopResources, ConnectionProvider, T> onNew) {
    LoopResources effectiveLoops = loops;
    if (effectiveLoops == null) {
        effectiveLoops = previous == null
                ? LoopResources.create("reactor-" + name)
                : previous.defaultLoops;
    }

    ConnectionProvider effectiveProvider = provider;
    if (effectiveProvider == null) {
        // Without a previous instance the ConnectionProvider is created here
        effectiveProvider = previous == null
                ? ConnectionProvider.elastic(name)
                : previous.defaultProvider;
    }

    return onNew.apply(effectiveLoops, effectiveProvider);
}
}
/**
 * Creates a PooledConnectionProvider with the given name.
 * <p>
 * The second argument, a PoolFactory, is a functional interface, so the pool object only
 * comes into existence when PoolFactory.newPool is invoked. The ChannelPool it produces
 * is a SimpleChannelPool.
 */
static ConnectionProvider elastic(String name) {
    return new PooledConnectionProvider(
            name,
            (b, poolHandler, healthChecker) ->
                    new SimpleChannelPool(b, poolHandler, healthChecker, true, false));
}
// ConnectionProvider that keeps a map of Pool instances keyed by PoolKey (abridged excerpt).
final class PooledConnectionProvider implements ConnectionProvider {
// Factory hook used to create the underlying ChannelPool.
interface PoolFactory {
ChannelPool newPool(Bootstrap b, ChannelPoolHandler handler, ChannelHealthChecker checker);
}
final ConcurrentMap<PoolKey, Pool> channelPools;
final String name;
final PoolFactory poolFactory;
final int maxConnections;
// Stores the name and factory and starts with an empty pool map.
PooledConnectionProvider(String name, PoolFactory poolFactory) {
this.name = name;
this.poolFactory = poolFactory;
this.channelPools = PlatformDependent.newConcurrentHashMap();
// NOTE(review): -1 presumably means "no connection limit" — confirm
this.maxConnections = -1;
}
...
}
复制代码
现在回到 provider.acquire(b) 方法,可以知道调用的是 PooledConnectionProvider 类中的方法,继续分析:
// Map holding one connection pool per (remote address, handler) combination
final ConcurrentMap<PoolKey, Pool> channelPools;
final String name;
// Factory through which the ChannelPool is created
final PoolFactory poolFactory;
final int maxConnections;
/**
 * Acquires a connection from the connection pool.
 * <p>
 * The matching pool is looked up first via channelPools.get(holder). If none exists, a
 * new pool is created and added to channelPools. Finally
 * disposableAcquire(sink, obs, pool, false) is called.
 */
public Mono<Connection> acquire(Bootstrap b) {
    return Mono.create(sink -> {
        Bootstrap bootstrap = b.clone();

        // TODO:
        ChannelOperations.OnSetup opsFactory = BootstrapHandlers.channelOperationFactory(bootstrap);
        // TODO: observer for the connection life cycle
        ConnectionObserver obs = BootstrapHandlers.connectionObserver(bootstrap);

        // Lazy loading: the bootstrap's remote address (ip:port) has to be set here
        NewConnectionProvider.convertLazyRemoteAddress(bootstrap);

        // Every (remote address, handler) pair has its own Pool
        ChannelHandler handler = bootstrap.config().handler();
        PoolKey holder = new PoolKey(bootstrap.config().remoteAddress(),
                handler == null ? -1 : handler.hashCode());

        // Fast path: the pool already exists
        Pool pool = channelPools.get(holder);
        while (pool == null) {
            // Not present: create a new connection pool and try to register it
            Pool candidate = new Pool(bootstrap, poolFactory, opsFactory);
            if (channelPools.putIfAbsent(holder, candidate) == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Creating new client pool [{}] for {}",
                            name,
                            bootstrap.config().remoteAddress());
                }
                pool = candidate;
            }
            else {
                // Another pool was registered meanwhile: close the surplus one and look up again
                candidate.close();
                pool = channelPools.get(holder);
            }
        }

        disposableAcquire(sink, obs, pool, false);
    });
}
// Builds the pool wrapper around a ChannelPool obtained from the supplied factory.
Pool(Bootstrap bootstrap,
PoolFactory provider,
ChannelOperations.OnSetup opsFactory) {
this.bootstrap = bootstrap;
this.opsFactory = opsFactory;
// Create the underlying connection pool; this Pool is passed as both the handler and the checker argument
this.pool = provider.newPool(bootstrap, this, this);
this.defaultGroup = bootstrap.config()
.group();
// Already-succeeded futures carrying true and false respectively
HEALTHY = defaultGroup.next()
.newSucceededFuture(true);
UNHEALTHY = defaultGroup.next()
.newSucceededFuture(false);
}
复制代码
继续看 disposableAcquire 方法:
/**
 * Requests a channel from the pool and wires a DisposableAcquire to both the resulting
 * future and the sink's cancellation.
 */
static void disposableAcquire(MonoSink<Connection> sink, ConnectionObserver obs, Pool pool, boolean retried) {
    // Obtain the Channel
    Future<Channel> acquisition = pool.acquire();
    DisposableAcquire listener = new DisposableAcquire(sink, acquisition, pool, obs, retried);

    // Register the listener: this eventually calls disposableAcquire.operationComplete(),
    // which in turn calls disposableAcquire.run()
    acquisition.addListener(listener);
    sink.onCancel(listener);
}
// Listener for the acquire future that also implements ConnectionObserver and Runnable (abridged excerpt).
final static class DisposableAcquire implements Disposable, GenericFutureListener<Future<Channel>>, ConnectionObserver , Runnable {
final Future<Channel> f;
final MonoSink<Connection> sink;
final Pool pool;
final ConnectionObserver obs;
final boolean retried;
// Stores the collaborators as given.
DisposableAcquire(MonoSink<Connection> sink,
Future<Channel> future,
Pool pool,
ConnectionObserver obs,
boolean retried) {
this.f = future;
this.pool = pool;
this.sink = sink;
this.obs = obs;
this.retried = retried;
}
// When the connection state changes, obs.onStateChange is called. The obs here is the observer installed in TcpClientDoOn.configure(), so any state change ends up invoking the callback registered through TcpClient.handle
@Override
public void onStateChange(Connection connection, State newState) {
if (newState == State.CONFIGURED) {
// complete the sink with the configured connection
sink.success(connection);
}
obs.onStateChange(connection, newState);
}
...
}
复制代码
DisposableAcquire 是一个监听器,监听的是连接,即上面代码中的 Future<Channel> f = pool.acquire()。那么这个 f 是什么类型呢?之前的代码分析中已经知道 pool 为 SimpleChannelPool 类型。
/**
 * Creates the pool around a private clone of the given Bootstrap. The clone's handler
 * is replaced with an initializer that reports every newly created channel to the
 * ChannelPoolHandler.
 */
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, boolean releaseHealthCheck, boolean lastRecentUsed) {
    this.handler = checkNotNull(handler, "handler");
    this.healthCheck = checkNotNull(healthCheck, "healthCheck");
    this.releaseHealthCheck = releaseHealthCheck;
    this.lastRecentUsed = lastRecentUsed;

    // Clone the original Bootstrap as we want to set our own handler
    Bootstrap ownCopy = checkNotNull(bootstrap, "bootstrap").clone();
    ownCopy.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            assert ch.eventLoop().inEventLoop();
            // Called whenever a new connection is created
            handler.channelCreated(ch);
        }
    });
    this.bootstrap = ownCopy;
}
}
// Callback for a newly created channel: wraps it and installs the finalized handler first in its pipeline (abridged excerpt).
public void channelCreated(Channel ch) {
inactiveConnections.incrementAndGet();
...
// ch is wrapped here. PooledConnection implements both Connection and ConnectionObserver, i.e. it is a channel and a listener at the same time. Later state changes of the channel invoke PooledConnection's onStateChange method.
PooledConnection pooledConnection = new PooledConnection(ch, this);
pooledConnection.bind();
Bootstrap bootstrap = this.bootstrap.clone();
BootstrapHandlers.finalizeHandler(bootstrap, opsFactory, pooledConnection);
ch.pipeline()
.addFirst(bootstrap.config()
.handler());
}
复制代码
下面继续看 PooledConnection 的 onStateChange 方法。
// PooledConnection's state-change callback (the DISCONNECTING branch is abridged in this excerpt).
public void onStateChange(Connection connection, State newState) {
if (newState == State.DISCONNECTING) {
...
}
// Other states go through here: forwarded to the current owner
owner().onStateChange(connection, newState);
}
/**
 * Returns the observer stored in the channel's OWNER attribute, installing a new
 * PendingConnectionObserver when the attribute is still empty.
 */
ConnectionObserver owner() {
    while (true) {
        ConnectionObserver existing = channel.attr(OWNER).get();
        if (existing != null) {
            return existing;
        }

        // Set channel.attr(OWNER) to a newly created PendingConnectionObserver;
        // later calls to owner() then return that PendingConnectionObserver directly
        ConnectionObserver placeholder = new PendingConnectionObserver();
        if (channel.attr(OWNER).compareAndSet(null, placeholder)) {
            return placeholder;
        }
        // The attribute was set in the meantime: loop and read it again
    }
}
/**
 * ConnectionObserver that only records what it observes: every error and every state
 * change is appended to a queue as a Pending entry.
 */
final static class PendingConnectionObserver implements ConnectionObserver {

    /** One recorded event: either an error or a state, together with its connection. */
    static class Pending {

        final Connection connection;
        final Throwable error;
        final State state;

        Pending(Connection connection, @Nullable Throwable error, @Nullable State state) {
            this.connection = connection;
            this.error = error;
            this.state = state;
        }
    }

    final Queue<Pending> pendingQueue = Queues.<Pending>unbounded(4).get();

    @Override
    public void onUncaughtException(Connection connection, Throwable error) {
        Pending entry = new Pending(connection, error, null);
        pendingQueue.add(entry);
    }

    @Override
    public void onStateChange(Connection connection, State newState) {
        // The state change is put on the waiting queue; nothing else is done
        Pending entry = new Pending(connection, null, newState);
        pendingQueue.add(entry);
    }
}
复制代码
从上面代码可知,Channel 的状态变更最终放入了一个等待队列,缺少了通知各个监听器的调用。继续回到 DisposableAcquire 类,发现它同时实现了 Runnable 接口。
// Listener for the acquire future that is also a ConnectionObserver and a Runnable; run() takes over the OWNER attribute of the acquired channel (abridged excerpt).
final static class DisposableAcquire implements Disposable, GenericFutureListener<Future<Channel>>, ConnectionObserver , Runnable {
final Future<Channel> f;
final MonoSink<Connection> sink;
final Pool pool;
final ConnectionObserver obs;
final boolean retried;
// Completes the sink on CONFIGURED, then forwards every state change to obs.
@Override
public void onStateChange(Connection connection, State newState) {
if (newState == State.CONFIGURED) {
sink.success(connection);
}
obs.onStateChange(connection, newState);
}
// Updates the pool counters, installs this instance as the channel's OWNER and handles whatever owner was there before.
@Override
public void run() {
Channel c = f.getNow();
pool.activeConnections.incrementAndGet();
pool.inactiveConnections.decrementAndGet();
// owner() may earlier have installed a PendingConnectionObserver; swap it for this instance
ConnectionObserver current = c.attr(OWNER)
.getAndSet(this);
if (current instanceof PendingConnectionObserver) {
PendingConnectionObserver pending = (PendingConnectionObserver)current;
PendingConnectionObserver.Pending p;
// cleared so that the "current != null" branch below is skipped
current = null;
// watch for the connection being closed
registerClose(c, pool);
// process the queued events (connection state changes) in order
while((p = pending.pendingQueue.poll()) != null) {
if (p.error != null) {
onUncaughtException(p.connection, p.error);
}
else if (p.state != null) {
// notify the listeners
onStateChange(p.connection, p.state);
}
}
}
else if (current == null) {
registerClose(c, pool);
}
// TODO: in which situation is this branch taken?
// NOTE(review): reached only when the previous OWNER was a non-null observer other than PendingConnectionObserver — presumably a reused pooled channel; confirm
if (current != null) {
Connection conn = Connection.from(c);
if (log.isDebugEnabled()) {
log.debug(format(c, "Channel acquired, now {} active connections and {} inactive connections"),
pool.activeConnections, pool.inactiveConnections);
}
obs.onStateChange(conn, State.ACQUIRED);
PooledConnection con = conn.as(PooledConnection.class);
if (con != null) {
ChannelOperations<?, ?> ops = pool.opsFactory.create(con, con, null);
if (ops != null) {
ops.bind();
obs.onStateChange(ops, State.CONFIGURED);
sink.success(ops);
}
else {
//already configured, just forward the connection
sink.success(con);
}
}
else {
//already bound, just forward the connection
sink.success(conn);
}
return;
}
//Connected, leave onStateChange forward the event if factory
...
if (pool.opsFactory == ChannelOperations.OnSetup.empty()) {
sink.success(Connection.from(c));
}
}
}
复制代码
至此,TcpClient 示例程序中的几行代码差不多就算是分析完了。