#1 系列目录java
#2 NIO通讯层的抽象apache
目前dubbo已经集成的有netty、mina、grizzly。先来经过案例简单了解下netty、mina编程(grizzly没有了解过)编程
##2.1 netty和mina的简单案例服务器
netty本来是jboss开发的,后来单独出来了,因此会有两种版本就是org.jboss.netty和io.netty两种包类型的,而dubbo内置的是前者。目前还不是很熟悉,可能稍有差异,可是总体大概都是同样的。微信
咱们先来看下io.netty的案例:session
public static void main(String[] args){ EventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap=new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TcpServerHandler()); } }); ChannelFuture f=serverBootstrap.bind(8080).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
mina的案例:多线程
public static void main(String[] args) throws IOException{ IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter( new TextLineCodecFactory(Charset.forName("UTF-8"),"\r\n", "\r\n"))); acceptor.setHandler(new TcpServerHandler()); acceptor.bind(new InetSocketAddress(8080)); }
二者都是使用Reactor模型结构。而最原始BIO模型以下:并发
每来一个Socket链接都为该Socket建立一个线程来处理。因为总线程数有限制,致使Socket链接受阻,因此BIO模型并发量并不大app
Rector多线程模型以下,更多信息见Netty系列之Netty线程模型:框架
用一个boss线程,建立Selector,用于不断监听Socket链接、客户端的读写操做等
用一个线程池即workers,负责处理Selector派发的读写操做。
因为boss线程能够接收更多的Socket链接,同时能够充分利用线程池中的每一个线程,减小了BIO模型下每一个线程为单独的socket的等待时间。
##2.2 服务器端如何集成netty和mina
先来简单总结下上述netty和mina的类似之处,而后进行抽象归纳成接口
1 各自有各自的编程启动方式
2 都须要各自的ChannelHandler实现,用于处理各自的Channel或者IoSession的链接、读写等事件。
对于netty来讲: 须要继承org.jboss.netty.channel.SimpleChannelHandler(或者其余方式),来处理org.jboss.netty.channel.Channel的链接读写事件
对于mina来讲:须要继承org.apache.mina.common.IoHandlerAdapter(或者其余方式),来处理org.apache.mina.common.IoSession的链接读写事件
为了统一上述问题,dubbo须要作以下事情:
1 定义dubbo的com.alibaba.dubbo.remoting.Channel接口
1.1 针对netty,上述Channel的实现为NettyChannel,内部含有一个netty本身的org.jboss.netty.channel.Channel channel对象,即该com.alibaba.dubbo.remoting.Channel接口的功能实现所有委托为底层的org.jboss.netty.channel.Channel channel对象来实现
1.2 针对mina,上述Channel实现为MinaChannel,内部包含一个mina本身的org.apache.mina.common.IoSession session对象,即该com.alibaba.dubbo.remoting.Channel接口的功能实现所有委托为底层的org.apache.mina.common.IoSession session对象来实现
2 定义本身的com.alibaba.dubbo.remoting.ChannelHandler接口,用于处理com.alibaba.dubbo.remoting.Channel接口的链接读写事件,以下所示
public interface ChannelHandler { void connected(Channel channel) throws RemotingException; void disconnected(Channel channel) throws RemotingException; void sent(Channel channel, Object message) throws RemotingException; void received(Channel channel, Object message) throws RemotingException; void caught(Channel channel, Throwable exception) throws RemotingException; }
2.1 先定义用于处理netty的NettyHandler,须要按照netty的方式继承netty的org.jboss.netty.channel.SimpleChannelHandler,此时NettyHandler就能够委托dubbo的com.alibaba.dubbo.remoting.ChannelHandler接口实现来完成具体的功能,在交给com.alibaba.dubbo.remoting.ChannelHandler接口实现以前,须要先将netty本身的org.jboss.netty.channel.Channel channel转化成上述的NettyChannel,见NettyHandler
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { if (channel != null) { channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel); } handler.connected(channel); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } @Override public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress())); handler.disconnected(channel); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } }
2.2 先定义用于处理mina的MinaHandler,须要按照mina的方式继承mina的org.apache.mina.common.IoHandlerAdapter,此时MinaHandler就能够委托dubbo的com.alibaba.dubbo.remoting.ChannelHandler接口实现来完成具体的功能,在交给com.alibaba.dubbo.remoting.ChannelHandler接口实现以前,须要先将mina本身的org.apache.mina.common.IoSession转化成上述的MinaChannel,见MinaHandler
public void sessionOpened(IoSession session) throws Exception { MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler); try { handler.connected(channel); } finally { MinaChannel.removeChannelIfDisconnectd(session); } } @Override public void sessionClosed(IoSession session) throws Exception { MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler); try { handler.disconnected(channel); } finally { MinaChannel.removeChannelIfDisconnectd(session); } }
作了上述事情以后,所有逻辑就统一到dubbo本身的com.alibaba.dubbo.remoting.ChannelHandler接口如何来处理本身的com.alibaba.dubbo.remoting.Channel接口。
这就须要看下com.alibaba.dubbo.remoting.ChannelHandler接口的实现有哪些:
3 定义Server接口用于统一你们的启动流程
先来看下总体的Server接口实现状况
如NettyServer的启动流程: 按照netty本身的API启动方式,而后依据外界传递进来的com.alibaba.dubbo.remoting.ChannelHandler接口实现,建立出NettyHandler,最终对用户的链接请求的处理所有交给NettyHandler来处理,NettyHandler又交给了外界传递进来的com.alibaba.dubbo.remoting.ChannelHandler接口实现。
至此就将全部底层不一样的通讯实现所有转化到了外界传递进来的com.alibaba.dubbo.remoting.ChannelHandler接口的实现上了。
而上述Server接口的另外一个分支实现HeaderExchangeServer则充当一个装饰器的角色,为全部的Server实现增添了以下功能:
向该Server全部的Channel依次进行心跳检测:
##2.3 客户端如何集成netty和mina
服务器端了解了以后,客户端就也很是清楚了,总体类图以下:
如NettyClient在使用netty的API开启客户端以后,仍然使用NettyHandler来处理。仍是最终转化成com.alibaba.dubbo.remoting.ChannelHandler接口实现上了。
HeaderExchangeClient和上面的HeaderExchangeServer很是相似,就再也不提了。
咱们能够看到这样集成完成以后,就彻底屏蔽了底层通讯细节,将逻辑所有交给了com.alibaba.dubbo.remoting.ChannelHandler接口的实现上了。从上面咱们也能够看到,该接口实现也会通过层层装饰类的包装,才会最终交给底层通讯。
如HeartbeatHandler装饰类:
public void sent(Channel channel, Object message) throws RemotingException { setWriteTimestamp(channel); handler.sent(channel, message); } public void received(Channel channel, Object message) throws RemotingException { setReadTimestamp(channel); if (isHeartbeatRequest(message)) { Request req = (Request) message; if (req.isTwoWay()) { Response res = new Response(req.getId(), req.getVersion()); res.setEvent(Response.HEARTBEAT_EVENT); channel.send(res); if (logger.isInfoEnabled()) { int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); if(logger.isDebugEnabled()) { logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period" + (heartbeat > 0 ? ": " + heartbeat + "ms" : "")); } } } return; } if (isHeartbeatResponse(message)) { if (logger.isDebugEnabled()) { logger.debug( new StringBuilder(32) .append("Receive heartbeat response in thread ") .append(Thread.currentThread().getName()) .toString()); } return; } handler.received(channel, message); }
就会拦截那些上述提到的心跳检测请求。更新该Channel的最后读写时间。
##2.4 同步调用和异步调用的实现
首先设想一下咱们目前的通讯方式,使用netty mina等异步事件驱动的通讯框架,将Channel中信息都分发到Handler中去处理了,Handler中的send方法只负责不断的发送消息,receive方法只负责不断接收消息,这时候就产生一个问题:
客户端如何对应同一个Channel的接收的消息和发送的消息之间的匹配呢?
这也很简单,就须要在发送消息的时候,必需要产生一个请求id,将调用的信息连同id一块儿发给服务器端,服务器端处理完毕后,再将响应信息和上述请求id一块儿发给客户端,这样的话客户端在接收到响应以后就能够根据id来判断是针对哪次请求的响应结果了。
来看下DubboInvoker中的具体实现
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); }
能够看到的是它把ResponseFuture设置到与当前线程绑定的RpcContext中了,若是咱们要获取异步结果,则须要经过RpcContext来获取当前线程绑定的RpcContext,而后就能够获取Future对象。以下所示:
String result1 = helloService.hello("World"); System.out.println("result :"+result1); System.out.println("result : "+RpcContext.getContext().getFuture().get());
当设置成异步请求的时候,result1则为null,而后经过RpcContext来获取相应的值。
而后咱们来看下异步请求的整个实现过程,即上述currentClient.request方法的具体内容:
public ResponseFuture request(Object request, int timeout) throws RemotingException { // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); }catch (RemotingException e) { future.cancel(); throw e; } return future; }
第一步:建立出一个request对象,建立过程当中就自动产生了requestId,以下
public class Request { private final long mId; private static final AtomicLong INVOKE_ID = new AtomicLong(0); public Request() { mId = newId(); } private static long newId() { // getAndIncrement()增加到MAX_VALUE时,再增加会变为MIN_VALUE,负数也能够作为ID return INVOKE_ID.getAndIncrement(); } }
第二步:根据request请求封装成一个DefaultFuture对象,经过该对象的get方法就能够获取到请求结果。该方法会阻塞一直到请求结果产生。同时DefaultFuture对象会被存至DefaultFuture类以下结构中:
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
key就是请求id
第三步:将上述请求对象发送给服务器端,同时将DefaultFuture对象返给上一层函数,即DubboInvoker中,而后设置到当前线程中
第四步:用户经过RpcContext来获取上述DefaultFuture对象来获取请求结果,会阻塞至服务器端返产生结果给客户端
第五步:服务器端产生结果,返回给客户端会在客户端的handler的receive方法中接收到,接收到以后判别接收的信息是Response后,
static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
就会根据response的id从上述FUTURES结构中查出对应的DefaultFuture对象,并把结果设置进去。此时DefaultFuture的get方法则再也不阻塞,返回刚刚设置好的结果。
至此异步通讯大体就了解了,可是咱们会发现一个问题:
当某个线程屡次发送异步请求时,都会将返回的DefaultFuture对象设置到当前线程绑定的RpcContext中,就会形成了覆盖问题,以下调用方式:
String result1 = helloService.hello("World"); String result2 = helloService.hello("java"); System.out.println("result :"+result1); System.out.println("result :"+result2); System.out.println("result : "+RpcContext.getContext().getFuture().get()); System.out.println("result : "+RpcContext.getContext().getFuture().get());
即异步调用了hello方法,再次异步调用,则前一次的结果就被冲掉了,则就没法获取前一次的结果了。必需要调用一次就立马将DefaultFuture对象获取走,以避免被冲掉。即这样写:
String result1 = helloService.hello("World"); Future<String> result1Future=RpcContext.getContext().getFuture(); String result2 = helloService.hello("java"); Future<String> result2Future=RpcContext.getContext().getFuture(); System.out.println("result :"+result1); System.out.println("result :"+result2); System.out.println("result : "+result1Future.get()); System.out.println("result : "+result2Future.get());
最后来张dubbo的解释图片:
#3 通讯层与dubbo的结合
从上面能够了解到如何对不一样的通讯框架进行抽象,屏蔽底层细节,统一将逻辑交给ChannelHandler接口实现来处理。而后咱们就来了解下如何与dubbo的业务进行对接,也就是在什么时机来使用上述通讯功能:
##3.1 服务的发布过程使用通讯功能
如DubboProtocol在发布服务的过程当中:
1 DubboProtocol中有一个以下结构
Map<String, ExchangeServer> serverMap
在发布一个服务的时候会先根据服务的url获取要发布的服务所在的host和port,以此做为key来从上述结构中寻找是否已经有对应的ExchangeServer(上面已经说明)。
2 若是没有的话,则会建立一个,建立过程以下:
ExchangeServer server = Exchangers.bind(url, requestHandler);
其中requestHandler就是DubboProtocol自身实现的ChannelHandler。
获取一个ExchangeServer,它的实现主要是Server的装饰类,依托外部传递的Server来实现Server功能,而本身加入一些额外的功能,如ExchangeServer的实现HeaderExchangeServer,就是加入了心跳检测的功能。
因此此时咱们能够自定义扩展功能来实现Exchanger。接口定义以下:
@SPI(HeaderExchanger.NAME) public interface Exchanger { @Adaptive({Constants.EXCHANGER_KEY}) ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException; @Adaptive({Constants.EXCHANGER_KEY}) ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException; }
默认使用的就是HeaderExchanger,它建立的ExchangeServer是HeaderExchangeServer以下所示:
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
HeaderExchangeServer仅仅是一个Server接口的装饰类,须要依托外部传递Server实现来完成具体的功能。此Server实现能够是netty也能够是mina等。因此咱们能够自定义Transporter实现来选择不一样底层通讯框架,接口定义以下:
@SPI("netty") public interface Transporter { @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind(URL url, ChannelHandler handler) throws RemotingException; @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException; }
默认采用netty实现,以下:
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } }
至此就到了咱们上文介绍的内容了。同时DubboProtocol的ChannelHandler实现通过层层装饰器包装,最终传给底层通讯Server。
客户端发送请求给服务器端时,底层通讯Server会将请求通过层层处理最终传递给DubboProtocol的ChannelHandler实现,在该实现中,会根据请求参数找到对应的服务器端本地Invoker,而后执行,再将返回结果经过底层通讯Server发送给客户端。
##3.2 客户端的引用服务使用通讯功能
在DubboProtocol引用服务的过程当中:
1 使用以下方式建立client
ExchangeClient client=Exchangers.connect(url ,requestHandler);
requestHandler仍是DubboProtocol中ChannelHandler实现。
和Server相似,咱们能够经过自定义Exchanger实现来建立出不一样功能的ExchangeClient。默认的Exchanger实现是HeaderExchanger
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
建立出来的ExchangeClient是HeaderExchangeClient,它也是Client的包装类,仅仅在Client外层加上心跳检测的功能,向它所链接的服务器端发送心跳检测。
HeaderExchangeClient须要外界给它传一个Client实现,这是由Transporter接口实现来定的,默认是NettyTransporter
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } }
建立出来的的Client实现是NettyClient。
同时DubboProtocol的ChannelHandler实现通过层层装饰器包装,最终传给底层通讯Client。
客户端的DubboInvoker调用远程服务的时候,会将调用信息经过ExchangeClient发送给服务器端,而后返回一个ResponseFuture,根据客户端选择的同步仍是异步方式,来决定阻塞仍是直接返回,这一部分在上文同步调用和异步调用的实现中已经详细说过了。
#4 结束语
本篇文章主要介绍了集成netty和mina的那一块的通讯接口及实现的设计,下篇主要介绍编解码的过程
欢迎关注微信公众号:乒乓狂魔