【Dubbo源码阅读系列】服务暴露之远程暴露

引言

什么叫 远程暴露 ?试着想象着这么一种场景:假设咱们新增了一台服务器 A,专门用于发送短信提示给指定用户。那么问题来了,咱们的 Message 服务上线以后,应该如何告知调用方服务器,服务器 A 提供了 Message 功能?那么咱们是否是能够把目前已提供的服务暴露在一个地方,让调用方知道某台机器提供了某个特定功能?带着这样的假设,咱们今天就来聊聊 Dubbo 服务暴露之远程暴露!!java

服务远程暴露

先回顾一下上篇文章,上篇文章咱们聊到了 ServiceConfig 的 export() 方法,而且对服务的本地暴露内容进行了分析,今天咱们接着这块内容讲讲服务暴露之远程暴露。apache

// export to remote if the config is not local (export to local only when config is local)
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    ...
    if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
        if (logger.isInfoEnabled()) {
            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
        }
        if (registryURLs != null && !registryURLs.isEmpty()) {
            for (URL registryURL : registryURLs) {
                url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                // 为了帮助你们阅读,省略部分代码...
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        } else {
            ...
        }
    }
    ...
}
复制代码

这里咱们只关注核心代码:bootstrap

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
复制代码

invoker 对象的构建

先来看看 invoker 对象是怎么建立的!这里涉及到了 Dubbo SPI 机制,调用流程大体为
StubProxyFactoryWrapper.getInvoker() ==> JavassistProxyFactory.getInvoker()
详细看下 JavassistProxyFactory 类的 getInvoker 方法缓存

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
	// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
	final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
	return new AbstractProxyInvoker<T>(proxy, type, url) {
		@Override
		protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
			return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
		}
	};
}
复制代码

值得咱们重点注意的是 Wrapper 类的 getWrapper() 方法!!bash

public static Wrapper getWrapper(Class<?> c) {
	while (ClassGenerator.isDynamicClass(c)) // can not wrapper on dynamic class.
	{
		c = c.getSuperclass();
	}

	if (c == Object.class) {
		return OBJECT_WRAPPER;
	}

	Wrapper ret = WRAPPER_MAP.get(c);
	if (ret == null) {
		ret = makeWrapper(c);
		WRAPPER_MAP.put(c, ret);
	}

	return ret;
}
复制代码

这里会使用参数 c 做为 key 值从 WRAPPER_MAP 缓存中取值,若是没有对应的 value 值,会调用 makeWrapper() 方法借助 javassist 技术构建一个 Wrapper 包装类。假设当前参数 c 的值为 demoService,那么最后生成的动态类为:服务器

public class Wrapper0 extends Wrapper implements DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public Wrapper0() {
    }

    public Class getPropertyType(String var1) {
        return (Class)pts.get(var1);
    }

    public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
        DemoService var5;
        try {
            var5 = (DemoService)var1;
        } catch (Throwable var8) {
            throw new IllegalArgumentException(var8);
        }

        try {
            if("sayHello".equals(var2) && var3.length == 1) {
                return var5.sayHello((String)var4[0]);
            }
        } catch (Throwable var9) {
            throw new InvocationTargetException(var9);
        }

        throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class org.apache.dubbo.demo.DemoService.");
    }

    public String[] getPropertyNames() {
        return pns;
    }

    public Object getPropertyValue(Object var1, String var2) {
        try {
            DemoService var3 = (DemoService)var1;
        } catch (Throwable var5) {
            throw new IllegalArgumentException(var5);
        }

        throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class org.apache.dubbo.demo.DemoService.");
    }

    public void setPropertyValue(Object var1, String var2, Object var3) {
        try {
            DemoService var4 = (DemoService)var1;
        } catch (Throwable var6) {
            throw new IllegalArgumentException(var6);
        }

        throw new NoSuchPropertyException("Not found property \"" + var2 + "\" field or setter method in class org.apache.dubbo.demo.DemoService.");
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public boolean hasProperty(String var1) {
        return pts.containsKey(var1);
    }

    public String[] getMethodNames() {
        return mns;
    }
}
复制代码

最后再回到 JavassistProxyFactory 类的 getInvoker 方法,能够看到它实际返回的是 AbstractProxyInvoker 对象,当调用 AbstractProxyInvoker 类的 doInvoke() 方法时,实际调用的是 wrapper 类的 invokeMethod() 方法!这个知识点十分重要!在咱们讲 Dubbo 远程调用的时候会再次回顾这块内容!网络

exporter 对象的构建

Exporter<?> exporter = protocol.export(wrapperInvoker);
复制代码

再来看看后半句代码。这里最后会调用 RegistryProtocol 类的 export() 方法,若对此有疑问请看系列文章第一篇:【Dubbo源码阅读系列】之 Dubbo SPI 机制,后文再也不赘述。 直接看看 RegistryProtocol 的 export() 方法:app

RegistryProtocol.export()

public class RegistryProtocol implements Protocol {
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    	//export invoker
    	final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
    
    	URL registryUrl = getRegistryUrl(originInvoker);
    
    	//registry provider
    	final Registry registry = getRegistry(originInvoker);
    	final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
    
    	//to judge to delay publish whether or not
    	boolean register = registeredProviderUrl.getParameter("register", true);
    
    	ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
    
    	if (register) {
    		register(registryUrl, registeredProviderUrl);
    		ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    	}
    
    	// Subscribe the override data
    	// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
    	final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    	final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    	overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    	registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    	//Ensure that a new exporter instance is returned every time export
    	return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
    }
}
复制代码

RegistryProtocol.export() 方法很是重要!!能够说是服务远程暴露的核心了。废话很少说,让咱们逐行来看看吧!tcp

doLocalExport()

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
	// 获取 providerUrl ,取 originInvoker url.parameters 键值对中 key 为 export 的值
	String key = getCacheKey(originInvoker);
	ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
	if (exporter == null) {
		synchronized (bounds) {
			exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
			if (exporter == null) {
				final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
				exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
				bounds.put(key, exporter);
			}
		}
	}
	return exporter;
}
复制代码

先来看看 doLocalExport() 方法作了什么:ide

  1. 从 getCacheKey() 方法中获取到的,键 export 对应的 value 在以下代码中被添加到 url 的 parameters 集合中。而后咱们在这里取出对应的值。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
复制代码
  1. 尝试从 bounds 缓存中取对应当前键 key 的 exporter。
  2. 若是缓存为 null,新建 exporter 并返回。这里的 protocl 对象为 Protocol$Adaptive。不难分析最后执行的实际是 DubboProtocol 的 export() 方法。

总结一下:doLocalExport() 用 ExporterChangeableWrapper 代理类包装了 protocol.export() 方法返回的 exporter 对象,最后放到了 bounds 集合中缓存。

DubboPrtocol.export()

DubboProtocol.java
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
	URL url = invoker.getUrl();

	// export service.
	String key = serviceKey(url);
	DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
	exporterMap.put(key, exporter);

	//export an stub service for dispatching event
	Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
	Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
	if (isStubSupportEvent && !isCallbackservice) {
		String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
		if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
			if (logger.isWarnEnabled()) {
				logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
						"], has set stubproxy support event ,but no stub methods founded."));
			}
		} else {
			stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
		}
	}

	openServer(url);
	optimizeSerialization(url);
	return exporter;
}
复制代码

接着上文继续看 DubboProtocol.export() 方法是如何建立 exporter 对象的:

  1. 调用 serviceKey() 方法构建服务的 key 值,最后的得到的 key 值形式相似 group/path:version:port
  2. 新建 DubboExporter
  3. openServer(url),此时的 url 为 RegistryProtocol 传递过来的 providerUrl,openServer() 用途咱们在后文分析;
  4. optimizeSerialization(url) 序列化操做,本文不作具体分析 DubboProtocol.export() 返回的对象为 DubboExporter。值得咱们注意是后面的 openServer() 方法!

openServer()

private void openServer(URL url) {
	// find server.
	String key = url.getAddress();
	//client can export a service which's only for server to invoke
	boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
	if (isServer) {
		ExchangeServer server = serverMap.get(key);
		if (server == null) {
			synchronized (this) {
				server = serverMap.get(key);
				if (server == null) {
					serverMap.put(key, createServer(url));
				}
			}
		} else {
			// server supports reset, use together with override
			server.reset(url);
		}
	}
}
复制代码

openServer() 光从方法名看起来像是开启服务链接的。方法比较简单,取 url 的 address 做为 key,尝试从 serverMap 获取对应的 value 值。若是 value 值为 null 则调用 createServer(url) 方法建立 server 后添加到 serverMap 中。 createServer() 方法的流程比较冗长,咱们这里经过一张时序图来给出该方法内部调用流程:

上图省略了从 ServiceConfigRegistryProtocol 以及从 RegistryProtocolDubboProtocol 的转换过程。这部份内容涉及到 Dubbo SPI 机制,若有疑问能够详见:【Dubbo源码阅读系列】之 Dubbo SPI 机制。这里给出简单的转换流程

  • ServiceConfig 到 RegistryProtocol
    Protocol$Adaptive ==》ProtocolFilterWrapper ==》ProtocolListenerWrapper ==》RegistryProtocol
  • RegistryProtocol 到 DubboProtocol
    Protocol$Adaptive ==》ProtocolFilterWrapper ==》ProtocolListenerWrapper ==》DubboProtocol

最后重点关注下 NettyServer 的 doOpen() 方法:

protected void doOpen() throws Throwable {
	NettyHelper.setNettyLoggerFactory();
	ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
	ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
	ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
	bootstrap = new ServerBootstrap(channelFactory);

	final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
	channels = nettyHandler.getChannels();
	// https://issues.jboss.org/browse/NETTY-365
	// https://issues.jboss.org/browse/NETTY-379
	// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
	bootstrap.setOption("child.tcpNoDelay", true);
	bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
		@Override
		public ChannelPipeline getPipeline() {
			NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
			ChannelPipeline pipeline = Channels.pipeline();
			/*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/
			pipeline.addLast("decoder", adapter.getDecoder());
			pipeline.addLast("encoder", adapter.getEncoder());
			pipeline.addLast("handler", nettyHandler);
			return pipeline;
		}
	});
	// bind
	channel = bootstrap.bind(getBindAddress());
}
复制代码

能够看到这段代码是比较经典的 Netty 服务端启动代码...也就是说 openServer() 方法用于 Netty 服务端启动。 咱们知道 Netty 经常使用于客户端和服务端之间的通信。在这里咱们开启了服务端,那么在何处会开启对应的客户端呢?他们之间到底会进行什么交互呢?这个疑问咱们先留着待后续文章讲解。

服务的暴露

上面讲了这么多,感受仍是和服务远程暴露没有沾多大的边?到底咱们的服务是如何被其它机器感知的?别人是怎么知道咱们某某台机器提供了短信服务的?其实揭秘的序幕已经拉开了!让咱们继续娓娓道来! 回顾一下 RegistryProtocol.export() 方法:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
	//export invoker
	final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

	URL registryUrl = getRegistryUrl(originInvoker);

	//registry provider
	final Registry registry = getRegistry(originInvoker);
	final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

	//to judge to delay publish whether or not
	boolean register = registeredProviderUrl.getParameter("register", true);

	ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

	if (register) {
		register(registryUrl, registeredProviderUrl);
		ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
	}

	// Subscribe the override data
	// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
	final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
	final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
	overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
	registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
	//Ensure that a new exporter instance is returned every time export
	return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
复制代码

上面咱们已经聊完了 doLocalExport() 方法,继续看 export() 方法的后半部分:

RegistryProtocol.java
final Registry registry = getRegistry(originInvoker);

private Registry getRegistry(final Invoker<?> originInvoker) {
	URL registryUrl = getRegistryUrl(originInvoker);
	return registryFactory.getRegistry(registryUrl);
}
复制代码

这里的 registryFactory 为 RegistryFactory$Adaptive(Dubbo 源码中充斥了大量 SPI 扩展机制的使用,这里再也不赘述)。总之咱们获取到的扩展类为 ZookeeperRegistryFactory ,ZookeeperRegistryFactory 继承自 AbstractRegistryFactory 类。所以最后调用的是 AbstractRegistryFactory 类的 getRegistry() 方法。

@Override
public Registry getRegistry(URL url) {
	url = url.setPath(RegistryService.class.getName())
			.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
			.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
	String key = url.toServiceStringWithoutResolving();
	// Lock the registry access process to ensure a single instance of the registry
	LOCK.lock();
	try {
		Registry registry = REGISTRIES.get(key);
		if (registry != null) {
			return registry;
		}
		registry = createRegistry(url);
		if (registry == null) {
			throw new IllegalStateException("Can not create registry " + url);
		}
		REGISTRIES.put(key, registry);
		return registry;
	} finally {
		// Release the lock
		LOCK.unlock();
	}
}
复制代码

方法比较简单,直接看重点方法 createRegistry(url)。createRegistry() 是一个抽象方法,会根据 url 来调用具体的实现方法,这里咱们用 ZookeeperRegistryFactory 类进行分析。

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
	...
	public Registry createRegistry(URL url) {
		return new ZookeeperRegistry(url, zookeeperTransporter);
	}
	...
}

public class ZookeeperRegistry extends FailbackRegistry {
	...
	public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
		super(url);
		if (url.isAnyHost()) {
			throw new IllegalStateException("registry address == null");
		}
		String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
		if (!group.startsWith(Constants.PATH_SEPARATOR)) {
			group = Constants.PATH_SEPARATOR + group;
		}
		this.root = group;
		zkClient = zookeeperTransporter.connect(url);
		zkClient.addStateListener(new StateListener() {
			@Override
			public void stateChanged(int state) {
				if (state == RECONNECTED) {
					try {
						recover();
					} catch (Exception e) {
						logger.error(e.getMessage(), e);
					}
				}
			}
		});
	}
	...
}
复制代码

ZookeeperRegistryFactory 类的 createRegistry() 方法会调用 ZookeeperRegistry 类的构造方法新建 ZookeeperRegistry 实例并返回。而 ZookeeperRegistry 类的构造方法会先调用父类 FailbackRegistry 的构造方法再执行后续操做。先看 FailbackRegistry 构造方法:

public abstract class FailbackRegistry extends AbstractRegistry {
	...
	public FailbackRegistry(URL url) {
        super(url);
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
					// 延迟重试
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
	...
}
复制代码

在 FailbackRegistry 构造方法中有一个延迟重试方法 retry(),若是发现失败集合 failedRegistered、failedUnregistered、failedSubscribed、failedUnsubscribed、failedNotified 不为空,会进行重试操做。 继续看 ZookeeperRegistry 类的构造方法:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
	...
	zkClient = zookeeperTransporter.connect(url);
	zkClient.addStateListener(new StateListener() {
		@Override
		public void stateChanged(int state) {
			if (state == RECONNECTED) {
				try {
					recover();
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				}
			}
		}
	});
}
复制代码

这里的 ZookeeperTransporter.connect() 通过 SPI 转换实际调用为 CuratorZookeeperTransporter.connect()。

public class CuratorZookeeperTransporter implements ZookeeperTransporter {
    @Override
    public ZookeeperClient connect(URL url) {
        return new CuratorZookeeperClient(url);
    }
}

public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
    private final CuratorFramework client;

    public CuratorZookeeperClient(URL url) {
        super(url);
        try {
            int timeout = url.getParameter(Constants.TIMEOUT_KEY, 5000);
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                    .connectString(url.getBackupAddress())
                    .retryPolicy(new RetryNTimes(1, 1000))
                    .connectionTimeoutMs(timeout);
            String authority = url.getAuthority();
            if (authority != null && authority.length() > 0) {
                builder = builder.authorization("digest", authority.getBytes());
            }
            client = builder.build();
            client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework client, ConnectionState state) {
                    if (state == ConnectionState.LOST) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                    } else if (state == ConnectionState.CONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                    } else if (state == ConnectionState.RECONNECTED) {
                        CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                    }
                }
            });
            client.start();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}
复制代码

上面这段代码使用 CuratorFrameworkFactory 工厂类建立了一个 CuratorFramework 实例,并启动该实例建立了一个与 zookeeper 的链接。

再回到 RegistryProtocol 中的 getRegistry() 方法。咱们发现它经过层层调用最终建立了一个到 ZookeeperRegistry 实例。这个实例中的 ziClient 对象创建了到 zookeeper 的链接。 咱们知道 ZooKeeper 常常被用做注册中心Ok。那咱们如今已经链接上了 ZooKeeper 了,是否是该往 Zookeeper 上写点啥了?继续往下看,好戏要来啦!!~

register() 注册方法

RegistryProtocol.java
	public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
		...
		if (register) {
			register(registryUrl, registeredProviderUrl);
			ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
		}
		...
	}
    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }
复制代码

在这里 register() 方法最终会调用 FailbackRegistry 类的 register() 方法(不想再赘述为何!!!!)。

public abstract class FailbackRegistry extends AbstractRegistry {
	...
	public void register(URL url) {
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            doRegister(url);
        } catch (Exception e) {
            // ...
        }
    }
	...
}

public class ZookeeperRegistry extends FailbackRegistry {
	protected void doRegister(URL url) {
        try {
            String str = toUrlPath(url);
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
复制代码

划重点啊筒子们!!! doRegister() 方法!!这里调用链也比较长。画个简图总结下:

小结:总之最后的目的是在 ZooKeeper 上建立经过 url 解析生成的 path 节点。大概长这个样子:dubbo%3A%2F%2F10.137.32.54%3A20880%2Forg.apache.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dorg.apache.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D4264%26side%3Dprovider%26timestamp%3D1546848704035
最后还有一个地方须要注意下:这里调用 zkClient.create() 方法时,若是 dynamic 为空,默认会建立 zookeeper 临时节点。临时节点的好处在于若是客户端和 zookeeper 集群断开链接,对应的临时节点则会自动被删除。这样一来,是否是对咱们的调用方好处多多呢?

End

碍于篇幅限制,今天就先介绍这么多。回顾一下,咱们在 RegistryProtocol.export() 方法里面建立了一个 DubboExporter 对象、开启了 Netty 服务端,同时还往注册中心 zookeeper 上建立了一个和服务有关的临时节点!关于 RegistryProtocol.export() 方法剩余的内容,咱们之后有机会再说吧!

本BLOG上原创文章未经本人许可,不得用于商业用途及传统媒体。网络媒体转载请注明出处,不然属于侵权行为。