dubbo-源码分析Provider

Dubbo provider启动原理:java

当咱们的dubbo启动咱们的spring容器时spring 初始化容器的时候会查找META-INF/spring.handles文件查找对应的NamespaceHandle,dubbo在其jar包下配置了DubboNamespaceHandle,该类下有如下配置项:redis

registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true)); registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());

意思就是当spring 在解析容器的时候遇到指定配置会使用对应的Parser去解析配置项。spring

provider

咱们提供者主要会配置对应的application、registry、protocol、service因此咱们一个一个来看,咱们先来看service 根据上面所说,咱们配置了<dubbo:service>这个配置项的话就会生成ServiceBean对象,并注册到容器里,那咱们来看下serviceBean这个类:apache

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware, ApplicationEventPublisherAware

 

该类主要实现了上面几个接口,咱们来看其中最主要的InitializingBean,该类会在类实例化后调用其中的afterPropertiesSet方法 ,因此咱们来看下:app

public void afterPropertiesSet() throws Exception { ... //上面一大堆代码都是判空去从新赋值的代码,咱们不关注他们,最主要是下面这个export方法
    if (!supportedApplicationListener) { export(); } }

 

public synchronized void export() { checkAndUpdateSubConfigs(); ​ if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } ​ //当设置了延时发布时用定时器延时发布
    if (delay != null && delay > 0) { delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS); } else { //不然的话直接发布
 doExport(); } }

 

export方法最主要是判断是有配置了延时发布,是的话就用schedule去延时发布,否的话doExport发布,在spring中真正干活的都是do开头的方法,咱们再继续查看doExport方法dom

 1 protected synchronized void doExport() {  2     if (unexported) {  3         throw new IllegalStateException("Already unexported!");  4  }  5     if (exported) {  6         return;  7  }  8     exported = true;  9 10     if (path == null || path.length() == 0) { 11         path = interfaceName; 12  } 13     //生成惟一serviceName group/interfaceClass 如group/com.xx.xxx 14     //ref 接口实现bean 服务的真正提供者 15     //interfaceClass 须要发布的接口服务
16     ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass); 17  ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); 18     //发布url
19  doExportUrls(); 20 }

 

这里主要查看doExportUrls()方法,上面的是把服务信息存到本地map里:jvm

@SuppressWarnings({"unchecked", "rawtypes"}) private void doExportUrls() { //这里会获取到注册中心的列表若有配置多个的话, // 格式:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=user-service&dubbo=2.0.2&pid=16232&registry=zookeeper&release=2.7.0&timestamp=1553883173387
    List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }

 

因此这里说明dubbo是支持多协议的多注册中心的,提早预告,下面这个doExportUrlsFor1Protocol方法会很长很复杂:ide

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { //获取协议的名称如dubbo
    String name = protocolConfig.getName(); if (name == null || name.length() == 0) { //默认是dubbo
        name = Constants.DUBBO; } ​ Map<String, String> map = new HashMap<String, String>(); //组装map 再填充到url上
 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); appendRuntimeParameters(map); appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); //这里省略了一堆设置url的代码,主要是把接口的配置方法加到参数列表里若是method 重试次数
 .... if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } ​ String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } } //是否injvm也就是本地发布不上注册中心
    if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // export service
    String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } ​ //获取绑定的有效IP地址
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map); //得到一个绑定端口
    Integer port = this.findConfigedPorts(protocolConfig, name, map); ​ //建立一个url //dubbo://192.168.1.2:20882/com.lin.service.UserService?anyhost=true&application=user-service // &bean.name=com.lin.service.UserService&bind.ip=192.168.1.2&bind.port=20882&dubbo=2.0.2 // &generic=false&group=userGroup&interface=com.lin.service.UserService&methods=add,findUserByName,findUserById // &pid=16232&release=2.7.0&side=provider&timestamp=1553884198155
    URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); ​ if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } ​ String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured //当scope为none的时候不发布
    if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) { ​ // export to local if the config is not remote (export to remote only when config is remote) //当scope不是remote的时候发布本地服务
        if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) //当scope不是local的时候发布远程服务
        if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && !registryURLs.isEmpty()) { //当注册中心有多个的时候会发布到多个注册中心
                for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); //监控url
                    URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } ​ // For providers, this is used to enable custom proxy to generate invoker
                    String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } ​ //proxyFactory是一个自适应的扩展点,因此是一个proxyFactory$Adaptive //默认会有这几个 //stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper //jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory //javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory //当url里有proxy=xxx的时候就取xxxProxyFactory,若是没有的话默认就是JavassistProxyFactory
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //建立一个Invoker的包装类DelegateProviderMetaDataInvoker
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); //protocol是一个自动扩展点,因此会返回一个ivoker.getURL().protocol/Protocol的一个对象 如RegistryProtocol、DubboProtocol
                    Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); ​ Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } /** * @since 2.7.0 * ServiceData Store */ MetadataReportService metadataReportService = null; if ((metadataReportService = getMetadataReportService()) != null) { //上报元数据中心
 metadataReportService.publishProvider(url); } } } this.urls.add(url); }

 

这里涉及到一个自适应扩展点的概念,具体什么是自适应扩展点能够到dubbo官网上看,那里介绍的很详细this

由于这里要注册的URL是registry://192.168.xxxx这样格式的,又由于protocol又是一个Protocol自适应扩展点url

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); ​ ​ Exporter<?> exporter = protocol.export(wrapperInvoker);

 

因此咱们能获得protocol.export这里里面是调用的RegistryProtocol里面的export方法,因此咱们再来看下这个方法:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //得到注册地址 //zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=user-service...
    URL registryUrl = getRegistryUrl(originInvoker); // url to export locally //获取提供者发布url //dubbo://192.168.1.2:20882/com.lin.service.UserService?anyhost=true&application=user-service&bean.name=...
    URL providerUrl = getProviderUrl(originInvoker); ​ // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); ​ providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //export invoker //本地发布服务 也就是服务发布到netty容器里
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); ​ // url to registry //得到注册中心地址
    final Registry registry = getRegistry(originInvoker); //得到要注册的提供者URL
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish
    boolean register = registeredProviderUrl.getParameter("register", true); if (register) { //注册到注册中心
 register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } ​ // Deprecated! Subscribe to override rules in 2.6.x or before. //订阅url
 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); ​ exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter); }

 

咱们一步一步来分析上面的代码,首先我这边的注册中心用的是zookeeper因此拿到的注册地址是zookeeper协议的,服务提供者也是用的默认的dubbo协议,那咱们下一步来看下服务真正发布的方法doLocalExport方法:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { ​ final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl); //这里protocol又是一个自适应扩展点,因此里面会调用invoker.getUrl.getProtocol+"Protocol"的export()方法 //如 DubboProtocol.export();
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }

 

一开始就是一个双重检查锁,咱们无论他,直接关注咱们的protocol.export方法,跟上面的同样,protocol也是一个自适应扩展点,因此里面实际用的是咱们invoker.getUrl.getProtocol+"Protocol"的export()方法,咱们invoker的url呢又是咱们上面包装进去的providerUrl也就是dubbo://xxxx这个url,因此最终调用的就是DubboProtocol的export方法:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); ​ // export service.
    String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); ​ //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } ​ openServer(url); optimizeSerialization(url); return exporter; }

 

上面那些代码咱们不关心先不看了,咱们直接能够看到有一个openServer(url)的方法,根据名字咱们就能够猜到这里就是开启咱们服务的地方了,咱们来看下:

private void openServer(URL url) { // find server.
    String key = url.getAddress(); //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { synchronized (this) { server = serverMap.get(key); if (server == null) { //建立服务
 serverMap.put(key, createServer(url)); } } } else { // server supports reset, use together with override
 server.reset(url); } } }
private ExchangeServer createServer(URL url) { ... url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } ... return server; }

 

咱们能够看到这里Exchangers.bind(url,requestHandler),这里呢最终会调到:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //这里getTransporter也是得到一个自适应的扩展点,若是没有配置的话默认就是用的NettyTransporter
    return getTransporter().bind(url, handler); }

 

再里面就是发布到Netty容器了,有兴趣的能够本身去看下,如今这里咱们的服务就已经发布了,下面还有注册到注册中心,咱们再看下咱们上面的RegistryProtocol的export方法里面的注册服务的代码:

// url to registry //得到注册中心地址
final Registry registry = getRegistry(originInvoker); //得到要注册的提供者URL
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true); if (register) { //注册到注册中心
 register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); }

 

主要仍是register这个方法:

public void register(URL registryUrl, URL registeredProviderUrl) { //由于registryFacotry是一个自适应的扩展点,因此会返回一个zookeeperRegistry,若是是redis://的话就返回一个RedisRegistry
    Registry registry = registryFactory.getRegistry(registryUrl); //注册到注册中心
 registry.register(registeredProviderUrl); }

 

根据环境咱们会得到一个ZookeeperRegistry因此咱们再看下zookeeperRegistry的register方法:

由于register这个方法zookeeperRegistry并无去实现它,因此必定是在父类的register咱们继续看他父类FailbackRegistry的register方法:

public void register(URL url) { super.register(url); removeFailedRegistered(url); removeFailedUnregistered(url); try { // Sending a registration request to the server side //注册服务 是模版模式,因此会在子类实现
 doRegister(url); } catch (Exception e) { Throwable t = e; ​ // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } ​ // Record a failed registration request to a failed list, retry regularly
 addFailedRegistered(url); } }

 

这方法主要注册的就是doRegister方法了,由于是模版模式,因此这个方法zookeeperRegistry本身实现了这个方法:

@Override public void doRegister(URL url) { try { //建立一个临时节点
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }

 

因此这里就是最终注册的地方了,这里会根据url去建立一个服务的临时节点,到这服务的发布和注册就已经完成了,其余地方有兴趣的能够本身去看下源码,dubbo里不少地方都用到了自适应扩展点这个概念,因此若是要看源码就要先去理解什么是自适应扩展点。

相关文章
相关标签/搜索