zookeeper实现配置中心


最近做新系统,搞了个配置中心。本地用ehcache,配置最终存储在zookeeper上。系统启动连接zk,检查指定节点目录是否存在。若不存在,则创建。随后设置监听。节点内容变化则触发监听程序,更新数据到本地缓存。读数据时本地缓存不命中,则读取zk对应路径节点内容到本地。

相关代码示例如下:


1.连接zk逻辑

private static ZooKeeper getInstance(){
    if (zooKeeper == null) {
        try {
            if (INSTANCE_INIT_LOCK.tryLock(2, TimeUnit.SECONDS)) {
                try {
                    zooKeeper = new ZooKeeper(Environment.ZK_ADDRESS, 20000, null);
                    // init cfg root path  ConfZkClient.createWithParent(Environment.CONF_DATA_PATH);
                    zooKeeper.register(new Watcher() {
                        @Override
                        public void process(WatchedEvent watchedEvent) {
                            try {
                                logger.info("confcenter: watcher:{}", watchedEvent);

                                // session expire, close old and create new  if (watchedEvent.getState() == Event.KeeperState.Expired) {
                                    zooKeeper.close();
                                    zooKeeper = null;
                                    getInstance();
                                }

                                String path = watchedEvent.getPath();
                                String key = pathToKey(path);
                                if (key != null) {
                                    // add One-time trigger  zooKeeper.exists(path, true);
                                    if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                                        ConfClient.remove(key);
                                    } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
                                        String data = getPathDataByKey(key);
                                        ConfClient.update(key, data);
                                    }
                                }
                            } catch (KeeperException e) {
                                logger.error("confcenter KeeperException:"+ e.getMessage(), e);
                            } catch (InterruptedException e) {
                                logger.error("confcenter InterruptedException:" +e.getMessage(), e);
                            }
                        }
                    });

                } finally {
                    INSTANCE_INIT_LOCK.unlock();
                }
            }
        } catch (InterruptedException e) {
           logger.error("confcenter init InterruptedException!" + e.getMessage(), e);
        } catch (IOException e) {
           logger.error("confcenter init IOException!" + e.getMessage(), e);
        }
    }
    if (zooKeeper == null) {
        throw new NullPointerException("confcenter ConfZkClient.zooKeeper is null.");
    }
    return zooKeeper;
}


2.重新初始化配置加载逻辑

public class ConfPropertyPlaceholderConfigurer extends PropertyPlaceholderConfigurer {

    private static Logger log = LoggerFactory.getLogger(ConfPropertyPlaceholderConfigurer.class);

    private String beanName;

    private BeanFactory beanFactory;

    /**  * @param beanFactoryToProcess  * @param props  * @throws BeansException  */  @Override
    protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props) throws BeansException {

        // 初始化StringValueResolver  StringValueResolver valueResolver = new StringValueResolver() {
            String placeholderPrefix = "${";
            String placeholderSuffix = "}";

            @Override
            public String resolveStringValue(String strVal) {
                StringBuffer buf = new StringBuffer(strVal);
                // 如果value值匹配'${***}'  boolean start = strVal.startsWith(placeholderPrefix);
                boolean end = strVal.endsWith(placeholderSuffix);
                while (start && end) {
                    // 替换value值,${***} -> ***,例如${default.key01} -> default.key01  String key = buf.substring(placeholderPrefix.length(), buf.length() - placeholderSuffix.length());
                    // 获取value,从properties、cache、zookeeper  String zkValue = ConfClient.get(key, "");

                    buf = new StringBuffer(zkValue);
                    start = buf.toString().startsWith(placeholderPrefix);
                    end = buf.toString().endsWith(placeholderSuffix);
                }
                return buf.toString();
            }
        };

        // init bean define visitor  BeanDefinitionVisitor visitor = new BeanDefinitionVisitor(valueResolver);

        // 获取所有的bean定义,替换占位符  String[] beanNames = beanFactoryToProcess.getBeanDefinitionNames();
        if (beanNames != null && beanNames.length > 0) {
            for (String beanName : beanNames) {
                if (!(beanName.equals(this.beanName) && beanFactoryToProcess.equals(this.beanFactory))) {
                    BeanDefinition bd = beanFactoryToProcess.getBeanDefinition(beanName);
                    visitor.visitBeanDefinition(bd);
                }
            }
        }

    }


3.EhCache逻辑

/**
 * Client-side access point for configuration values.
 *
 * <p>Lookups consult, in order: the local {@code confcenter.properties}
 * overrides, the local Ehcache cache, and finally ZooKeeper via
 * {@code ConfZkClient}. Values read from ZooKeeper are stored in the cache.
 */
public class ConfClient {

    private static final Logger logger = LoggerFactory.getLogger(ConfClient.class);

    /** Local overrides loaded from {@code confcenter.properties}; checked before the cache. */
    public static Properties localProp = PropertiesUtil.loadProperties("confcenter.properties");

    private static Cache cache;

    static {
        // Uses the default ehcache.xml configuration.
        CacheManager manager = CacheManager.create();
        /*
         * Cache constructor arguments, in order:
         *   name                - cache name
         *   maxElementsInMemory - maximum number of cached elements
         *   overflowToDisk      - whether elements spill to disk once maxElementsInMemory is reached
         *   eternal             - whether elements never expire; when set, the timeouts below are ignored
         *   timeToLiveSeconds   - allowed lifetime before expiry, in seconds (only used when eternal is false)
         *   timeToIdleSeconds   - allowed idle time before expiry, in seconds (only used when eternal is false)
         */
        cache = new Cache(Environment.CONF_DATA_PATH, 10000, false, true, 1800, 1800);
        manager.addCache(cache);
    }

    /**
     * Stores an initial value for a key in the local cache.
     *
     * @param key   configuration key
     * @param value configuration value
     */
    public static void set(String key, String value) {
        if (cache != null) {
            logger.info("初始化配置: [{}:{}]", key, value);
            cache.put(new Element(key, value));
        }
    }

    /**
     * Replaces the cached value of a key, but only if the key is already cached.
     *
     * @param key   configuration key
     * @param value new configuration value
     */
    public static void update(String key, String value) {
        if (cache != null) {
            if (cache.get(key) != null) {
                logger.info("更新配置: [{}:{}]", key, value);
                cache.put(new Element(key, value));
            }
        }
    }

    /**
     * Looks up a configuration value.
     *
     * @param key        configuration key
     * @param defaultVal value returned when the key is found in none of the sources
     * @return the value from local properties, else from the cache, else from
     *         ZooKeeper (which is then cached), else {@code defaultVal}
     */
    public static String get(String key, String defaultVal) {
        // Local properties take precedence over everything else.
        if (localProp != null && localProp.containsKey(key)) {
            return localProp.getProperty(key);
        }
        if (cache != null) {
            Element element = cache.get(key);
            if (element != null) {
                return (String) element.getObjectValue();
            }
        }
        // Cache miss: read from ZooKeeper and remember the result locally.
        String zkData = ConfZkClient.getPathDataByKey(key);
        if (zkData != null) {
            set(key, zkData);
            return zkData;
        }

        return defaultVal;
    }

    /**
     * Removes a key from the local cache.
     *
     * @param key configuration key
     * @return the result of the cache removal, or {@code false} when no cache is available
     */
    public static boolean remove(String key) {
        if (cache != null) {
            // The placeholder is required for the key to appear in the log line.
            logger.info("删除配置:key {}", key);
            return cache.remove(key);
        }
        return false;
    }

}