分析Dubbo固然要从注册开始,2.7的注册加入了很是多的方式,已经不限于Zookeeper.java
基本上如今主流的注册模式都有了。apache
这种一看就是模版方法模式,其中Registry接口继承与两个接口Node,RegistryService,它本身自己没有任何方法。缓存
/** * Node. (API/SPI, Prototype, ThreadSafe) */ public interface Node { /** * get url. * * @return url. */ URL getUrl(); /** * is available. * * @return available. */ boolean isAvailable(); /** * destroy. */ void destroy(); }
/** * RegistryService. (SPI, Prototype, ThreadSafe) * * @see org.apache.dubbo.registry.Registry * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL) */ public interface RegistryService { /** * 注册 */ void register(URL url); /** * 反注册 */ void unregister(URL url); /** * 订阅 */ void subscribe(URL url, NotifyListener listener); /** * 取消订阅 */ void unsubscribe(URL url, NotifyListener listener); /** * 查找 */ List<URL> lookup(URL url); }
AbstractRegistry是一个抽象类,实现了Registry接口的注册,订阅,查找等方法,咱们来看一下注册方法。安全
private final Set<URL> registered = new ConcurrentHashSet<>();
这个URL是Dubbo本身封装的一个类,其中包含了例如协议、用户名、密码、主机、端口号、路径、参数等的属性。多线程
@Override public void register(URL url) { if (url == null) { throw new IllegalArgumentException("register url == null"); } if (logger.isInfoEnabled()) { logger.info("Register: " + url); } registered.add(url); }
这个是Apache孵化后的写法。意思就是把提供者或者消费者各自的资源放到该集合中,此处是一个线程安全的集合。dom
取消注册异步
@Override public void unregister(URL url) { if (url == null) { throw new IllegalArgumentException("unregister url == null"); } if (logger.isInfoEnabled()) { logger.info("Unregister: " + url); } registered.remove(url); }
订阅ide
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
public interface NotifyListener { /** * 给一组URL发通知 */ void notify(List<URL> urls); }
NotifyListener是一个专门用监听通知的接口,它的做用就是专门用来发通知的,至于怎么发通知须要到Registry最终实现里面去具体实现。this
此处能够看到订阅对象是一个URL对应一组监听器url
@Override public void subscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("subscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("subscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Subscribe: " + url); } //若是订阅对象中没有该url的键,则建立该键对应的集合,集合添加监听器listener Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>()); listeners.add(listener); }
取消订阅
@Override public void unsubscribe(URL url, NotifyListener listener) { if (url == null) { throw new IllegalArgumentException("unsubscribe url == null"); } if (listener == null) { throw new IllegalArgumentException("unsubscribe listener == null"); } if (logger.isInfoEnabled()) { logger.info("Unsubscribe: " + url); } //找出订阅到全部监听器,在这些监听器集合中删除对应多监听器 Set<NotifyListener> listeners = subscribed.get(url); if (listeners != null) { listeners.remove(listener); } }
查找
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();
第一个URL为消费者的URL,Map的String为类型,基本为生产者,具体包含providers、consumers、routes、configurators,List<URL>为消费者订阅生产者的URL集合
lookup即为查找一个消费者订阅了哪些生产者URL
@Override public List<URL> lookup(URL url) { List<URL> result = new ArrayList<>(); //获取几组已通知的生产者url,根据String类型的不一样,可能会有几组url Map<String, List<URL>> notifiedUrls = getNotified().get(url); //若是存在这么几组已通知的生产者url if (notifiedUrls != null && notifiedUrls.size() > 0) { //遍历全部的组,获取每一组的url集合 for (List<URL> urls : notifiedUrls.values()) { //遍历集合每个url for (URL u : urls) { if (!EMPTY_PROTOCOL.equals(u.getProtocol())) { //将全部协议不为空的url添加到结果集合中 result.add(u); } } } } else { //若是不存在任何已通知的url final AtomicReference<List<URL>> reference = new AtomicReference<>(); //这段代码的意思等同与new NotifyListener() { // void notify(List<URL> urls) { // reference.set(urls); // } //} //这里就是实现了一个匿名类订阅监听器对象 NotifyListener listener = reference::set; //将该匿名类订阅监听器对象放入订阅对象中,这里是订阅了,有没有通知不知道,看多线程的 subscribe(url, listener); // Subscribe logic guarantees the first notify to return //若是有通知的url就取出 List<URL> urls = reference.get(); if (CollectionUtils.isNotEmpty(urls)) { for (URL u : urls) { if (!EMPTY_PROTOCOL.equals(u.getProtocol())) { //遍历全部有通知的url,且协议不为空协议的,添加到结果集 result.add(u); } } } } return result; }
public Map<URL, Map<String, List<URL>>> getNotified() { //返回一个不可变的映射,该映射不可修改 return Collections.unmodifiableMap(notified); }
总的来讲notified是一个内存缓存,用来保存获取的服务提供者,不用每次远程调用都要先从注册中心获取一次可调用的服务列表,对于没有服务提供者提供服务的URL,它会以特殊的empty://前缀开头。固然它还有一个磁盘文件服务缓存以及一个Properties,Properties本质是一个HashTable,线程安全的Map.
private File file;
private final Properties properties = new Properties();
咱们来看一下从硬盘加载资源到内存中的过程
private void loadProperties() { if (file != null && file.exists()) { InputStream in = null; try { in = new FileInputStream(file); //从硬盘中读取文件的内容,保存到properties中 properties.load(in); if (logger.isInfoEnabled()) { logger.info("Load registry cache file " + file + ", data: " + properties); } } catch (Throwable e) { logger.warn("Failed to load registry cache file " + file, e); } finally { if (in != null) { try { in.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
properties保存了全部服务提供者的URL,使用URL#serviceKey()做为key,提供者列表、路由规则列表、配置规则列表等作为value。还有一个key.registies,保存全部的注册中心的地址。若是应用在启动过程当中,注册中心没法链接或宕机,Dubbo会自动经过本地缓存加载Invokers(调用程序).
有从硬盘文件导入,固然就会有保存。保存资源到文件的主入口为doSaveProperties().
private final AtomicLong lastCacheChanged = new AtomicLong(); //版本号,保证数据是最新的
private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger(); //保存文件的重试次数
private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3; //重试的最大次数
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); //用于异步保存文件的线程池,它只有1个线程
public void doSaveProperties(long version) { //当前版本号必须大于等于已保存版本号,绝对不能小于 if (version < lastCacheChanged.get()) { return; } if (file == null) { return; } // Save try { File lockfile = new File(file.getAbsolutePath() + ".lock"); if (!lockfile.exists()) { lockfile.createNewFile(); } try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw"); FileChannel channel = raf.getChannel()) { FileLock lock = channel.tryLock(); //当多线程操做文件时,咱们须要得到一个文件独占锁 if (lock == null) { throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties"); } //将properties的内容写入到文件中 try { if (!file.exists()) { file.createNewFile(); } try (FileOutputStream outputFile = new FileOutputStream(file)) { properties.store(outputFile, "Dubbo Registry Cache"); } } finally { lock.release(); } } } catch (Throwable e) { //当其余线程拿不到文件锁到时候,将重试次数加1,此处为原子竞争,不一样的线程不能同时加 savePropertiesRetryTimes.incrementAndGet(); //若是重试次数达到最大,将其从新置0 if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) { logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e); savePropertiesRetryTimes.set(0); return; } //若是没有达到重试次数上限,继续判断版本号是否大于已保存版本号 if (version < lastCacheChanged.get()) { savePropertiesRetryTimes.set(0); return; } else { //若是当前版本号大于已保存版本号,调用其余线程来从新执行一次doSaveProperties方法,原子竞争到版本号加1的线程能够执行 registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); } logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e); } }
private class SaveProperties implements Runnable { private long version; private SaveProperties(long version) { this.version = version; } @Override public void run() { doSaveProperties(version); } }
还有一些通知方法notify()是在客户端第一次订阅获取全量数据后,后续因为订阅获得新数据时,都会调用该方法进行保存。