Dubbo Provider启动流程源码分析

简单的官方demo:

provider的java代码:php

public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"});
        context.start();

        System.in.read(); // 按任意键退出
    }

provider的spring配置:html

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 提供方应用信息,用于计算依赖关系 -->
    <dubbo:application name="demo-provider"/>

    <!-- 使用multicast广播注册中心暴露服务地址 -->
    <dubbo:registry address="multicast://224.5.6.7:1234"/>

    <!-- 用dubbo协议在20880端口暴露服务 -->
    <dubbo:protocol name="dubbo" port="20880"/>

    <!-- 和本地bean同样实现服务 -->
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>

    <!-- 声明须要暴露的服务接口 -->
    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

</beans>

经过配置文件,以及spring的mvc初始化流程,咱们能够假设下服务启动流程:
1. spring配置文件的加载以及初始化
2. bean的单例生成
3. dubbo管理全部服务实现单例对象
4. 向配置中心注册服务信息java

spring自定义标签
dubbo标签初始化实现了Spring提供的NamespaceHandler接口,因此下面先看看DubboNamespaceHandler类:web

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    public void init() {
        // DubboBeanDefinitionParser定义了如何解析dubbo节点信息
        // DubboBeanDefinitionParser的第一个参数是beanclass

        // 应用相关配置
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        // 模块相关配置
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        // 注册中心相关配置
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        // 
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        // 服务提供者配置
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        // 服务消费者配置
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        // 网络协议相关配置
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        // 服务bean配置
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        // 
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        //
        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
    }

}

在DubboBeanDefinitionParser中的parse中,解析设置了大部分配置信息以及服务信息。
咱们能够关注下其中beanclass的源码,由于这章主要分析的是provider,这里从provider进行分析:
首先是beanName叫作ServiceBean的bean实例。spring

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {
    ...
    // 初始化bean的时候执行
    public void afterPropertiesSet() throws Exception {
        ...// 初始化各类配置
        // 发布服务
        if (!isDelay()) {
            export();
        }
    }
}

发布代码:缓存

public synchronized void export() {
        ...
        // 延迟导出
        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

导出代码:网络

protected synchronized void doExport() {
        ...
        doExportUrls();
    }
    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

    // method => invoker => exporter
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        ...
        // 导出服务
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }

        // 注册中心能够是zk,consul等
        // 注册中心host
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        // 注册中心port
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        // 获取暴露范围配置
        String scope = url.getParameter(Constants.SCOPE_KEY);
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
            // 若是暴露本地服务
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // 若是暴露远端服务,走服务发现流程
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        // 注册地址
                        URL monitorUrl = loadMonitor(registryURL);
                        // 动态代理,将class+method包装位invoker,ref是服务的具体实例对象obj,invoker是个可执行对象
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        // invoker=>exporter,最终在服务端保存下来的是exporter,对服务的暴露和引用都是经过这个对象实现的,而这个对象的实现由协议决定
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

invoker 生成,动态代理的过程

定义代码:mvc

ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

@SPI("javassist") //使用javassist字节码技术生成对象
public interface ProxyFactory {

    /** * create proxy. * 定义生成代理对象的方法 * * @param invoker * @return proxy */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /** * create invoker. * getProxy调用的参数生成 * * @param <T> * @param proxy * @param type * @param url * @return invoker */
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

// jdk 动态代理
public class JdkProxyFactory extends AbstractProxyFactory {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }

}

// javassist动态代理
public class JavassistProxyFactory extends AbstractProxyFactory {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper类不能正确处理带$的类名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

public abstract class AbstractProxyInvoker<T> implements Invoker<T> {

    private final T proxy; //在proxyFactory.getInvoker的时候被设置,即

    private final Class<T> type;

    private final URL url;

    public Result invoke(Invocation invocation) throws RpcException {
        try {
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    // 调用的是jdkproxyfactory和javassistproxyfactory定义的方法
    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;

}

实际函数调用:app

// 泛型调用
public class ExtensionLoader<T> {
    // 缓存
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
    // class->value
    private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();

    // 获取loader,优先缓存
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }
    // 每一个class绑定了一个value,获取这个value,优先缓存
    public T getAdaptiveExtension() {
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError == null) {
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                    }
                }
            }
        }
        return (T) instance;
    }
}

exporter 的生成

抽象类ide

public abstract class AbstractProtocol implements Protocol { // 一个协议对应着多个exporter protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>(); // 对应着一堆invoker protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>(); }

对应的实现类
以thrift做为交换数据协议为例

public class ThriftProtocol extends AbstractProtocol {

    // thrift port
    public static final int DEFAULT_PORT = 40880;

    // 对应的数据交换方
    // ip:port -> ExchangeServer
    private final ConcurrentMap<String, ExchangeServer> serverMap =
            new ConcurrentHashMap<String, ExchangeServer>();

    // 服务发布的函数调用
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 只能使用 thrift codec
        URL url = invoker.getUrl().addParameter(Constants.CODEC_KEY, ThriftCodec.NAME);
        // find server.
        String key = url.getAddress();
        //client 也能够暴露一个只有server能够调用的服务。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer && !serverMap.containsKey(key)) {
            serverMap.put(key, getServer(url));
        }
        // export service.
        key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        // 缓存起来
        exporterMap.put(key, exporter);

        return exporter;
    }

    public void destroy() {
        // 销毁invoker
        super.destroy();
        // 移除消费ip,关闭server
        for (String key : new ArrayList<String>(serverMap.keySet())) {
            ExchangeServer server = serverMap.remove(key);
            if (server != null) {
                server.close(getServerShutdownTimeout());
            } // ~ end of if ( server != null )
        } // ~ end of loop serverMap
    } // ~ end of method destroy

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        ThriftInvoker<T> invoker = new ThriftInvoker<T>(type, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }
}