dubbo中的切换不一样transport怎么作到的?处理消息的handler链是咋样的?在哪里唤醒以前阻塞在发送request之后的业务线程?

server和client都是如下方法获得的,Exchanger这个接口只有这么一个实现,未来可能其余更加复杂得到server和cliet方式,如下这种是目前惟一的异步

 

 

 

 

 

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

  



 


connect和 bind获得的最终的server和client,Transporters.connect调用方法是如下两个:







public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

  

 

 

 

经过spi的自适应扩展做为生产实例的中间工厂,这个工厂根据url参数获得不一样的transport,若是url里面指定netty4,那么就能够获得netty4的client
切换不一样transport怎么作到的?那就是经过自适应扩展加url自由切换。

回到最上面的,经过bind已经拿到最终的nettyserver,继续包裹了一层HeaderExchangeServer,这里面主要处理心跳、channel、future的封装,以及屏蔽不一样类型的server(netty、netty4等等)。



return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
继续看这里,这个handler很长,对于dubbo来讲,最里面这个handler是经过CreateServer方法中的server = Exchangers.bind(url, requestHandler)这个传入进来的,
这个requestHandler就是dubboprotocl里面内部的,这个handler只有reply方法,做用就是执行doinvoe,也就是真正履行provider义务的地方,也只会在request来的时候才会被调用。

HeaderExchangeHandler用来处理request、response的,decodehandler用来解码。这handler到这里是否是已经结束了?显然不是,netty4server初始化的时候:





public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}


protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
            .getAdaptiveExtension().dispatch(handler, url)));
}

  




日志里面常常看到的 “DubboServerHandler”就是这个SERVER_THREAD_POOL_NAME。handler链上又有下面这几个:
MultiMessageHandler:针对multimessage类型消息,在received作拦截。
HeartbeatHandler:针对心跳在received作拦截。
经过dispatch出来的默认的allchannelhandler:对全部io事件作处理,大部分任务都扔到线程池里面作异步处理,防止阻塞netty线程。这个业务线程池的类型、个数也是url指定。

因此这个handler链路上面从外到内:MultiMessageHandler HeartbeatHandler Allchannelhandler DecodeHandler HeadExchangeHandler dubboprotocol里面自带的带有reply方法的handler。

对于netty来讲,消息在到达这些handler处理之前,已经被netty的编解码handler处理了,因此DecodeHandler无关紧要

HeadExchangeHandler看起来意义不大,其实这个是用来阻断传递到dubbo里面的handler的,它是最后一道防线,用来决定要不要丢给dubbo作reply、doinvoke操做的,最重要的方法:




public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

  

若是two-way、对方给我request,须要返回response,那么handleRequest会调用dubboprotocol的reply处理这个request
若是对方给个人response,那么调用handleResponse(channel, (Response) message);
static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

  

 

 

public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

 

 

 收到response之后,在这里找到对应的future,经过future唤醒以前阻塞在发送request之后的业务线程:ide

 

private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
相关文章
相关标签/搜索