Apache Dubbo 2.7孵化版整理

分析Dubbo固然要从注册开始,2.7的注册加入了很是多的方式,已经不限于Zookeeper.java

基本上如今主流的注册模式都有了。apache

这种一看就是模版方法模式,其中Registry接口继承与两个接口Node,RegistryService,它本身自己没有任何方法。缓存

/**
 * Node. (API/SPI, Prototype, ThreadSafe)
 */
public interface Node {

    /**
     * get url.
     *
     * @return url.
     */
    URL getUrl();

    /**
     * is available.
     *
     * @return available.
     */
    boolean isAvailable();

    /**
     * destroy.
     */
    void destroy();

}
/**
 * RegistryService. (SPI, Prototype, ThreadSafe)
 *
 * @see org.apache.dubbo.registry.Registry
 * @see org.apache.dubbo.registry.RegistryFactory#getRegistry(URL)
 */
public interface RegistryService {

    /**
     * 注册
     */
    void register(URL url);

    /**
     * 反注册
     */
    void unregister(URL url);

    /**
     * 订阅
     */
    void subscribe(URL url, NotifyListener listener);

    /**
     * 取消订阅
     */
    void unsubscribe(URL url, NotifyListener listener);

    /**
     * 查找
     */
    List<URL> lookup(URL url);

}

AbstractRegistry是一个抽象类,实现了Registry接口的注册,订阅,查找等方法,咱们来看一下注册方法。安全

private final Set<URL> registered = new ConcurrentHashSet<>();

这个URL是Dubbo本身封装的一个类,其中包含了例如协议、用户名、密码、主机、端口号、路径、参数等的属性。多线程

@Override
public void register(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("register url == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Register: " + url);
    }
    registered.add(url);
}

这个是Apache孵化后的写法。意思就是把提供者或者消费者各自的资源放到该集合中,此处是一个线程安全的集合。dom

取消注册异步

@Override
public void unregister(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("unregister url == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Unregister: " + url);
    }
    registered.remove(url);
}

订阅ide

private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();
public interface NotifyListener {

    /**
     * 给一组URL发通知
     */
    void notify(List<URL> urls);

}

NotifyListener是一个专门用监听通知的接口,它的做用就是专门用来发通知的,至于怎么发通知须要到Registry最终实现里面去具体实现。this

此处能够看到订阅对象是一个URL对应一组监听器url

@Override
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    //若是订阅对象中没有该url的键,则建立该键对应的集合,集合添加监听器listener
    Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
    listeners.add(listener);
}

取消订阅

@Override
public void unsubscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("unsubscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("unsubscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Unsubscribe: " + url);
    }
    //找出订阅到全部监听器,在这些监听器集合中删除对应多监听器
    Set<NotifyListener> listeners = subscribed.get(url);
    if (listeners != null) {
        listeners.remove(listener);
    }
}

查找

private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();

第一个URL为消费者的URL,Map的String为类型,基本为生产者,具体包含providers、consumers、routes、configurators,List<URL>为消费者订阅生产者的URL集合

lookup即为查找一个消费者订阅了哪些生产者URL

@Override
public List<URL> lookup(URL url) {
    List<URL> result = new ArrayList<>();
    //获取几组已通知的生产者url,根据String类型的不一样,可能会有几组url
    Map<String, List<URL>> notifiedUrls = getNotified().get(url);
    //若是存在这么几组已通知的生产者url
    if (notifiedUrls != null && notifiedUrls.size() > 0) {
        //遍历全部的组,获取每一组的url集合
        for (List<URL> urls : notifiedUrls.values()) {
            //遍历集合每个url
            for (URL u : urls) {
                if (!EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    //将全部协议不为空的url添加到结果集合中
                    result.add(u);
                }
            }
        }
    } else {
        //若是不存在任何已通知的url
        final AtomicReference<List<URL>> reference = new AtomicReference<>();
        //这段代码的意思等同与new NotifyListener() {
        //    void notify(List<URL> urls) {
        //        reference.set(urls);
        //    }
        //}
        //这里就是实现了一个匿名类订阅监听器对象
        NotifyListener listener = reference::set;
        //将该匿名类订阅监听器对象放入订阅对象中,这里是订阅了,有没有通知不知道,看多线程的
        subscribe(url, listener); // Subscribe logic guarantees the first notify to return
        //若是有通知的url就取出
        List<URL> urls = reference.get();
        if (CollectionUtils.isNotEmpty(urls)) {
            for (URL u : urls) {
                if (!EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    //遍历全部有通知的url,且协议不为空协议的,添加到结果集
                    result.add(u);
                }
            }
        }
    }
    return result;
}
public Map<URL, Map<String, List<URL>>> getNotified() {
    //返回一个不可变的映射,该映射不可修改
    return Collections.unmodifiableMap(notified);
}

总的来讲notified是一个内存缓存,用来保存获取的服务提供者,不用每次远程调用都要先从注册中心获取一次可调用的服务列表,对于没有服务提供者提供服务的URL,它会以特殊的empty://前缀开头。固然它还有一个磁盘文件服务缓存以及一个Properties,Properties本质是一个HashTable,线程安全的Map.

private File file;
private final Properties properties = new Properties();

咱们来看一下从硬盘加载资源到内存中的过程

private void loadProperties() {
    if (file != null && file.exists()) {
        InputStream in = null;
        try {
            in = new FileInputStream(file);
            //从硬盘中读取文件的内容,保存到properties中
            properties.load(in);
            if (logger.isInfoEnabled()) {
                logger.info("Load registry cache file " + file + ", data: " + properties);
            }
        } catch (Throwable e) {
            logger.warn("Failed to load registry cache file " + file, e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
    }
}

properties保存了全部服务提供者的URL,使用URL#serviceKey()做为key,提供者列表、路由规则列表、配置规则列表等作为value。还有一个key.registies,保存全部的注册中心的地址。若是应用在启动过程当中,注册中心没法链接或宕机,Dubbo会自动经过本地缓存加载Invokers(调用程序).

有从硬盘文件导入,固然就会有保存。保存资源到文件的主入口为doSaveProperties().

private final AtomicLong lastCacheChanged = new AtomicLong(); //版本号,保证数据是最新的
private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger(); //保存文件的重试次数
private static final int MAX_RETRY_TIMES_SAVE_PROPERTIES = 3; //重试的最大次数
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); //用于异步保存文件的线程池,它只有1个线程
public void doSaveProperties(long version) {
    //当前版本号必须大于等于已保存版本号,绝对不能小于
    if (version < lastCacheChanged.get()) {
        return;
    }
    if (file == null) {
        return;
    }
    // Save
    try {
        File lockfile = new File(file.getAbsolutePath() + ".lock");
        if (!lockfile.exists()) {
            lockfile.createNewFile();
        }
        try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
             FileChannel channel = raf.getChannel()) {
            FileLock lock = channel.tryLock();
            //当多线程操做文件时,咱们须要得到一个文件独占锁
            if (lock == null) {
                throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
            }
            //将properties的内容写入到文件中
            try {
                if (!file.exists()) {
                    file.createNewFile();
                }
                try (FileOutputStream outputFile = new FileOutputStream(file)) {
                    properties.store(outputFile, "Dubbo Registry Cache");
                }
            } finally {
                lock.release();
            }
        }
    } catch (Throwable e) {
        //当其余线程拿不到文件锁到时候,将重试次数加1,此处为原子竞争,不一样的线程不能同时加
        savePropertiesRetryTimes.incrementAndGet();
        //若是重试次数达到最大,将其从新置0
        if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) {
            logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e);
            savePropertiesRetryTimes.set(0);
            return;
        }
        //若是没有达到重试次数上限,继续判断版本号是否大于已保存版本号
        if (version < lastCacheChanged.get()) {
            savePropertiesRetryTimes.set(0);
            return;
        } else { //若是当前版本号大于已保存版本号调用其余线程来从新执行一次doSaveProperties方法原子竞争到版本号加1的线程能够执行
            registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
        }
        logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e);
    }
}
private class SaveProperties implements Runnable {
    private long version;

    private SaveProperties(long version) {
        this.version = version;
    }

    @Override
    public void run() {
        doSaveProperties(version);
    }
}

还有一些通知方法notify()是在客户端第一次订阅获取全量数据后,后续因为订阅获得新数据时,都会调用该方法进行保存。

相关文章
相关标签/搜索