在讲解dubboTCP端的设计时,先了解下一些类的关系图。它们是如何组织在一块儿的,每一个功能又是什么,接着在进一步深刻了解其内涵。html
一、Exchangers(交换器工具类) 用来建立TCP服务(bind)和创建客户端链接(connect)辅助类bootstrap
二、Transporters(数据流传输工具类)用来建立TCP服务(bind)和创建客户端链接(connect)辅助类,Exchangers的底层内容依赖于Transporters,而且Transporters会根据SPI扩展,来适配合适的tcp通信框架,好比netty,mina等。服务器
三、Exchanger(交换器) 用来建立TCP连接,经过工具类Exchangers完成,该接口是一个SPI扩展,目前惟一仅有就是HeaderExchanger。从名字的含义能够获得,该协议是具备自定义协议头的交换器,因此取名HeaderExchanger。多线程
四、Transporter(数据传输层) 用来建立TCP链接,经过工具类Transporters完成。它也是一个SPI扩展,好比NettyTransporter,MinaTransporter。app
五、ExchangeClient (交换器客户端),Exchanger的connect()方法返回,即创建了TCP链接后,返回的客户端,接着就是经过该客户端与服务端通讯,实例有HeaderExchangeClient、LazyConnectExchangeClient、ReferenceCountExchangeClient。以后分别讲解这3个,Exchangers工具类创建的链接客户端是HeaderExchangeClient。框架
六、ExchangeServer (交换器服务端端) Exchanger的bind()方法返回,即服务端监听的服务端实例,它监听这某个具体的tcp端口。默认实现是HeaderExchangeServer。dom
七、RemotingServer(远程的TCP服务端),ExchangeServer类也实现了该接口,表明其也是一个远程服务器,具体的实现有NettyServer,由Transporter的bind()方法返回,具体的Transporter返回相应的远程服务端。好比NettyTransporter#bind()返回NettyServer。socket
八、Client(TCP客户端),ExchangeClient类也实现了该接口,表明其也是一个TCP客户端,具体实现有NettyClient,由Transporter的connect()方法返回,具体的Transporter返回相应的TCP客户端。好比NettyTransporter#connect()返回NettyClient。tcp
九、Channel (通讯通道) ,每创建一个TCP连接就相应建立一个Channel。好比Netty创建链接后,就有一个Channel。这里的Channel指的是dubbo本身定义的一个channel。它与netty的channel创建关联,经过NettyChannel类,框架操做的是NettyChannel,而NettyChannel内部持有一个netty的channel对象。ide
十、HeaderExchangeChannel(交换器Channel,ExchangeChannel属于交换器Channel),它被HeaderExchangeClient客户端所持有,客户端就是经过HeaderExchangeChannel进行通讯的,HeaderExchangeChannel内部持有一个具体的Channel。
十一、ChannelHandler (通道处理器) 用来处理创建链接、发送请求、结束请求等操做的具体抽象。
十二、ChannelHandlers(通道处理器工具类) 主要用来包裹封装具体的Channel,它的做用是经过消息类型,根据Dispatcher返回不一样的
1三、Dispatcher(消息派发器)
类型 | Dispatcher | Channelhandler | 做用 |
---|---|---|---|
All | AllDispatcher | AllChannelHandler | 全部的消息类型所有经过业务线程池处理 |
Connection | ConnectionOrderedDispatcher | ConnectionOrderedChannelHandler | 链接、断开消息单独经过一个线程池池来处理,其余的读写等消息经过业务线程池处理 |
Direct | DirectDispatcher | DirectChannelHandler | 全部的消息都经过IO线程池处理,不放到业务线程池中 |
Execution | ExecutionDispatcher | ExecutionChannelHandler | 请求消息在业务线程池处理,其余消息在IO线程池。 |
Message | MessageOnlyDispatcher | MessageOnlyChannelHandler | 请求和响应消息在业务线程池处理,其余心跳,链接等消息在IO线程池处理 |
@Data @ToString public class SampleMessage { private String threadName; private String id; private String desc; }
public class SampleEncoder extends MessageToByteEncoder<SampleMessage> { protected void encode(ChannelHandlerContext channelHandlerContext, SampleMessage sampleMessage, ByteBuf byteBuf) throws Exception { String threadName = sampleMessage.getThreadName(); String id = sampleMessage.getId(); String desc = sampleMessage.getDesc(); byteBuf.writeInt(threadName.getBytes().length); byteBuf.writeBytes(threadName.getBytes()); byteBuf.writeInt(id.getBytes().length); byteBuf.writeBytes(id.getBytes()); byteBuf.writeInt(desc.getBytes().length); byteBuf.writeBytes(desc.getBytes()); String str = sampleMessage.getThreadName() + ":" + sampleMessage.getDesc() + ":" + sampleMessage.getId(); System.out.println(str); } }
public class SampleDecoder extends ByteToMessageDecoder { protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { byteBuf.markReaderIndex(); String threadName = read(byteBuf); if (threadName == null) { byteBuf.resetReaderIndex(); return; } String id = read(byteBuf); if (id == null) { byteBuf.resetReaderIndex(); return; } String desc = read(byteBuf); if (desc == null) { byteBuf.resetReaderIndex(); return; } SampleMessage sampleMessage = new SampleMessage(); sampleMessage.setId(id); sampleMessage.setThreadName(threadName); sampleMessage.setDesc(desc); list.add(sampleMessage); } private String read(ByteBuf byteBuf) { if (canReadInt(byteBuf)) { int readInt = byteBuf.readInt(); if (canReadN(byteBuf, readInt)) { byte[] bytes = new byte[readInt]; byteBuf.readBytes(bytes); return new String(bytes); } } return null; } private boolean canReadInt(ByteBuf byteBuf) { return canReadN(byteBuf, 4); } private boolean canReadN(ByteBuf byteBuf, int n) { if (!byteBuf.isReadable()) { return false; } return byteBuf.readableBytes() >= n; } }
public class PrintChannelHandlers extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof SampleMessage) { SampleMessage sampleMessage = (SampleMessage) msg; System.out.println(sampleMessage.getThreadName() + ":" + sampleMessage.getId() + ":" + sampleMessage.getDesc()); } } }
public class NettyServerMain { public static void main(String[] args) { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(12)) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // .addLast("log",new LoggingHandler(LogLevel.INFO)) .addLast("decoder", new SampleDecoder()) .addLast("encoder", new SampleEncoder()) .addLast("handler", new PrintChannelHandlers()); } }); ChannelFuture channelFuture = serverBootstrap.bind(8888); channelFuture.syncUninterruptibly(); System.out.println("连接前"); Channel channel = channelFuture.channel(); System.out.println("连接后"); } }
public class NettyClientMain { public static void main(String[] args) { NettyClientMain nettyClientMain = new NettyClientMain(); nettyClientMain.open(); } public void open() { Bootstrap bootstrap = new Bootstrap(); bootstrap = new Bootstrap(); bootstrap.group(new NioEventLoopGroup(10)) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .channel(NioSocketChannel.class); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", new SampleDecoder()) .addLast("encoder", new SampleEncoder()); //.addLast("handler", new PrintChannelHandlers()); } }); SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8888); ChannelFuture future = bootstrap.connect(socketAddress); boolean ret = future.awaitUninterruptibly(3000, MILLISECONDS); if (ret && future.isSuccess()) { Channel newChannel = future.channel(); doProcess(newChannel); } } private void doProcess(Channel channel) { AtomicLong atomicLong = new AtomicLong(); for (int i = 0; i < 15; i++) { final char ch = (char) (i + 65); final String id = "id" + i; Thread t = new Thread(new Runnable() { @Override public void run() { while (true) { SampleMessage sampleMessage = new SampleMessage(); sampleMessage.setThreadName(Thread.currentThread().getName()); sampleMessage.setDesc(getdes(ch)); sampleMessage.setId("id" + sampleMessage.getDesc().length() + "-" + atomicLong.getAndIncrement()); channel.writeAndFlush(sampleMessage); } } }); t.start(); } } private String getdes(char a) { Random random = new Random(); StringBuffer buffer = new StringBuffer(); for (int i = 0; i < random.nextInt(500) + 1; i++) { buffer.append(a); } return buffer.toString(); } }
结果符合预期,dubbo 也是经过服务底层公用一条TCP连接,多线程进行调用该链路channel。