Netty in Seata: a source code walkthrough (part 2)

This time we look at the client side.

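1: On the client side we use the @GlobalTransactional annotation, and proxying is handled by GlobalTransactionScanner. The scanner's initialization code initializes the TM and RM clients, as follows: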

private void initClient() {
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    //init TM
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    registerSpringShutdownHook();
}

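2: The init method of TMClient or RMClient creates a NettyClientBootstrap instance. Constructing NettyClientBootstrap creates a Bootstrap instance and also a NioEventLoopGroup, which serves as the client-side event loop (selector) group. The code is as follows: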

public class NettyClientBootstrap implements RemotingBootstrap {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
    private final NettyClientConfig nettyClientConfig;
    private final Bootstrap bootstrap = new Bootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private EventExecutorGroup defaultEventExecutorGroup;
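    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final NettyPoolKey.TransactionRole transactionRole; // assigned in the constructor
    private ChannelHandler[] channelHandlers; // used by start() in step 3
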
    public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
        NettyPoolKey.TransactionRole transactionRole) {
        if (nettyClientConfig == null) {
            nettyClientConfig = new NettyClientConfig();
        }
        this.nettyClientConfig = nettyClientConfig;
        int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
        this.transactionRole = transactionRole;
        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
            new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
            selectorThreadSizeThreadSize));
        this.defaultEventExecutorGroup = eventExecutorGroup;
    }

    // remaining members (start(), getNewChannel(), ...) omitted from this excerpt
}
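
The constructor above hands the event loop group a NamedThreadFactory so that selector threads carry a recognizable prefix. A minimal sketch of that idea using only the JDK is shown below. PrefixedThreadFactory is a hypothetical stand-in, not Seata's NamedThreadFactory, and both the "_" separator and the daemon flag are assumptions made for illustration.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a naming thread factory: threads are named "<prefix>_<n>".
class PrefixedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(0);

    PrefixedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread thread = new Thread(task, prefix + "_" + counter.incrementAndGet());
        thread.setDaemon(true); // assumption: worker threads should not block JVM exit
        return thread;
    }
}

class PrefixedThreadFactoryDemo {
    public static void main(String[] args) {
        ThreadFactory factory = new PrefixedThreadFactory("clientSelector");
        Thread first = factory.newThread(() -> { });
        Thread second = factory.newThread(() -> { });
        System.out.println(first.getName());  // clientSelector_1
        System.out.println(second.getName()); // clientSelector_2
    }
}
```

Named threads make it much easier to tell TM and RM selector threads apart in a thread dump.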

3: Once the instance is created, the start method of NettyClientBootstrap is called to set up the Netty client, as follows:

public void start() {
    this.bootstrap.group(this.eventLoopGroupWorker) // bind the event loop group
        .channel(nettyClientConfig.getClientChannelClazz()) // channel type, NioSocketChannel by default
        .option(ChannelOption.TCP_NODELAY, true) // disable Nagle's algorithm so small writes are sent immediately
        .option(ChannelOption.SO_KEEPALIVE, true) // enable TCP keep-alive probes
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) // connect timeout
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) // send buffer size
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); // receive buffer size

    bootstrap.handler(new ChannelInitializer<SocketChannel>() { // configure the channel pipeline
        @Override
        public void initChannel(SocketChannel ch) {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(
                new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), // idle detection, drives heartbeats
                    nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                    nettyClientConfig.getChannelMaxAllIdleSeconds()))
                .addLast(new ProtocolV1Decoder()) // decodes inbound messages
                .addLast(new ProtocolV1Encoder()); // encodes outbound messages
            if (channelHandlers != null) {
                addChannelPipelineLast(ch, channelHandlers); // adds the ClientHandler
            }
        }
    });

    if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
        LOGGER.info("NettyClientBootstrap has started");
    }
}
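
In the excerpt above, initialized.compareAndSet(false, true) only guards the log message. The same AtomicBoolean idiom is often used to make a whole start routine run at most once. The sketch below shows that variant. OnceOnlyStarter and its counter are hypothetical names used for illustration and are not part of Seata.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical component whose start() body runs at most once,
// even when several threads call it concurrently.
class OnceOnlyStarter {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger startCount = new AtomicInteger(0);

    // Returns true only for the single call that performed the start.
    boolean start() {
        if (!initialized.compareAndSet(false, true)) {
            return false; // another call already won the race
        }
        startCount.incrementAndGet(); // placeholder for the real setup work
        return true;
    }

    int startCount() {
        return startCount.get();
    }
}

class OnceOnlyStarterDemo {
    public static void main(String[] args) {
        OnceOnlyStarter starter = new OnceOnlyStarter();
        System.out.println(starter.start());      // true
        System.out.println(starter.start());      // false
        System.out.println(starter.startCount()); // 1
    }
}
```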

4: The Seata client pools its Netty connections. The pool's factory class is NettyPoolableFactory, and its makeObject method obtains the Netty channel. The code that obtains the channel is as follows:

public Channel getNewChannel(InetSocketAddress address) {
    Channel channel;
    ChannelFuture f = this.bootstrap.connect(address); // connect to the Netty server
    try {
        f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS); // wait for the connect attempt, up to the timeout
        if (f.isCancelled()) {
            throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");
        } else if (!f.isSuccess()) {
            throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");
        } else {
            channel = f.channel(); // obtain the connected channel
        }
    } catch (Exception e) {
        throw new FrameworkException(e, "can not connect to services-server.");
    }
    return channel;
}
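
To make the pooling idea concrete, here is a minimal sketch of a keyed pool that creates a connection through a factory only when no idle one exists for the key. SimpleKeyedPool, borrow and giveBack are hypothetical names. This is a simplified stand-in, not the pool implementation Seata uses, and it omits validation, eviction and size limits.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical keyed pool: one stack of idle objects per key (e.g. per server address).
class SimpleKeyedPool<K, V> {
    private final Map<K, Deque<V>> idle = new HashMap<>();
    private final Function<K, V> factory; // plays the role of makeObject

    SimpleKeyedPool(Function<K, V> factory) {
        this.factory = factory;
    }

    // Reuse an idle object for the key if there is one, otherwise create a new one.
    V borrow(K key) {
        synchronized (this) {
            Deque<V> queue = idle.get(key);
            if (queue != null && !queue.isEmpty()) {
                return queue.pop();
            }
        }
        // Created outside the lock so a slow connect does not block other keys.
        return factory.apply(key);
    }

    // Hand an object back so that a later borrow for the same key can reuse it.
    synchronized void giveBack(K key, V value) {
        idle.computeIfAbsent(key, k -> new ArrayDeque<>()).push(value);
    }
}

class SimpleKeyedPoolDemo {
    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // Strings stand in for channels; the address is an arbitrary example value.
        SimpleKeyedPool<String, String> pool =
            new SimpleKeyedPool<>(address -> address + "#" + created.incrementAndGet());

        String first = pool.borrow("127.0.0.1:8091");
        pool.giveBack("127.0.0.1:8091", first);
        String second = pool.borrow("127.0.0.1:8091");

        System.out.println(first == second); // true: the idle object was reused
        System.out.println(created.get());   // 1: the factory ran only once
    }
}
```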

5: Sample code for sending a message. This is the case where a response is needed; if no response is needed, calling channel.writeAndFlush() directly is enough:

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
    MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);
    channelWritableCheck(channel, rpcMessage.getBody());
    String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
    doBeforeRpcHooks(remoteAddr, rpcMessage);
    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
        if (!future.isSuccess()) {
            MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
            if (messageFuture1 != null) {
                messageFuture1.setResultMessage(future.cause());
            }
            destroyChannel(future.channel());
        }
    });
    try {
        Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        doAfterRpcHooks(remoteAddr, rpcMessage, result);
        return result;
    } catch (Exception exx) {
        if (exx instanceof TimeoutException) {
            throw (TimeoutException) exx;
        } else {
            throw new RuntimeException(exx);
        }
    }
}
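
The core of sendSync is request/response correlation: a future is stored under the message id before the write, and the caller blocks on it until a response arrives or the timeout expires. The sketch below reproduces that pattern with the JDK's CompletableFuture in place of MessageFuture. PendingRequests and its methods are hypothetical names, and removing the entry in a finally block is a choice made for this sketch rather than a statement about how Seata cleans up.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical registry that matches responses to waiting callers by request id.
class PendingRequests {
    private final Map<Integer, CompletableFuture<Object>> futures = new ConcurrentHashMap<>();

    // Register before writing the request so a fast response cannot be missed.
    CompletableFuture<Object> register(int requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(requestId, future);
        return future;
    }

    // Called by the inbound handler when a response arrives; false if nobody is waiting.
    boolean complete(int requestId, Object response) {
        CompletableFuture<Object> future = futures.remove(requestId);
        return future != null && future.complete(response);
    }

    // Called when the write fails, mirroring the listener in sendSync.
    boolean fail(int requestId, Throwable cause) {
        CompletableFuture<Object> future = futures.remove(requestId);
        return future != null && future.completeExceptionally(cause);
    }

    // Block until the response arrives or the timeout expires.
    Object await(int requestId, CompletableFuture<Object> future, long timeoutMillis)
            throws TimeoutException, InterruptedException, ExecutionException {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            futures.remove(requestId); // also drops the entry on timeout or failure
        }
    }

    int pendingCount() {
        return futures.size();
    }
}

class PendingRequestsDemo {
    public static void main(String[] args) throws Exception {
        PendingRequests pending = new PendingRequests();
        CompletableFuture<Object> future = pending.register(42);
        // Simulate the I/O thread delivering the response for request 42.
        new Thread(() -> pending.complete(42, "pong")).start();
        System.out.println(pending.await(42, future, 1000));
    }
}
```

Keying the map by message id lets many requests share one channel, because each response carries the id needed to find its waiting caller.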