目标:介绍基于netty4的来实现的远程通讯、介绍dubbo-remoting-netty4内的源码解析。
netty4对netty3兼容性不是很好,而且netty4在不少的术语和api也发生了改变,致使升级netty4会很艰辛,网上应该有不少相关文章,高版本的总有高版本的优点所在,因此dubbo也须要与时俱进,又新增了基于netty4来实现远程通信模块。下面讲解的,若是跟上一篇文章有重复的地方我就略过去了。关键仍是要把远程通信的api那几篇看懂,看这几篇实现才会很简单。java
下面是包的结构:git
该类继承了AbstractChannel,是基于netty4的通道实现类github
/** * 通道集合 */ private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>(); /** * 通道 */ private final Channel channel; /** * 属性集合 */ private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
属性跟netty3实现的通道类属性几乎同样,我就不讲解了。bootstrap
static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) { if (ch == null) { return null; } // 首先从集合中取通道 NettyChannel ret = channelMap.get(ch); // 若是为空,则新建 if (ret == null) { NettyChannel nettyChannel = new NettyChannel(ch, url, handler); // 若是通道还活跃着 if (ch.isActive()) { // 加入集合 ret = channelMap.putIfAbsent(ch, nettyChannel); } if (ret == null) { ret = nettyChannel; } } return ret; }
该方法是得到通道,若是集合中没有找到对应通道,则建立一个,而后加入集合。segmentfault
@Override public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { // 写入数据,发送消息 ChannelFuture future = channel.writeAndFlush(message); // 若是已经发送过 if (sent) { // 得到超时时间 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 等待timeout的链接时间后查看是否发送成功 success = future.await(timeout); } // 得到异常 Throwable cause = future.cause(); // 若是异常不为空,则抛出异常 if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } }
该方法是发送消息,调用了channel.writeAndFlush方法,与netty3的实现只是调用的api不一样。api
@Override public void close() { try { super.close(); } catch (Exception e) { logger.warn(e.getMessage(), e); } try { // 移除通道 removeChannelIfDisconnected(channel); } catch (Exception e) { logger.warn(e.getMessage(), e); } try { // 清理属性集合 attributes.clear(); } catch (Exception e) { logger.warn(e.getMessage(), e); } try { if (logger.isInfoEnabled()) { logger.info("Close netty channel " + channel); } // 关闭通道 channel.close(); } catch (Exception e) { logger.warn(e.getMessage(), e); } }
该方法就是操做了四个步骤,比较清晰。服务器
该类继承了ChannelDuplexHandler,是基于netty4实现的客户端通道处理实现类。这里的设计与netty3实现的通道处理器有所不一样,netty3实现的通道处理器是被客户端和服务端统一使用的,而在这里服务端和客户端使用了两个不一样的Handler来处理。而且netty3的NettyHandler是基于netty3的SimpleChannelHandler设计的,而这里是基于netty4的ChannelDuplexHandler。ide
/** * url对象 */ private final URL url; /** * 通道 */ private final ChannelHandler handler;
该类的属性只有两个,下面实现的方法也都是调用了handler的方法,我就举一个例子:oop
@Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise future) throws Exception { // 得到通道 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { // 断开链接 handler.disconnected(channel); } finally { // 从集合中移除 NettyChannel.removeChannelIfDisconnected(ctx.channel()); } }
能够看到分了三部,得到通道对象,调用handler方法,最后检测一下通道是否活跃。其余方法也是差很少。源码分析
该类继承了ChannelDuplexHandler,是基于netty4实现的服务端通道处理实现类。
/** * 链接该服务器的通道数 key为ip:port */ private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel> /** * url对象 */ private final URL url; /** * 通道处理器 */ private final ChannelHandler handler;
该类有三个属性,比NettyClientHandler多了一个属性channels,下面的实现方法也是同样的,都是调用了handler方法,来看一个例子:
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 激活事件 ctx.fireChannelActive(); // 得到通道 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { // 若是通道不为空,则加入集合中 if (channel != null) { channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel); } // 链接该通道 handler.connected(channel); } finally { // 若是通道不活跃,则移除通道 NettyChannel.removeChannelIfDisconnected(ctx.channel()); } }
该方法是通道活跃的时候调用了handler.connected,差很少也是常规套路,就多了激活事件和加入到通道中。其余方法也差很少。
该类继承了AbstractClient,是基于netty4实现的客户端实现类。
/** * NioEventLoopGroup对象 */ private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true)); /** * 客户端引导类 */ private Bootstrap bootstrap; /** * 通道 */ private volatile Channel channel; // volatile, please copy reference to use
属性里的NioEventLoopGroup对象是netty4中的对象,什么用处请看netty的解析。
@Override protected void doOpen() throws Throwable { // 建立一个客户端的通道处理器 final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); // 建立一个引导类 bootstrap = new Bootstrap(); // 设置可选项 bootstrap.group(nioEventLoopGroup) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout()) .channel(NioSocketChannel.class); // 若是链接超时时间小于3s,则设置为3s,也就是说最低的超时时间为3s if (getConnectTimeout() < 3000) { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); } else { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout()); } // 建立一个客户端 bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { // 编解码器 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); // 加入责任链 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("handler", nettyClientHandler); } }); }
该方法仍是作了建立客户端,而且打开的操做,其中不少的参数设置操做。
其余方法跟 dubbo源码解析(十六)远程通讯——Netty3中写到的NettyClient实现同样。
该类继承了AbstractServer,实现了Server。是基于netty4实现的服务器类
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); /** * 链接该服务器的通道集合 key为ip:port */ private Map<String, Channel> channels; // <ip:port, channel> /** * 服务器引导类 */ private ServerBootstrap bootstrap; /** * 通道 */ private io.netty.channel.Channel channel; /** * boss线程组 */ private EventLoopGroup bossGroup; /** * worker线程组 */ private EventLoopGroup workerGroup;
属性相较netty3而言,新增了两个线程组,一样也是由于netty3和netty4的设计不一样。
@Override protected void doOpen() throws Throwable { // 建立服务引导类 bootstrap = new ServerBootstrap(); // 建立boss线程组 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); // 建立worker线程组 workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); // 建立服务器处理器 final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); // 得到通道集合 channels = nettyServerHandler.getChannels(); // 设置ventLoopGroup还有可选项 bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 编解码器 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); // 增长责任链 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("handler", nettyServerHandler); } }); // bind 绑定 ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); // 等待绑定完成 channelFuture.syncUninterruptibly(); // 设置通道 channel = channelFuture.channel(); }
该方法是建立服务器,而且开启。若是熟悉netty4点朋友应该以为仍是很好理解的。其余方法跟《 dubbo源码解析(十六)远程通讯——Netty3》中写到的NettyClient实现同样,处理close中要多关闭两个线程组
该类跟 《dubbo源码解析(十六)远程通讯——Netty3》中的NettyTransporter同样的实现。
该类是基于netty4的编解码器。
/** * 编码器 */ private final ChannelHandler encoder = new InternalEncoder(); /** * 解码器 */ private final ChannelHandler decoder = new InternalDecoder(); /** * 编解码器 */ private final Codec2 codec; /** * url对象 */ private final URL url; /** * 通道处理器 */ private final com.alibaba.dubbo.remoting.ChannelHandler handler;
属性跟基于netty3实现的编解码同样。
private class InternalEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { // 建立缓冲区 com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out); // 得到通道 Channel ch = ctx.channel(); // 得到netty通道 NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); try { // 编码 codec.encode(channel, buffer, msg); } finally { // 检测通道是否活跃 NettyChannel.removeChannelIfDisconnected(ch); } } }
该内部类是编码器的抽象,主要的编码仍是调用了codec.encode。
private class InternalDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception { // 建立缓冲区 ChannelBuffer message = new NettyBackedChannelBuffer(input); // 得到通道 NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); Object msg; int saveReaderIndex; try { // decode object. do { // 记录读索引 saveReaderIndex = message.readerIndex(); try { // 解码 msg = codec.decode(channel, message); } catch (IOException e) { throw e; } // 拆包 if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) { message.readerIndex(saveReaderIndex); break; } else { //is it possible to go here ? if (saveReaderIndex == message.readerIndex()) { throw new IOException("Decode without read data."); } // 读取数据 if (msg != null) { out.add(msg); } } } while (message.readable()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } }
该内部类是解码器的抽象类,其中关键的是调用了codec.decode。
该类是缓冲区类。
/** * 缓冲区 */ private ByteBuf buffer;
其中的方法几乎都调用了该属性的方法。而ByteBuf是netty4中的字节数据的容器。
这两个类是用于用于格式化的,是从netty4中复制出来的,其中而且稍微作了一下改动。我就再也不讲解了。
该部分相关的源码解析地址: https://github.com/CrazyHZM/i...
该文章讲解了基于netty4的来实现的远程通讯、介绍dubbo-remoting-netty4内的源码解析,关键须要对netty4有所了解。下一篇我会讲解基于zookeeper实现远程通讯部分。