Dubbo源码解析(十六)远程通讯——Netty3

远程通信——Netty3

目标:介绍基于netty3的来实现的远程通讯、介绍dubbo-remoting-netty内的源码解析。java

前言

如今dubbo默认的网络传输Transport接口默认实现的仍是基于netty3实现的网络传输,不过立刻后面默认实现就要改成netty4了。因为netty4对netty3对兼容性不是很好,因此保留了两个版本的实现。git

下面是包结构:github

netty3包结构

源码分析

(一)NettyChannel

该类继承了AbstractChannel类,是基于netty3实现的通道。算法

1.属性

/** * 通道集合 */
private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();

/** * 通道 */
private final org.jboss.netty.channel.Channel channel;

/** * 属性集合 */
private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
复制代码

2.getOrAddChannel

static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
    if (ch == null) {
        return null;
    }
    // 首先从集合中取通道
    NettyChannel ret = channelMap.get(ch);
    // 若是为空,则新建
    if (ret == null) {
        NettyChannel nc = new NettyChannel(ch, url, handler);
        // 若是通道链接着
        if (ch.isConnected()) {
            // 加入集合
            ret = channelMap.putIfAbsent(ch, nc);
        }
        if (ret == null) {
            ret = nc;
        }
    }
    return ret;
}
复制代码

该方法是得到通道,当通道在集合中没有的时候,新建一个通道。bootstrap

3.removeChannelIfDisconnected

static void removeChannelIfDisconnected(org.jboss.netty.channel.Channel ch) {
    if (ch != null && !ch.isConnected()) {
        channelMap.remove(ch);
    }
}
复制代码

该方法是当通道没有链接的时候,从集合中移除它。segmentfault

4.send

@Override
public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 写入数据,发送消息
        ChannelFuture future = channel.write(message);
        // 若是已经发送过
        if (sent) {
            // 得到超时时间
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待timeout的链接时间后查看是否发送成功
            success = future.await(timeout);
        }
        // 看是否有异常
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}
复制代码

该方法是发送消息,其中用到了channe.write方法传输消息,而且经过返回的future来判断是否发送成功。缓存

5.close

@Override
public void close() {
    try {
        super.close();
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
    try {
        // 若是通道断开,则移除该通道
        removeChannelIfDisconnected(channel);
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
    try {
        // 清空属性
        attributes.clear();
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
    try {
        if (logger.isInfoEnabled()) {
            logger.info("Close netty channel " + channel);
        }
        // 关闭通道
        channel.close();
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
}
复制代码

该方法是关闭通道,作了三个操做,分别是从集合中移除、清除属性、关闭通道。服务器

其余实现方法比较简单,我就讲解了。网络

(二)NettyHandler

该类继承了SimpleChannelHandler类,是基于netty3的通道处理器,而该类被加上了@Sharable注解,也就是说该处理器能够从属于多个ChannelPipelineapp

1.属性

/** * 通道集合,key是主机地址 ip:port */
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>

/** * url对象 */
private final URL url;

/** * 通道 */
private final ChannelHandler handler;
复制代码

该类的属性比较简单,而且该类中实现的方法都是调用了属性handler的方法,我举一个例子来说,其余的能够本身查看源码,比较简单。

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    // 得到通道实例
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        if (channel != null) {
            // 保存该通道,加入到集合中
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
        }
        // 链接
        handler.connected(channel);
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}
复制代码

该方法是通道链接的方法,其中先获取了通道实例,而后吧该实例加入到集合中,最好带哦用handler.connected来进行链接。

(三)NettyClient

该类继承了AbstractClient,是基于netty3实现的客户端类。

1.属性

private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

// ChannelFactory's closure has a DirectMemory leak, using static to avoid
// https://issues.jboss.org/browse/NETTY-424
/** * 通道工厂,用static来避免直接缓存区的一个OOM问题 */
private static final ChannelFactory channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
        Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
        Constants.DEFAULT_IO_THREADS);
/** * 客户端引导对象 */
private ClientBootstrap bootstrap;

/** * 通道 */
private volatile Channel channel; // volatile, please copy reference to use
复制代码

上述属性中ChannelFactory用了static修饰,为了不netty3中会有直接缓冲内存泄漏的现象,具体的讨论能够访问注释中的讨论。

2.doOpen

@Override
protected void doOpen() throws Throwable {
    // 设置日志工厂
    NettyHelper.setNettyLoggerFactory();
    // 实例化客户端引导类
    bootstrap = new ClientBootstrap(channelFactory);
    // config
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    // 配置选择项
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getConnectTimeout());
    // 建立通道处理器
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    // 设置责任链路
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        /** * 得到通道 * @return */
        @Override
        public ChannelPipeline getPipeline() {
            // 新建编解码
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            // 得到管道
            ChannelPipeline pipeline = Channels.pipeline();
            // 设置解码器
            pipeline.addLast("decoder", adapter.getDecoder());
            // 设置编码器
            pipeline.addLast("encoder", adapter.getEncoder());
            // 设置通道处理器
            pipeline.addLast("handler", nettyHandler);
            // 返回通道
            return pipeline;
        }
    });
}
复制代码

该方法是建立客户端,而且打开,其中的逻辑就是用netty3的客户端引导类来建立一个客户端,若是对netty不熟悉的朋友能够先补补netty知识。

3.doConnect

@Override
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    // 用引导类链接
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try {
        // 在超时时间内是否链接完成
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);

        if (ret && future.isSuccess()) {
            // 得到通道
            Channel newChannel = future.getChannel();
            // 异步修改此通道
            newChannel.setInterestOps(Channel.OP_READ_WRITE);
            try {
                // Close old channel 关闭旧的通道
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        // 关闭
                        oldChannel.close();
                    } finally {
                        // 移除通道
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                // 若是客户端关闭
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        }
                        // 关闭通道
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    NettyClient.this.channel = newChannel;
                }
            }
        } else if (future.getCause() != null) {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + ", error message is:" + future.getCause().getMessage(), future.getCause());
        } else {
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }
    } finally {
        // 若是客户端没有链接
        if (!isConnected()) {
            // 取消future
            future.cancel();
        }
    }
}
复制代码

该方法是客户端链接服务器的方法。其中调用了bootstrap.connect。后面的逻辑是用来检测是否链接,最后若是未链接,则会取消该链接任务。

4.doClose

@Override
protected void doClose() throws Throwable {
    /*try { bootstrap.releaseExternalResources(); } catch (Throwable t) { logger.warn(t.getMessage()); }*/
}
复制代码

在这里不能关闭是由于channelFactory 是静态属性,被多个 NettyClient 共用。因此不能释放资源。

(四)NettyServer

该类继承了AbstractServer,实现了Server,是基于netty3实现的服务器类。

1.属性

/** * 链接该服务器的通道集合 */
private Map<String, Channel> channels; // <ip:port, channel>

/** * 服务器引导类对象 */
private ServerBootstrap bootstrap;

/** * 通道 */
private org.jboss.netty.channel.Channel channel;
复制代码

2.doOpen

@Override
protected void doOpen() throws Throwable {
    // 设置日志工厂
    NettyHelper.setNettyLoggerFactory();
    // 建立线程池
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    // 新建通道工厂
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    // 新建服务引导类对象
    bootstrap = new ServerBootstrap(channelFactory);

    // 新建通道处理器
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    // 得到通道集合
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    // 禁用nagle算法,将数据当即发送出去。纳格算法是以减小封包传送量来增进TCP/IP网络的效能
    bootstrap.setOption("child.tcpNoDelay", true);
    // 设置管道工厂
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        /** * 得到通道 * @return */
        @Override
        public ChannelPipeline getPipeline() {
            // 新建编解码器
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            // 得到通道
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/
            // 设置解码器
            pipeline.addLast("decoder", adapter.getDecoder());
            // 设置编码器
            pipeline.addLast("encoder", adapter.getEncoder());
            // 设置通道处理器
            pipeline.addLast("handler", nettyHandler);
            // 返回通道
            return pipeline;
        }
    });
    // bind 绑定地址,也就是启用服务器
    channel = bootstrap.bind(getBindAddress());
}
复制代码

该方法是建立服务器,而且打开服务器。一样建立服务器的方式跟正常的用netty建立服务器方式同样,只是新加了编码器和解码器。还有一个注意点就是这里ServerBootstrap 的可选项。

3.doClose

@Override
protected void doClose() throws Throwable {
    try {
        if (channel != null) {
            // unbind.关闭通道
            channel.close();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
    try {
        // 得到全部链接该服务器的通道集合
        Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
        if (channels != null && !channels.isEmpty()) {
            // 遍历通道集合
            for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                try {
                    // 关闭通道链接
                    channel.close();
                } catch (Throwable e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
    try {
        if (bootstrap != null) {
            // release external resource. 回收资源
            bootstrap.releaseExternalResources();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
    try {
        if (channels != null) {
            // 清空集合
            channels.clear();
        }
    } catch (Throwable e) {
        logger.warn(e.getMessage(), e);
    }
}
复制代码

该方法是关闭服务器,一系列的操做很清晰,我就很少说了。

4.getChannels

@Override
public Collection<Channel> getChannels() {
    Collection<Channel> chs = new HashSet<Channel>();
    for (Channel channel : this.channels.values()) {
        // 若是通道链接,则加入集合,返回
        if (channel.isConnected()) {
            chs.add(channel);
        } else {
            channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
        }
    }
    return chs;
}
复制代码

该方法是返回链接该服务器的通道集合,而且用了HashSet保存,不会重复。

(五)NettyTransporter

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        // 建立一个NettyServer
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        // 建立一个NettyClient
        return new NettyClient(url, listener);
    }

}
复制代码

该类就是基于netty3的Transporter实现类,一样两个方法也是分别建立了NettyServer和NettyClient。

(六)NettyHelper

该类是设置日志的工具类,其中基于netty3的InternalLoggerFactory实现类一个DubboLoggerFactory。这个我就不讲解了,比较好理解,不理解也无伤大雅。

(七)NettyCodecAdapter

该类是基于netty3实现的编解码类。

1.属性

/** * 编码者 */
private final ChannelHandler encoder = new InternalEncoder();

/** * 解码者 */
private final ChannelHandler decoder = new InternalDecoder();

/** * 编解码器 */
private final Codec2 codec;

/** * url对象 */
private final URL url;

/** * 缓冲区大小 */
private final int bufferSize;

/** * 通道对象 */
private final com.alibaba.dubbo.remoting.ChannelHandler handler;

复制代码

InternalEncoder和InternalDecoder属性是该类的内部类,分别掌管着编码和解码

2.构造方法

public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
    this.codec = codec;
    this.url = url;
    this.handler = handler;
    int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
    // 若是缓存区大小在16字节之内,则设置配置大小,若是不是,则设置8字节的缓冲区大小
    this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}

复制代码

你会发现对于缓存区大小的规则都是同样的。

3.InternalEncoder

@Sharable
private class InternalEncoder extends OneToOneEncoder {

    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
        // 动态分配一个1k的缓冲区
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
        // 得到通道对象
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        try {
            // 编码
            codec.encode(channel, buffer, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ch);
        }
        // 基于buteBuffer建立一个缓冲区,而且写入数据
        return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
    }
}

复制代码

该内部类实现类编码的逻辑,主要调用了codec.encode。

4.InternalDecoder

private class InternalDecoder extends SimpleChannelUpstreamHandler {

    private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
            com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
        Object o = event.getMessage();
        // 若是消息不是一个ChannelBuffer类型
        if (!(o instanceof ChannelBuffer)) {
            // 转发事件到与此上下文关联的处理程序最近的上游
            ctx.sendUpstream(event);
            return;
        }

        ChannelBuffer input = (ChannelBuffer) o;
        // 若是可读数据不大于0,直接返回
        int readable = input.readableBytes();
        if (readable <= 0) {
            return;
        }

        com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
        if (buffer.readable()) {
            // 判断buffer是不是动态分配的缓冲区
            if (buffer instanceof DynamicChannelBuffer) {
                // 写入数据
                buffer.writeBytes(input.toByteBuffer());
                message = buffer;
            } else {
                // 须要的缓冲区大小
                int size = buffer.readableBytes() + input.readableBytes();
                // 动态生成缓冲区
                message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                        size > bufferSize ? size : bufferSize);
                // 把buffer数据写入message
                message.writeBytes(buffer, buffer.readableBytes());
                // 把input数据写入message
                message.writeBytes(input.toByteBuffer());
            }
        } else {
            // 不然 基于ByteBuffer经过buffer来建立一个新的缓冲区
            message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                    input.toByteBuffer());
        }

        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        Object msg;
        int saveReaderIndex;

        try {
            // decode object.
            do {
                saveReaderIndex = message.readerIndex();
                try {
                    // 解码
                    msg = codec.decode(channel, message);
                } catch (IOException e) {
                    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                    throw e;
                }
                // 拆包
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    message.readerIndex(saveReaderIndex);
                    break;
                } else {
                    // 若是已经到达读索引,则没有数据可解码
                    if (saveReaderIndex == message.readerIndex()) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw new IOException("Decode without read data.");
                    }
                    //
                    if (msg != null) {
                        // 将消息发送到指定关联的处理程序最近的上游
                        Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                    }
                }
            } while (message.readable());
        } finally {
            // 若是消息还有可读数据,则丢弃
            if (message.readable()) {
                message.discardReadBytes();
                buffer = message;
            } else {
                buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
            }
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        ctx.sendUpstream(e);
    }
}

复制代码

该内部类实现了解码的逻辑,其中大部分逻辑都在对数据作读写,关键的解码调用了codec.decode。

(八)NettyBackedChannelBufferFactory

该类是建立缓冲区的工厂类。它实现了ChannelBufferFactory接口,也就是实现类它的三种得到缓冲区的方法。

public class NettyBackedChannelBufferFactory implements ChannelBufferFactory {

    /** * 单例 */
    private static final NettyBackedChannelBufferFactory INSTANCE = new NettyBackedChannelBufferFactory();

    public static ChannelBufferFactory getInstance() {
        return INSTANCE;
    }


    @Override
    public ChannelBuffer getBuffer(int capacity) {
        return new NettyBackedChannelBuffer(ChannelBuffers.dynamicBuffer(capacity));
    }


    @Override
    public ChannelBuffer getBuffer(byte[] array, int offset, int length) {
        org.jboss.netty.buffer.ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(length);
        buffer.writeBytes(array, offset, length);
        return new NettyBackedChannelBuffer(buffer);
    }


    @Override
    public ChannelBuffer getBuffer(ByteBuffer nioBuffer) {
        return new NettyBackedChannelBuffer(ChannelBuffers.wrappedBuffer(nioBuffer));
    }
}

复制代码

能够看到,都是建立了一个NettyBackedChannelBuffer,下面讲解NettyBackedChannelBuffer。

(九)NettyBackedChannelBuffer

该类是基于netty3的buffer从新实现的缓冲区,它实现了ChannelBuffer接口,而且有一个属性:

private org.jboss.netty.buffer.ChannelBuffer buffer;

复制代码

那么其中的几乎全部方法都是调用了这个buffer的方法,由于我在dubbo源码解析(十一)远程通讯——Buffer中写到ChannelBuffer接口方法定义跟netty中的缓冲区定义几乎同样,连注释都几乎同样。全部知识单纯的调用了buffer的方法。具体的代码能够查看个人GitHub

后记

该部分相关的源码解析地址:github.com/CrazyHZM/in…

该文章讲解了基于netty3的来实现的远程通讯、介绍dubbo-remoting-netty内的源码解析,关键须要对netty有所了解。下一篇我会讲解基于netty4实现远程通讯部分。

相关文章
相关标签/搜索