Dubbo的服务注册与发布

Dubbo的服务注册与发布的入口来源于dubbo-config模块中的dubbo-config-springjava

基于 dubbo.jar 内的 META-INF/spring.handlers 配置,Spring 在遇到 dubbo 名称空间时,会回调 DubboNamespaceHandlerspring

全部 dubbo 的标签,都统一用 DubboBeanDefinitionParser 进行解析,基于一对一属性映射,将 XML 标签解析为 Bean 对象。apache

spring.handlers:json

http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler

DubboNamespaceHandler#init:缓存

 

 // 须要重点关注如下两行代码   

 registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));     registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); 
ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware
public interface InitializingBean {
		// bean初始化的时候被调用
    void afterPropertiesSet() throws Exception;
		}
DisposableBean,
	public interface DisposableBean {
// 销毁
	void destroy() throws Exception;
	}

	
ApplicationListener<ContextRefreshedEvent>, 
	// 监听 (事件发送器发布的),
	public interface ApplicationListener<E extends ApplicationEvent> extends 			EventListener {
		void onApplicationEvent(E var1);
	}

BeanNameAware,
	public interface BeanNameAware extends Aware {
		 void setBeanName(String var1);
	}

ApplicationEventPublisherAware    
	// 事件发送器
	public interface ApplicationEventPublisherAware extends Aware {
		  void setApplicationEventPublisher(ApplicationEventPublisher var1);
	}



// 实现(spring上下文刷新的时候触发)
public void onApplicationEvent(ContextRefreshedEvent event) {
  if (!this.isExported() && !this.isUnexported()) {
      if (logger.isInfoEnabled()) {
          logger.info("The service ready on spring started. service: " + this.getInterface());
      }

      this.export(); // 发布并导出
  }

}
// super
public synchronized void export() {
  this.checkAndUpdateSubConfigs();
 // 可经过@Service(delay = 1000,export = false)配置
   if (!shouldExport()) { // 当前服务是否须要发布
            return;
        }

        if (shouldDelay()) { // 是否须要延迟发布
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            doExport(); // 后续会走到doExportUrls()方法
        }
}
private void doExportUrls() {
		// registry://ip:port/com.xxx.xxxService?.....
 /**
     *  registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?			application=springboot-						dubbo&dubbo=2.0.2&pid=6672&qos.enable=false&registry=zookeeper&release=2.7.3&ti			mestamp=1571066801904
     *  registryURLs的值如上所示
     * loadRegistries()方法中是在组装这样一个URL地址

     * // 默认
     * <dubbo:protocol name="dubbo" valid="true" id="dubbo" prefix="dubbo.protocols." />
  **/
  List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }

}
// 方法作了简化
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {

// 当前服务发布的主机地址
host = this.findConfigedHosts(protocolConfig, registryURLs, map);
 // 服务端口号
  Integer port = this.findConfigedPorts(protocolConfig, name, map);
 // 组成最终的URL
 
  URL url = new URL(name, host, port, (String)this.getContextPath(protocolConfig).map((p) -> {
      return p + "/" + this.path;
  }).orElse(this.path), map);
 
}
 // scope: local/remote两种
// 若是服务的调用都在同一个jvm中完成走本地调用(此时不须要走远程调用)
 String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
               // 非远程则走本地服务
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                
                 // 远程
                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                       /**
                         *   @Adaptive({PROXY_KEY})
                         *   <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
                         *   ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
                         *   自适应扩展
                         *   最终必定会生成一个ProxyFactory$Adaptive#getInvoker // 默认使用javassist
                         *
                         *   e.g.:
                         *   package org.apache.dubbo.rpc;
                         * import org.apache.dubbo.common.extension.ExtensionLoader;
                         * public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
                         * public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache
                         * .dubbo.rpc.RpcException {
                         * if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker
                         * argument == null");
                         * if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc
                         * .Invoker argument getUrl() == null");
                         * org.apache.dubbo.common.URL url = arg0.getUrl();
                         * String extName = url.getParameter("proxy", "javassist");
                         * if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache
                         * .dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
                         * org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)
                         * ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension
                         * (extName);
                         * return extension.getProxy(arg0);
                         * }
                         * public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws
                         * org.apache.dubbo.rpc.RpcException {
                         * if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker
                         * argument == null");
                         * if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc
                         * .Invoker argument getUrl() == null");
                         * org.apache.dubbo.common.URL url = arg0.getUrl();
                         * String extName = url.getParameter("proxy", "javassist");
                         * if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache
                         * .dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
                         * org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)
                         * ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension
                         * (extName);
                         * return extension.getProxy(arg0, arg1);
                         * }
                         * public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class
                         * arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
                         * if (arg2 == null) throw new IllegalArgumentException("url == null");
                         * org.apache.dubbo.common.URL url = arg2;
                         * String extName = url.getParameter("proxy", "javassist");
                         * if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache
                         * .dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
                         * org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)
                         * ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension
                         * (extName);
                         * return extension.getInvoker(arg0, arg1, arg2);
                         * }
                         * }
                         *
                         */
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        /**
                         * wrapperInvoker = RegistryProtocol
                         * protocol =  ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
                         * protocol是自适应扩展点所以会生成一个Protocol$Adaptive的代理类
                         * 调用的export方法是基于方法级别的适配
                         * @Adaptive
                         * <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
                         * ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry") -- > 会获得RegistryProtocol
                         */

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                }
        }
RegistryProtocol:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        /**
         *e.g.:
         * zookeeper:ip:2181/org.apache.dubbo.registry.ReigistryService?....
         */
        URL registryUrl = getRegistryUrl(originInvoker);

        // url to export locally
        /**
         * dubbo://ip:2181/com.xxx.xxxService?...
         */
        URL providerUrl = getProviderUrl(originInvoker);

       

        //export invoker
        // 启动一个netty服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // url to registry
        // 将dubbo://...的URL注册到zookeeper上
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        //
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            /**
             * 实现服务的注册
             * registryUrl => zookeeper://ip:port/org.apache.dubbo.RegistryService?...
             * 
             */
            register(registryUrl, registeredProviderUrl); 
            // registeredProviderUrl => dubbo:ip:port/com.xxx.xxxService?...
            providerInvokerWrapper.setReg(true);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

public void register(URL registryUrl, URL registeredProviderUrl) {
        /**
         *  @SPI("dubbo")
         *  public interface RegistryFactory
         *  RegistryFactory被SPI标注因此必定会有一个以RegistryFactory命名的扩展文件
         *  经过@Adaptive({"protocol"})注解标注在方法上,因此这里必定会生成一个RegistryFactory$Adaptive的代理类
         *  由于咱们这边使用zookeeper做为注册中心最终会使用zk的工厂,不过这里没有zk的实现,因此会找到抽象工厂AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL);
         *
         *  Registry getRegistry(URL url);
         */
        Registry registry = registryFactory.getRegistry(registryUrl);
        // zk => ZookeeperRegistry extends FailbackRegistry ->FailbackRegistry#register
        registry.register(registeredProviderUrl);
    }


public Registry getRegistry(URL url) {
       //create registry by spi/ioc
            // zookeeper
            registry = createRegistry(url);
    }

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    /**
     * 经过set方法注入一个对象
     */
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

   // 
    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}
@SPI("curator") // 扩展点

public interface ZookeeperTransporter {
    // 仍是自适应扩展点,那么确定能找到这个文件META-INF/dubbo/internal/org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter
    // curator=org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperTransporter
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url); // 在这里完成注册

}
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
//           originInvoker-> DelegateProviderMetaDataInvoker
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            //protocol-> Protocol$Adaptive -> QosProtocolWrapper(ProtocolListenerWrapper(ProtocolFilterWrapper(DubboProtocol(invoker))))
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

DubboProtocol:springboot

private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            // 缓存,一个key对应一个ExchangeServer
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        // 建立服务
                        serverMap.put(key, createServer(url));
                    }
                }
            } 
        }
    }

private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        // 获取当前应该采用何种方式发布服务,netty3,netty4,mina,grizzy(Transporters类的实现)
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            // 绑定服务
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }

   public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
       

        /**
         * 自适应扩展点,默认= HeaderExchanger
         *
         */
        return getExchanger(url).bind(url, handler);
    }


    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        // 基于扩展点实现,在META-INF/dubbo/internal/能够找到Exchanger
//        header=org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }


// org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind

 public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        /**
         *  建立一个链 new DecodeHandler(new HeaderExchangeHandler(handler))
         *  经过Transporters.bind绑定一个服务
         *  new HeaderExchangeServer
         */
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

// org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)

  public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        /**
         * getTransporter() => ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
         *  @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
         *  Server bind(URL url, ChannelHandler handler) throws RemotingException;
         *  经过方法层面的自适应扩展点生成Transporter$Adapter的代理类
         *  bind => Transporter$Adapter#bind, => 默认采用netty4
         */
        return getTransporter().bind(url, handler);
    }
相关文章
相关标签/搜索