文章有点长,亲,要慢慢看!html
整个registry下的模块 java
api是注册中心全部的API和抽象类实现正则表达式
default是注册中心的内存实现redis
zookeeper、redis、nacos就是基于不一样的组件的实现apache
multicast是经过广播实现api
这张图相信只要是用过的都不陌生,挂在dubbo.io的官网挂了好久好久了。那么这个流程主要是说了什么呢?缓存
api层主要是注册中心全部API的抽象实现类,并非实际提供服务的组件。bash
模块关系图 服务器
类关系图 网络
目录结构
public interface RegistryService {
/** * 注册服务. * @param url 注册信息,不容许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin */
void register(URL url);
/** * 取消注册服务. * @param url 注册信息,不容许为空,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin */
void unregister(URL url);
/** * 订阅服务. * @param listener 变动事件监听器,不容许为空 */
void subscribe(URL url, NotifyListener listener);
/** * 取消订阅服务. * @param url 订阅条件,不容许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin * @param listener 变动事件监听器,不容许为空 */
void unsubscribe(URL url, NotifyListener listener);
/** * 查询注册列表,与订阅的推模式相对应,这里为拉模式,只返回一次结果。 * * @see org.apache.dubbo.registry.NotifyListener#notify(List) * @param url 查询条件,不容许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin * @return 已注册信息列表,可能为空,含义同{@link org.apache.dubbo.registry.NotifyListener#notify(List<URL>)}的参数。 */
List<URL> lookup(URL url);
}
复制代码
public interface Node {
/**
* 获取节点Url
*/
URL getUrl();
/**
* 是否可用
*/
boolean isAvailable();
/**
* 销毁节点
*/
void destroy();
}
复制代码
public interface Registry extends Node, RegistryService {
}
复制代码
// url地址分隔符,用于文件缓存,服务提供程序url分隔
private static final char URL_SEPARATOR = ' ';
// URL地址分隔的正则表达式,用于分析文件缓存中的服务提供程序URL列表
private static final String URL_SPLIT = "\\s+";
// 日志输出
protected final Logger logger = LoggerFactory.getLogger(getClass());
// 本地磁盘缓存,其中的特殊key为registies是记录注册表中心列表,其余是服务提供者的李彪
private final Properties properties = new Properties();
// 文件缓存写入执行器 提供一个线程的线程池
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
// 是否同步保存文件标志
private final boolean syncSaveFile;
// 这个是缓存的版本号
private final AtomicLong lastCacheChanged = new AtomicLong();
// 这个是已经注册的URL集合,不只仅是服务提供者的,也能够是服务消费者的
private final Set<URL> registered = new ConcurrentHashSet<URL>();
// 已订阅的url 值为url的监听器集合
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 消费者或服务治理服务获取注册信息后的缓存对象
// 内存中服务器缓存的notified对象是ConcurrentHashMap里面嵌套了一个Map,
// 外层Map的Key是消费者的URL,
// 内层的Map的key是分类,包括provider,consumer,routes,configurators四种,
// value则对应服务列表,没有服务提供者提供服务的URL,会以一个特别的empty://前缀开头
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
// 注册中心的URL
private URL registryUrl;
// 本地磁盘缓存文件保存的是注册中心的数据
private File file;
复制代码
public AbstractRegistry(URL url) {
// 设置注册中心的地址URL
setUrl(url);
// 从URL参数中获取是否同步保存的状态,URL中若是不包含,那就设置默认值为false
syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
// 获取文件路径
String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
// 开始读入文件
File file = null;
// 不存在就抛出异常
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
// 把文件对象放到属性上
this.file = file;
// 加载文件中的参数放入Properties,Properties继承HashTable。
loadProperties();
// 通知监听器 URL变化 见下面notify的源码
notify(url.getBackupUrls());
}
复制代码
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
try {
// 把文件中的key-value读进来
in = new FileInputStream(file);
// Properties是一个继承HashTable的类.
// 这个地方就是按行读入,util里面的类,里面调用了一个load0 方法会把key和value作分割而后放入Properties中,。
properties.load(in);
if (logger.isInfoEnabled()) {
logger.info("Load registry store file " + file + ", data: " + properties);
}
} catch (Throwable e) {
logger.warn("Failed to load registry store file " + file, e);
} finally {
// 关闭流
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
}
复制代码
@Override
public List<URL> lookup(URL url) {
// 查找的结果数据
List<URL> result = new ArrayList<URL>();
// 获取注册信息中的分类服务列表信息
Map<String, List<URL>> notifiedUrls = getNotified().get(url);
// 若是该消费者订阅了服务
if (notifiedUrls != null && notifiedUrls.size() > 0) {
for (List<URL> urls : notifiedUrls.values()) {
for (URL u : urls) {
// 把非空的加入结果集中
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
} else {
// 若是没有订阅服务
// 使用原子类以保证在获取注册在注册中心的服务url时可以保证是最新的url集合
final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
// 通知监听器。当收到服务变动通知时触发
NotifyListener listener = new NotifyListener() {
@Override
public void notify(List<URL> urls) {
reference.set(urls);
}
};
// 添加这个服务的监听器
subscribe(url, listener); // Subscribe logic guarantees the first notify to return
List<URL> urls = reference.get();
// 而后把非空结果放入结果集中
if (urls != null && !urls.isEmpty()) {
for (URL u : urls) {
if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
result.add(u);
}
}
}
}
return result;
}
复制代码
@Override
public void register(URL url) {
if (url == null) {
throw new IllegalArgumentException("register url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Register: " + url);
}
registered.add(url);
}
@Override
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
registered.remove(url);
}
复制代码
protected void notify(List<URL> urls) {
if (urls == null || urls.isEmpty()) return;
// 遍历已订阅的URL
for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
URL url = entry.getKey();
if (!UrlUtils.isMatch(url, urls.get(0))) {
continue;
}
// 通知URL对应的监听器
Set<NotifyListener> listeners = entry.getValue();
if (listeners != null) {
for (NotifyListener listener : listeners) {
try {
// 通知监听器,看下方代码注释
notify(url, listener, filterEmpty(url, urls));
} catch (Throwable t) {
logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
}
}
}
}
}
复制代码
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.isEmpty())
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// 将url进行分类
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
// 根据不一样的category分别放到不一样List中处理 以category的值作分类
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}
// 没有分类结果就直接return
if (result.size() == 0) {
return;
}
// 得到消费者被通知的url的Map
Map<String, List<URL>> categoryNotified = notified.get(url);
// 若是没有 就建立一个
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
// 建立事后再获取
categoryNotified = notified.get(url);
}
// 发送URL变化给监听器
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
// 把分类标实和分类后的列表放入notified的value中 覆盖到 `notified`
// 当分类的数据为空,依然有urls 。不过其中的urls[0].protocol是empty,以此来处理全部服务提供者为空时的状况。
categoryNotified.put(category, categoryList);
// 保存一份到文件缓存中 中间作的 就是解析出参数而后同步或者异步保存到文件中
saveProperties(url);
// 通知监听器
listener.notify(categoryList);
}
}
复制代码
@Override
public void subscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("subscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("subscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Subscribe: " + url);
}
// 得到url已经订阅的服务的监听器集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners == null) {
subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = subscribed.get(url);
}
// 而后把listener添加到上
listeners.add(listener);
}
@Override
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
// 得到url已经订阅的服务的监听器集合
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
// 而后移除
listeners.remove(listener);
}
}
复制代码
protected void recover() throws Exception {
// register
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
//调用的上面的注册方法
register(url);
}
}
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// 调用上面的订阅方法
subscribe(url, listener);
}
}
}
}
复制代码
@Override
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
// 获取以注册的URL
Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<URL>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
// 取消注册
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// 获取已订阅的URL以及监听器
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
// 去取消订阅
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
}
复制代码
// Scheduled executor service
// 通过固定时间后(默认是5s),调用FailbackRegistry#retry方法
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
// 失败重试计时器,按期检查是否有失败请求,若是有,则无限制重试
private final ScheduledFuture<?> retryFuture;
// 注册失败的集合
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
// 取消注册失败的集合
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
// 发起订阅失败的监听器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 取消订阅失败的监听器集合
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
// 通知失败的URL集合
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
/** * The time in milliseconds the retryExecutor will wait * RetryExecutor将等待的时间(毫秒) */
private final int retryPeriod;
复制代码
public FailbackRegistry(URL url) {
super(url);
// 获取重试的时间 若是没有就设置成默认的 DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
// 设置重试任务 里面就是调用retry方法 见下方retry方法的解析
this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
复制代码
@Override
public void register(URL url) {
// 缓存等注册操做 见AbstractRegistry
super.register(url);
// 在失败集合中将这个移除
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
// 向服务器发送注册请求
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
// 开启了启动时就检测,直接抛异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
// 记录失败的url
failedRegistered.add(url);
}
}
复制代码
@Override
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list, retry regularly
// 将失败的注册请求记录到失败列表,按期重试
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
// 注意 这个是调用父类的
super.notify(url, listener, urls);
}
复制代码
@Override
protected void recover() throws Exception {
// register
// 把已注册的添加到失败重试的列表中
Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
if (!recoverRegistered.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover register url " + recoverRegistered);
}
for (URL url : recoverRegistered) {
// 添加到失败重试注册列表
failedRegistered.add(url);
}
}
// subscribe
// 把已订阅的添加到失败重试的列表中
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Recover subscribe url " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
// 添加到失败重试订阅列表
addFailedSubscribed(url, listener);
}
}
}
}
复制代码
// Retry the failed actions
protected void retry() {
if (!failedRegistered.isEmpty()) {
// 不为空就把他URL拿到
Set<URL> failed = new HashSet<URL>(failedRegistered);
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry register " + failed);
}
try {
// 而后遍历它 作对应的操做
for (URL url : failed) {
try {
// 作注册操做
doRegister(url);
// 移除失败集合中URL
failedRegistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
......
}
复制代码
@Override
public void destroy() {
// 调用父类的方法
super.destroy();
try {
// 取消执行任务
retryFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
ExecutorUtil.gracefulShutdown(retryExecutor, retryPeriod);
}
复制代码
// ==== Template method ====
protected abstract void doRegister(URL url);
protected abstract void doUnregister(URL url);
protected abstract void doSubscribe(URL url, NotifyListener listener);
protected abstract void doUnsubscribe(URL url, NotifyListener listener);
复制代码
@SPI("dubbo")
public interface RegistryFactory {
// 这个接口方法实际上就是获取对注册中心的链接,而后返回不一样注册中心的不一样Regsitry的实现对象,
// 注解就是根据设置不一样的protocol(协议)来选择不一样的实现,
// 好比Zookeeper,就会去使用Zookeeper的ZookeeperRegistryFactory,具体怎么选择,后续博客再写
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
复制代码
// 注册中心获取过程的锁
private static final ReentrantLock LOCK = new ReentrantLock();
// 注册中心Map<注册地址,registry> 一个类的缓存。
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();
复制代码
/** * Get all registries * 获取全部的registry对象 * @return all registries */
public static Collection<Registry> getRegistries() {
//获得一个集合的镜像,它的返回结果不可直接被改变,不然会报错
return Collections.unmodifiableCollection(REGISTRIES.values());
}
复制代码
/** * Close all created registries * 关闭全部已建立的registry对象 */
// TODO: 2017/8/30 to move somewhere else better
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
//对注册中心关闭操做加锁
LOCK.lock();
try {
// 遍历全部的注册中心的操做类,而后调用destroy来销毁。
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
// 而后清除集合
REGISTRIES.clear();
} finally {
// Release the lock
LOCK.unlock();
}
}
复制代码
@Override
public Registry getRegistry(URL url) {
// 经过URL来获取到注册中心的类型
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceStringWithoutResolving();
// 锁定注册中心访问进程以确保注册表的单个实例
LOCK.lock();
try {
// 经过key来拿到对应的注册中心的操做类
Registry registry = REGISTRIES.get(key);
// 有就直接返回
if (registry != null) {
return registry;
}
// 没有就建立对应的注册中心操做类
registry = createRegistry(url);
// 若是建立失败,报错
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// 建立成功就放到结合中
REGISTRIES.put(key, registry);
// 而后再返回
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
复制代码
protected abstract Registry createRegistry(URL url);
复制代码
// invoker对象
private Invoker<T> invoker;
// 原始的URL地址
private URL originUrl;
// 注册中心的地址
private URL registryUrl;
// 消费者的地址
private URL consumerUrl;
// 注册中心的Directory
private RegistryDirectory registryDirectory;
复制代码
// invoker对象
private Invoker<T> invoker;
// 原始的URL地址
private URL originUrl;
// 注册中心的地址
private URL registryUrl;
// 提供者的地址
private URL providerUrl;
// 是否注册
private volatile boolean isReg;
复制代码
// 服务提供者的Invokers集合
public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
// 服务消费者的Invokers集合
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
复制代码
@Override
public Status check() {
// 获取全部的注册中心的对象
Collection<Registry> registries = AbstractRegistryFactory.getRegistries();
if (registries.isEmpty()) {
return new Status(Status.Level.UNKNOWN);
}
Status.Level level = Status.Level.OK;
StringBuilder buf = new StringBuilder();
// 遍历
for (Registry registry : registries) {
if (buf.length() > 0) {
buf.append(",");
}
// 把地址拼接到一块儿
buf.append(registry.getUrl().getAddress());
// 若是注册中心的某个节点不可用就把状态设置成error
if (!registry.isAvailable()) {
level = Status.Level.ERROR;
buf.append("(disconnected)");
} else {
buf.append("(connected)");
}
}
// 而后返回价差的结果对象
return new Status(level, buf.toString());
}
复制代码
不知道你们看到这里有没有忘记这张图
模块关系图
全部的注册中心实现FailbackRegistry 和 AbstractRegistryFactory来实现对应的功能。
那么Zookeeper也是如此。Zookeeper主要就只有两个类
在Dubbo框架启动时会根据咱们所写的服务相关的配置在注册中心建立4个目录,在providers和consumers目录中分别存储服务提供方、消费方元数据信息。包括:IP、端口、权重和应用名等数据。
目录名称 | 存储值样例 |
---|---|
/dubbo/service/providers | dubbo://192.168.1.1.20880/com.demo.DemoService?key=value&... |
/dubbo/service/consumers | dubbo://192.168.1.1.5002/com.demo.DemoService?key=value&... |
/dubbo/service/routers | condition://0.0.0.0/com.demo.DemoService?category=routers&key=value&... |
/dubbo/service/configurators | override://0.0.0.0/com.demo.DemoService?category=configurators&key=value&... |
<beans>
<!-- 适用于Zookeeper一个集群有多个节点,多个IP和端口用逗号分隔-->
<dubbo:registry protocol="zookeeper" address="ip:port;ip:port">
<!-- 适用于Zookeeper多个集群有多个节点,多个IP和端口用竖线分隔-->
<dubbo:registry protocol="zookeeper" address="ip:port|ip:port">
</beans>
复制代码
// Zookeeper的默认端口号
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
// Dubbo在Zookeeper中注册的默认根节点
private final static String DEFAULT_ROOT = "dubbo";
// 组的名称 或者说是 根节点的值
private final String root;
// 服务集合
private final Set<String> anyServices = new ConcurrentHashSet<String>();
// zk节点的监听器
// Dubbo底层封装了2套Zookeeper API,因此经过ChildListener抽象了监听器,
// 可是在实际调用时会经过createTargetChildListener转为对应框架的监听器实现
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
// zk的客户端, 对节点进行一些删改等操做
private final ZookeeperClient zkClient;
复制代码
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
// 调用FailbackRegistry的构造方法
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名称 并复制给root
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 链接上Zookeeper
zkClient = zookeeperTransporter.connect(url);
// 添加链接状态监听器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
// 重连恢复
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
复制代码
// 发布
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
// 取消发布
@Override
protected void doUnregister(URL url) {
try {
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
复制代码
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 订阅全部数据
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
// 为空则把listeners放入到缓存的Map中
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
// 建立子节点监听器,对root下的子节点作监听,一旦有子节点发生改变,
// 那么就对这个节点进行订阅.
if (zkListener == null) {
// zkListener为空说明是第一次拉取,则新建一个listener
listeners.putIfAbsent(listener, new ChildListener() {
// 节点变动时,触发通知时执行
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
// 遍历全部节点
child = URL.decode(child);
// 若是有子节点还未被订阅贼说明是新节点,
if (!anyServices.contains(child)) {
// 加入到集合中
anyServices.add(child);
//就订阅之
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
// 建立持久节点root,接下来订阅持久节点的子节点
zkClient.create(root, false);
// 添加root节点的子节点监听器,并返回当前的services
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
// 遍历全部的子节点进行订阅
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
// 增长当前节点的订阅,而且会返回改节点下全部子节点的列表
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
// 订阅类别服务
} else {
List<URL> urls = new ArrayList<URL>();
// 将url转变成
// /dubbo/com.demo.DemoService/providers
// /dubbo/com.demo.DemoService/configurators
// /dubbo/com.demo.DemoService/routers
// 根据url类别获取一组要订阅的路径
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
// 若是缓存没有,则添加到缓存中
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
// 一样若是监听器缓存中没有 则放入缓存
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// 通知节点变化
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
// 订阅并返回该节点下的子路径并缓存
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
// 有子节点组装,没有那么就将消费者的协议变成empty做为url。
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 回调NotifyListener,更新本地缓存信息
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
复制代码
@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
// 经过url把监听器所有拿到
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
// 直接删除group下全部的
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
// 移除监听器
zkClient.removeChildListener(root, zkListener);
} else {
// 移除类别服务下的监听器
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
复制代码
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
复制代码
@SPI("curator")
public interface ZookeeperTransporter {
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);
}
复制代码
上面我提到过,Dubbo用Zookeeper的时候用了两种方式实现,一个是Apache Curator,另外一个是zkClient,这个类就是作看了一个转换。以下图
两个类都实现了该接口来向外提供统一的ZookeeperClient。
这个实如今remoting模块。暂时就不讲了。