上一篇博客《基于ZooKeeper与zkclient的统一配置管理实现(一)》分享了基于ZooKeeper原生api实现的统一配置管理,本篇文章将经过使用zkclient封装后的api来再次实现该功能。java
实现的效果与上一篇文章相似,这里再也不赘述。node
系统仍然是由四个组件组成:api
集群或单机版的ZooKeeper服务端,主要用以存储IConfigPublisher发布的配置文件信息服务器
配置文件的发布器,负责将配置文件信息发布到ZooKeeperServer中去app
配置文件变动状况的订阅器,由客户端开启对服务器配置文件信息的订阅,当配置信息发生变动时,负责将本地的信息更新成最新的状态框架
配置文件更改器,通常由用户手动调用,用来更改配置文件的信息异步
启动ZooKeeper集群的方法同上一篇博客,这里再也不赘述,各位能够自行异步《基于ZooKeeper与zkclient的统一配置管理实现(一)》查看。ide
其中IConfigPublisher和IConfigSubscriber是接口,看一下两个接口的定义:测试
/** * Config files publisher * @author hwang * */ public interface IConfigPublisher { /** * publish config files under {@link configDir} to {@link configRootNode} of {@link zkServerHost} * @param zkServerHost * @param configRootNode * @param configDir */ public void publish(String zkServerHost,String configRootNode,String configDir); }
IConfigPublisher接口只有一个publish方法,主要的工做就是把configDir目录下的配置文件发布到zkServerHost的configRootNode节点中去。ui
/** * Subscribe Config files change * @author hwang * */ public interface IConfigSubscriber { /** * <p>Subscribe config files change event under rootNode {@link configRootNode} of {@link zkServerHost}</p> * <p>include the dataChange and childrenChange </p> * @param zkServerHost * @param configRootNode */ public void subscribe(String zkServerHost,String configRootNode); }
IConfigSubscriber接口也只有一个方法,主要的工做就是订阅ZkServerHost中的configRootNode节点的变化状况。
如今来看下这两个主要的核心接口的实现状况。
IConfigPublisher接口的实现类是ZkConfigPublisher,看一下ZkConfigPublisher是怎么实现publish方法的:
public class ZkConfigPublisher implements IConfigPublisher{ private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class); private ZkClient client; private String configRootNode; @Override public void publish(String zkServerHost,String configRootNode,String configDir){ try{ if(client==null){ client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT); client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET)); } this.configRootNode = configRootNode; String rootNode = "/" + configRootNode; // 建立根节点 ZkClientNodeUtil.createNode(client, rootNode, configDir); // 扫描全部配置文件 this.scanConfigFiles(configDir,ZkConstant.ACCEPT_SUFFIX); }catch(Exception e){ logger.error("",e); } } /** * 扫描指定目录下的全部配置文件,并将内容写入到zookeeper节点中 * @param path 扫描的目录 * @param acceptSuffix 接受的文件后缀 * @throws KeeperException * @throws InterruptedException * @throws IOException */ private void scanConfigFiles(String path,String acceptSuffix) throws KeeperException, InterruptedException, IOException{ File dir = new File(path); if(dir.exists() && dir.isDirectory()){ File[] subFiles = dir.listFiles(); for(File file : subFiles){ String absPath = file.getAbsolutePath(); String fileName = file.getName(); if(file.isDirectory() || (null!=acceptSuffix && !fileName.endsWith(acceptSuffix))){ this.scanConfigFiles(absPath,acceptSuffix); }else{ String parentDir = file.getParentFile().getAbsolutePath(); // 读取文件内容 String fileContent = FileUtils.readFileToString(file,ZkConstant.CONF_CHAR_SET); // 建立目录节点 ZkClientNodeUtil.createDirNode(client, configRootNode, parentDir); // 建立该目录下的文件节点 ZkClientNodeUtil.createFileNode(client, configRootNode, parentDir, fileName, fileContent); } } } } }
实现方法很简单,先建立了一个ZkClient对象,而后建立了一个根节点,最后扫描了指定目录下的全部配置文件,并将符合要求的配置文件(及目录)加入到ZooKeeperServer中去。
IConfigSubscriber接口的实现类是ZkConfigSubscriber,看一下ZkConfigSubscriber是怎么实现subscribe方法的:
public class ZkConfigSubscriber implements IConfigSubscriber{ private static final Log logger = LogFactory.getLog(ZkConfigSubscriber.class); private ZkClient client; @Override public void subscribe(String zkServerHost, String configRootNode) { try{ if(client==null){ client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT); client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET)); } String rootNode = "/" + configRootNode; this.clearConfigDir(client,rootNode); this.subscribeRootNode(client, rootNode); // 等待配置信息变动 Thread.currentThread().join(); }catch(Exception e){ logger.error("",e); } } /** * 清空本地的配置文件目录 * @param client * @param rootNode * @throws IOException * @throws InterruptedException * @throws KeeperException */ private void clearConfigDir(ZkClient client,String rootNode) throws IOException{ if(client.exists(rootNode)){ String configDir = client.readData(rootNode); FileUtils.deleteDirectory(new File(configDir)); logger.info("Delete config dir:"+configDir); } } /** * 订阅根节点和递归订阅全部子节点 * @param client * @param rootNodePath * @throws KeeperException * @throws InterruptedException * @throws IOException */ private void subscribeRootNode(ZkClient client,String rootNodePath) throws IOException{ if(client.exists(rootNodePath)){ logger.debug("subscribe node:"+rootNodePath); ZkConfigSubscriber.subscribePath(client, rootNodePath); List<String> subList = client.getChildren(rootNodePath); if(null!=subList && subList.size()>0){ // 将节点的全部子节点保存起来 NodeChildrenChangedWrapper.addChildren(rootNodePath, subList); } for (String subNode : subList) { this.subscribeSubNode(client,rootNodePath,subNode); } }else{ logger.warn("rootNode:"+rootNodePath+" does not exists!"); } } /** * 订阅子节点 * @param client * @param currentNode * @param subNode * @throws KeeperException * @throws InterruptedException * @throws IOException */ private void subscribeSubNode(ZkClient client,String currentNode,String subNode) throws IOException{ String nodePath = currentNode+"/"+subNode; if(nodePath.startsWith("/")){ // 订阅子节点 if(client.exists(nodePath)){ // sync content to client String content = client.readData(nodePath); OnetimeConfigSyncer.syncToClient(content); logger.debug("subscribe node:"+nodePath); ZkConfigSubscriber.subscribePath(client, nodePath); List<String> subList = client.getChildren(nodePath); if(null!=subList && subList.size()>0){ // 将节点的全部子节点保存起来 NodeChildrenChangedWrapper.addChildren(nodePath, subList); } for (String _subNode : subList) { this.subscribeSubNode(client,nodePath,_subNode); } }else{ logger.warn("subNode:"+nodePath+" does not exists!"); } } } }
subscribe方法的具体实现也很简单,先是清空本地的配置文件目录,而后订阅根节点和递归订阅全部子节点。订阅的时候,会将每一个节点和该节点的子节点的状况保存到Map中去,具体的缘由在上一篇博客中已经作过说明,这个再也不赘述。具体执行订阅的方法是ZkConfigSubscriber类中的subscribePath()方法,来看下该方法的内容:
/** * Store the paths that already subscribed */ private static Set<String> subscribedPathSet = new CopyOnWriteArraySet<String>(); public static void subscribePath(ZkClient client,String path){ if(!subscribedPathSet.contains(path)){ subscribedPathSet.add(path); // Subscribe ChildChange and DataChange event at path client.subscribeChildChanges(path, new ChildrenChangeListener(client)); client.subscribeDataChanges(path, new DataChangeListener(client)); logger.info("Subscribe ChildChange and DataChange event at path:"+path); } } public static void unsubscribePath(ZkClient client,String path){ if(subscribedPathSet.contains(path)){ subscribedPathSet.remove(path); // Unsubscribe ChildChange and DataChange event at path client.unsubscribeChildChanges(path, new ChildrenChangeListener(client)); client.unsubscribeDataChanges(path, new DataChangeListener(client)); logger.info("Unsubscribe ChildChange and DataChange event at path:"+path); } }
主要是用一个CopyOnWriteArraySet存储全部已经订阅的节点的path,防止重复订阅。
其中订阅时使用了两个Listener类,分别是ChildrenChangeListener和DataChangeListener。
先来看下ChildrenChangeListener的实现:
/** * ChildrenChangeListener * @author hwang * */ public static class ChildrenChangeListener implements IZkChildListener{ private static final Log logger = LogFactory.getLog(ChildrenChangeListener.class); private ZkClient client; public ChildrenChangeListener(ZkClient client){ this.client = client; } @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { if(currentChilds==null || currentChilds.isEmpty()){ logger.warn("No currentChilds get form parentPath:"+parentPath); return; } ChildrenChangeResult changeResult = NodeChildrenChangedWrapper.diff(parentPath, currentChilds); ChildrenChangeType changeType = changeResult.getChangeType(); List<String> changePath = changeResult.getChangePath(); if(changePath==null || changePath.isEmpty()){ logger.warn("No children changePath get form parentPath:"+parentPath); return; } switch(changeType){ case add:{ for(String subPath : changePath){ logger.info("Add children node,path:"+parentPath+"/"+subPath); String path = parentPath+"/"+subPath; RealtimeConfigSyncer.syncToClient(client,path); } }break; case delete:{ for(String subPath : changePath){ ZkConfigSubscriber.unsubscribePath(client, subPath); String filePath = subPath.replaceAll(ZkConstant.SEPRATOR, "/"); FileUtils.deleteQuietly(new File(filePath)); logger.info("Delete children node,file:"+filePath); } }break; case update:{ logger.info("Update children node,will do nothing"); }break; default:{ logger.info("Default children node operate,will do nothing"); }break; } } }
ZkClient会在NodeChildChanged事件发生时主动触发IZkChildListener接口的handleChildChange方法。因此咱们只须要实现IZkChildListener接口的handleChildChange方法便可,而且同一个path只须要订阅一次,zkclient会自动为咱们对path进行续订。
一样的还有IZkDataListener接口,咱们只须要实现IZkDataListener接口的handleDataChange和handleDataDeleted方法便可,下面就是该接口的具体实现状况:
/** * DataChangeListener * @author hwang */ public static class DataChangeListener implements IZkDataListener{ private static final Log logger = LogFactory.getLog(DataChangeListener.class); private ZkClient client; public DataChangeListener(ZkClient client){ this.client = client; } @Override public void handleDataChange(String dataPath, Object data) throws Exception { logger.info("handleDataChange event,dataPath:"+dataPath); RealtimeConfigSyncer.syncToClient(client,dataPath); } @Override public void handleDataDeleted(String dataPath) throws Exception { logger.info("handleDataDeleted event,dataPath:"+dataPath); ZkConfigSubscriber.unsubscribePath(client, dataPath); String filePath = dataPath.substring(dataPath.indexOf(ZkConstant.SEPRATOR)).replaceAll(ZkConstant.SEPRATOR, "/"); FileUtils.deleteQuietly(new File(filePath)); } }
须要注意的是,当出现新增或修改事件时,只须要将最新的配置文件的内容同步到本地便可,可是出现删除事件时,除了须要删除本地的相关配置文件,还须要将已经订阅的事件取消掉,也就是须要执行ZkConfigSubscriber.unsubscribePath()方法。
实现完发布器和订阅器以后,最后的一个就是配置文件更改器了。更改器主要的工做就是用来修改ZooKeeperServer端的配置文件的内容,具体的实现以下:
/** * 服务端配置文件更改器 * @author hwang * */ public class ZkConfigChanger { private static final Log logger = LogFactory.getLog(ZkConfigChanger.class); private static ZkClient client; /** * 初始化zkclient */ public static void init(){ if(client==null){ try { client = new ZkClient(ZkConstant.ZK_CLUSTER_HOSTS,ZkConstant.ZK_SESSION_TIMEOUT); client.setZkSerializer(new ZkUtils.StringSerializer(ZkConstant.CONF_CHAR_SET)); } catch (Exception e) { logger.error("",e); } } } /** * 新增目录节点 * @param configRootNode * @param dirAbsolutePath 目录的绝对路径,该目录必须是/config/开头的目录 * @throws KeeperException * @throws InterruptedException * @throws UnsupportedEncodingException */ public static boolean addConfigDir(String configRootNode,String dirAbsolutePath) throws KeeperException, InterruptedException, UnsupportedEncodingException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(dirAbsolutePath)){ logger.error("dirAbsolutePath can't be empty"); return false; } return ZkClientNodeUtil.createDirNode(client, configRootNode, dirAbsolutePath); } /** * 删除目录节点 * @param configRootNode * @param dirAbsolutePath * @throws InterruptedException * @throws KeeperException */ public static boolean deleteConfigDir(String configRootNode,String dirAbsolutePath) throws InterruptedException, KeeperException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(dirAbsolutePath)){ logger.error("dirAbsolutePath can't be empty"); return false; } return ZkClientNodeUtil.deleteDirNode(client, configRootNode, dirAbsolutePath); } /** * 新增文件节点 * @param configRootNode * @param fileAbsolutePath 文件的绝对路径,不包括文件名 * @param fileName 文件名 * @param fileContent 文件内容 * @throws KeeperException * @throws InterruptedException * @throws UnsupportedEncodingException */ public static boolean addConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws KeeperException, InterruptedException, UnsupportedEncodingException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){ logger.error("fileAbsolutePath,fileName,fileContent can't be empty"); return false; } return ZkClientNodeUtil.createFileNode(client, configRootNode, fileAbsolutePath, fileName, fileContent); } /** * 删除文件节点 * @param configRootNode * @param fileAbsolutePath 文件的绝对路径,不包括文件名 * @param fileName 文件名 * @throws InterruptedException * @throws KeeperException */ public static boolean deleteConfigFile(String configRootNode,String fileAbsolutePath,String fileName) throws InterruptedException, KeeperException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName)){ logger.error("fileAbsolutePath,fileName can't be empty"); return false; } return ZkClientNodeUtil.deleteFileNode(client, configRootNode, fileAbsolutePath, fileName); } /** * 更新配置文件内容 * @param configRootNode * @param fileAbsolutePath * @param fileName * @param fileContent * @throws InterruptedException * @throws KeeperException * @throws UnsupportedEncodingException */ public static boolean updateConfigFile(String configRootNode,String fileAbsolutePath,String fileName,String fileContent) throws InterruptedException, KeeperException, UnsupportedEncodingException{ if(null==client){ logger.warn("Not connected to ZooKeeper,will return"); return false; } if(StringUtils.isEmpty(fileAbsolutePath) || StringUtils.isEmpty(fileName) || StringUtils.isEmpty(fileContent)){ logger.error("fileAbsolutePath,fileName,fileContent can't be empty"); return false; } return ZkClientNodeUtil.updateFileNode(client, configRootNode, fileAbsolutePath, fileName, fileContent); } }
至此,经过ZkClient重构的统一配置管理框架就完成了。
通过实际测试,zkclient能够完美解决上一篇博客中未解决的问题,这得益于zkclient大量正确的使用了retryUntilConnected方法。