上一篇的内容是补充了ZAB协议和分布式队列的一种实现,ZAB咱们谈到了它的一个协议流程和在和follower失联时的崩溃恢复,还有如何进行数据同步和丢弃事务。分布式队列的具体代码实现中的结构,还有类中定义的每一个方法基本也都有说起了,相信你们也必定可以本身动手完成代码的补充并成功运行。分布式队列的代码逻辑以下图,注意使用的虚实线和线的颜色都指代了不一样的行为。java
从零开始的高并发(一)--- Zookeeper的基础概念node
从零开始的高并发(二)--- Zookeeper实现分布式锁服务器
从零开始的高并发(三)--- Zookeeper集群的搭建和leader选举markdown
从零开始的高并发(四)--- Zookeeper的经典应用场景数据结构
分布式环境下,咱们的服务不少,配置确是共用一套的。处理起来会十分麻烦,配置中心能够帮助咱们解决系统参数配置及参数的动态修改问题并发
运维管理人员把配置修改以后进行提交,把配置推送到配置中心,咱们分布式应用下的每一个应用实例能够经过对watch事件的监听来获取配置中心文件的变动,在不重启服务的状况下也能作到把应用中的一些属性从内存中替换掉app
假设咱们如今拥有这么一台zookeeper服务器,咱们须要建立一个配置中心的根目录distributeConfigure,注意图中的server是指咱们集群中的某项服务,server1-file1.cnf是指这个服务下的配置文件1,2,3···,server.port也属于其中一个配置,从这一层开始对应zookeeper下的一个个节点,也就是说,咱们把每一项配置,好比刚刚的服务端口server.port,都看做是一个znode,而后一个一个往zookeeper中存过去便可。运维
此时咱们不关心服务是否挂掉或者怎样,也不关心节点的顺序(除非是服务功能中有特殊要求),还有就是,咱们的配置通常也会有该配置的名字,可是对于顺序通常也是不要求的,因此咱们就会选用持久节点来记录配置。分布式
此时应用服务要作的事情就很简单了,就是对这些个节点进行监控,只要节点存在变化,应用服务就把节点下的数据取过来便可。ide
前面提到的都是一个配置项对应一个znode,那咱们其实也能够换一种想法,好比我不少个配置项都放在了同一个文件下,那我就换成,一个文件对应一个znode,把文件下的内容都放置在znode的value里面
刚刚咱们也提到过了,运维管理人员把配置修改以后进行提交推送给配置中心,那咱们如今就得实现一个运维人员须要用到的接口出来,可以对配置文件进行读写操做。
public interface ConfigureWriter {
/**
* 建立一个新的配置文件
* @param fileName 文件名称
* @param items 配置项
* @return 新文件的在zk上的路径
*/
String createCnfFile(String fileName, Properties items);
/**
* 删除一个配置文件
* @param fileName
*/
void deleteCnfFile(String fileName);
/**
* 修改一个配置文件
* @param fileName
* @param items
*/
void modifyCnfItem(String fileName, Properties items);
/**
* 加载配置文件
* @param fileName
* @return
*/
Properties loadCnfFile(String fileName);
复制代码
}
/**
* 配置文件读取器
* ConfigureReader
*/
public interface ConfigureReader {
/**
* 读取配置文件
* @param fileName 配置文件名称
* @param ChangeHandler 配置发生变化的处理器
* @return 若是存在文件配置,则返回Properties对象,不存在返回null
*/
Properties loadCnfFile(String fileName);
/**
* 监听配置文件变化,此操做只须要调用一次。
* @param fileName
* @param changeHandler
*/
void watchCnfFile(String fileName, ChangeHandler changeHandler);
/**
* 配置文件变化处理器
* ChangeHandler
*/
interface ChangeHandler {
/**
* 配置文件发生变化后给一个完整的属性对象
* @param newProp
*/
void itemChange(Properties newProp);
}
}
复制代码
刚刚也提到了,运维人员须要使用到ConfigureWriter这个接口进行配置文件的读写操做,中途为了确保zookeeper上不存在这个节点,先执行了一次writer.deleteCnfFile(fileName),使用了一个线程去读配置文件
public class ConfigureTest {
public static void main(String[] args) {
// 模拟运维人员建立配置文件,引用ConfigureWriter接口操做
ConfigureWriter writer = new ZkConfigureCenter();
String fileName = "trade-application.properties";
writer.deleteCnfFile(fileName); // 测试,确保配置中心没有这个问题
Properties items = new Properties();
items.put("abc.gc.a", "123");
items.put("abc.gc.b", "3456");
// 建立配置文件,内容为 properties items的内容。
String znodePath = writer.createCnfFile(fileName, items);
System.out.println("new file: "+znodePath);
new Thread(()->{
readCnf();
}).start();
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3秒后修改文件内容,有新增、有删除、有修改
items.put("abc.gc.a", "haha"); // 修改
items.put("abc.gc.c", "xx"); // 新增
items.remove("abc.gc.b"); // 删除
writer.modifyCnfItem(fileName, items);
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 模拟应用程序加载配置文件,监听配置文件的变化
*/
public static void readCnf() {
// 应用引用ConfigureReader接口进行操做
System.out.println("读取并监听配置文件");
ConfigureReader reader = new ZkConfigureCenter();
String fileName = "trade-application.properties";
Properties p = reader.loadCnfFile(fileName); // 读取配置文件
System.out.println(p);
// 监听配置文件
reader.watchCnfFile(fileName, new ChangeHandler() {
@Override
public void itemChange(Properties newProp) {
System.out.println("发现数据发生变化:"+ newProp);
}
});
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
复制代码
其实就是把咱们的每个配置项都放入zookeeper上
首先,咱们创建了一个这样的配置文件 /distributeConfigure/cnfFile/trade-application.properties,该文件下的内容是
客户端这时也读取到了这个配置文件的相关信息
而后咱们对这个配置文件进行了一些修改,在测试代码的进程处已经写得很是清楚作了哪些修改了,新增删除修改3个操做都进行了一次
此时若是断开链接,就会出现报错
咱们能够打开zkClient经过命令来检查一下程序的可靠性,经过ls /path命令查看一下这些节点都是否正常以后,咱们来手动修改一下节点数据
此时事情还没完,假设咱们一次性对几百甚至上千个配置进行了修改,那岂不是一会儿会弹出几百上千条通知吗,因此咱们还要考虑一下请求合并,还有就是,咱们可能也不止一个运维人员,也有可能同时有好几我的对同一个配置文件进行了修改,因此咱们也能够考虑用锁来锁定这个配置文件,只让一我的来进行修改,不过咱们要注意,由于zookeeper中对写事务的提交是有原子性的,写操做都会按顺序来进行,不过咱们就会进行模拟,只容许一我的修改的状况。
了解以上问题以后,咱们来讲说上面代码中未说起的配置中心的实现
public class ZkConfigureCenter implements ConfigureWriter, ConfigureReader {}
复制代码
zkClient已是连着好几篇文的老油条了,confRootPath是根目录,confFilePath是配置文件目录,fileLockPath是模拟锁的目录,由于咱们会针对一个文件使用一把锁,那就确定不止使用一把,因此咱们就创建一个目录来存放这些锁。每当发起一次写操做,那就增长一个节点当文件锁。
private String confRootPath;
private String confFilePath;
private String fileLockPath;
private static final String default_confRootPath = "/distributeConfigure";
private ZkClient client;
复制代码
public ZkConfigureCenter() {
this(default_confRootPath);
}
public ZkConfigureCenter(String path) {
if(path == null || path.trim().equals("")) {
throw new IllegalArgumentException("patch不能为空字符串");
}
confRootPath = path;
confFilePath = confRootPath+"/cnfFile";
fileLockPath = confRootPath+"/writeLock";
client = new ZkClient("localhost:2181");
client.setZkSerializer(new MyZkSerializer());
if (!this.client.exists(confFilePath)) {
try {
this.client.createPersistent(confFilePath, true);
} catch (ZkNodeExistsException e) {
}
}
}
//简单的参数检查
private void checkElement(String v) {
if (v == null) throw new NullPointerException();
if("".equals(v.trim())) {
throw new IllegalArgumentException("不能使用空格");
}
if(v.startsWith(" ") || v.endsWith(" ")) {
throw new IllegalArgumentException("先后不能包含空格");
}
}
复制代码
首先这配置文件总得有个fileName吧,items就是表明这个配置文件下的各项属性,具体内容请看注释,里面使用到了咱们在 从零开始的高并发(二)--- Zookeeper实现分布式锁 中的ZkDistributeImproveLock.java,代码位置在"使用zookeeper来进行开发"的 3 - ② zookeeper实现分布式锁方式二,若是想要跑一下前面的测试代码的话,建议去ctrl+c/+v一下便可,记得要把我删掉的不须要覆写的方法补全
@Override
public String createCnfFile(String fileName, Properties items) {
checkElement(fileName);
// 建立配置文件Node
String cfgNode = confFilePath+"/"+fileName;
//若是配置文件已经存在,总不能把别人的给覆写掉吧
if(client.exists(cfgNode)) {
throw new IllegalArgumentException("["+fileName+"]文件已存在!");
}
//没问题了,建立持久节点
client.createPersistent(cfgNode, true);
// 建立配置文件中的配置项
if(items == null) {return cfgNode;}
//这里咱们建立了带上这个配置文件名字的一把分布式锁,不一样的文件名就意味着不一样的锁
// ZkDistributeImproveLock的实现(参考"从零开始的高并发(二)--- Zookeeper实现分布式锁")
Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath+"/"+fileName);
distributeWriteLock.lock();
try {
//如下就是对properties进行遍历而后把属性值一个个写进去而已
//若是真的没看懂这个,建议使用IDEA进行debug一下,
items.keySet().iterator();
Set<Map.Entry<Object, Object>> entrySet = items.entrySet();
for (Map.Entry<Object, Object> entry : entrySet) {
System.out.println(entry.getKey() + "=" + entry.getValue());
String cfgItemNode = cfgNode +"/"+ entry.getKey().toString();
client.createPersistent(cfgItemNode, entry.getValue());
}
} finally {
distributeWriteLock.unlock();
}
return cfgNode;
}
复制代码
删除和建立须要征用同一把锁,否则我建立的时候你就把个人给删了这不太团结吧,deleteRecursive()方法是一个递归删除方法,若是没有获取到锁,会进行阻塞,也能够在分布式锁实现中指定一个恢复时间,这个时间内没有获取到锁,就把进程给结束掉,或者使用try,没有获取到,也就是有人在修改,那这时咱们给个返回值也好,抛出个异常也好,告诉该进程有人在修改便可
@Override
public void deleteCnfFile(String fileName) {
checkElement(fileName);
String cfgNode = confFilePath+"/"+fileName;
Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath+"/"+fileName);
//获取锁
distributeWriteLock.lock();
try {
client.deleteRecursive(cfgNode);
} finally {
//释放锁
distributeWriteLock.unlock();
}
}
复制代码
下面是deleteRecursive的源码
它就会先从本身的子目录,也就是children那里开始找,而后遍历删除
这里我是把提交过来的properties文件整个放入了znode里面,提交过来的会默认为最新的一份配置,主要思路就在于,先获取原来的,而后再和如今新传过来的进行比对。
由于若是咱们使用刚刚那个demo中先删后增的方法,可能我100个配置我就只修改了一个配置,可是仍是把整个文件给从新弄一份,这样会引起不少的监听,因此try代码块里面是先获取到原来的配置信息,Set existentItemSet主要做用是去重,若是这个集合中包含了所修改的配置信息,就再判断数据是否已经有变更,有变更的状况下修改,而后把多余的配置(这里的多余指的是没有进行过对比处理的数据,由于修改或者说是不变都已是通过对比处理的)给删除便可。若是这个集合中不包含我如今写入的配置项,就要新增。
@Override
public void modifyCnfItem(String fileName, Properties items) {
checkElement(fileName);
// 获取子节点信息
String cfgNode = confFilePath+"/"+fileName;
// 简单粗暴的实现
if(items == null) {throw new NullPointerException("要修改的配置项不能为空");}
items.keySet().iterator();
Set<Map.Entry<Object, Object>> entrySet = items.entrySet();
Lock distributeWriteLock = new ZkDistributeImproveLock(fileLockPath+"/"+fileName);
distributeWriteLock.lock();
try {
// 获取zk中已存在的配置信息
List<String> itemNodes = client.getChildren(cfgNode);
Set<String> existentItemSet = itemNodes.stream().collect(Collectors.toSet());
for (Map.Entry<Object, Object> entry : entrySet) {
System.out.println(entry.getKey() + "=" + entry.getValue());
String itemName = entry.getKey().toString();
String itemData = entry.getValue().toString();
String cfgItemNode = cfgNode + "/" + itemName;
if(existentItemSet.contains(itemName)) {// zk中存在的配置项
String itemNodeData = client.readData(cfgItemNode);
if(! eql(itemNodeData, itemData)) { // 数据不一致才须要修改
client.writeData(cfgItemNode, itemData);
}
existentItemSet.remove(itemName); // 剩下的就是须要删除的配置项
} else { // zk中不存在的配置项,新的配置项
client.createPersistent(cfgItemNode, itemData);
}
}
// existentItemSet中剩下的就是须要删除的
if(!existentItemSet.isEmpty()) {
for(String itemName : existentItemSet) {
String cfgItemNode = cfgNode + "/" + itemName;
client.delete(cfgItemNode);
}
}
} finally {
distributeWriteLock.unlock();
}
}
复制代码
比较简单,没啥好说
@Override
public Properties loadCnfFile(String fileName) {
if(! fileName.startsWith("/")) {
fileName = confFilePath+"/"+fileName;
}
return loadNodeCnfFile(fileName);
}
private Properties loadNodeCnfFile(String cfgNode) {
checkElement(cfgNode);
if(! client.exists(cfgNode)) {
throw new ZkNoNodeException(cfgNode);
}
// 获取子节点信息
List<String> itemNodes = client.getChildren(cfgNode);
// 读取配置信息,并装载到Properties中
if(itemNodes == null || itemNodes.isEmpty()) {
return new Properties();
}
Properties file = new Properties();
itemNodes.stream().forEach((e)->{
String itemNameNode = cfgNode + "/" + e;
String data = client.readData(itemNameNode, true);
file.put(e, data);
});
return file;
}
复制代码
这里子节点的数据读取(也就是我刚刚打开zookeeper的zkClient而后用命令来新增节点)会有个问题,当咱们须要新增节点的时候,咱们是不会触发咱们DataChange监听事件的,那是由于,我新增节点的时候,根本就尚未这个节点,在尚未这个节点的时候,是没法监听内容的变动的。因此咱们在仍是须要经过子节点的handleChildChange()来补救这个监听,这就是为何代码最后须要用到client.subscribeChildChanges(···),此时监听父节点的子节点变动,若是子节点有发生变化了,那就触发事件便可,fileNodePath是父节点的路径
triggerHandler请参考 ⑨ 的内容
@Override
public void watchCnfFile(String fileName, ChangeHandler changeHandler) {
if(! fileName.startsWith("/")) {
fileName = confFilePath+"/"+fileName;
}
final String fileNodePath = fileName;
// 读取文件
Properties p = loadNodeCnfFile(fileNodePath);
if(p != null) {
// 合并5秒配置项变化,5秒内变化只触发一次处理事件
int waitTime = 5;
final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
scheduled.setRemoveOnCancelPolicy(true);
final List<ScheduledFuture<?>> futureList = new ArrayList<ScheduledFuture<?>>();
Set<Map.Entry<Object, Object>> entrySet = p.entrySet();
for (Map.Entry<Object, Object> entry : entrySet) {
System.out.println("监控:"+fileNodePath+"/"+entry.getKey().toString());
client.subscribeDataChanges(fileNodePath+"/"+entry.getKey().toString(), new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("触发删除:"+dataPath);
triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("触发修改:"+dataPath);
triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
}
});
}
client.subscribeChildChanges(fileNodePath, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("触发子节点:"+parentPath);
triggerHandler(futureList, scheduled, waitTime, fileNodePath, changeHandler);
}
});
}
}
复制代码
在 ⑧ 那里咱们已经看到了一个配置项的修改会触发这么多个监听事件,这种作法不太可取,回到咱们的ConfigureReader中,咱们已经定义好的这么一个接口,把配置发生变化的配置项在5秒(waitTime)内所进行的修改都合并成一个事件
/**
* 配置文件变化处理器
* ChangeHandler
*/
interface ChangeHandler {
/**
* 配置文件发生变化后给一个完整的属性对象
* @param newProp
*/
void itemChange(Properties newProp);
}
复制代码
再回到ZkConfigureCenter.java,比较方便的理解就是,咱们把咱们对提交上来的修改,根据时间划分为5秒一块,此时在这5秒以内最后一个修改任务以前的future,若是仍未执行成功,会进行cancel()取消掉,而后remove掉,咱们只取这5秒内的最后一个事件做为咱们监听事件触发的条件,因此与其说合并事件,不如就是单纯认为,咱们取了5秒内futureList的最后一个future。
/**
* 合并修改变化事件,5秒钟内发生变化的合并到一个事件进行
* @param futureList 装有定时触发任务的列表
* @param scheduled 定时任务执行器
* @param waitTime 延迟时间,单位秒
* @param fileName zk配置文件的节点
* @param changeHandler 事件处理器
*/
private void triggerHandler(List<ScheduledFuture<?>> futureList, ScheduledThreadPoolExecutor scheduled, int waitTime, String fileName, ChangeHandler changeHandler) {
if(futureList != null && !futureList.isEmpty()) {
for(int i = 0 ; i < futureList.size(); i++) {
ScheduledFuture<?> future = futureList.get(i);
if(future != null && !future.isCancelled() && !future.isDone()) {
future.cancel(true);
futureList.remove(future);
i--;
}
}
}
ScheduledFuture<?> future = scheduled.schedule(()->{
Properties p = loadCnfFile(fileName);
changeHandler.itemChange(p);
}, waitTime, TimeUnit.SECONDS);
futureList.add(future);
}
复制代码
至此配置中心的模拟实现就结束了。整个类的代码都在 ① ~ ⑨ 中,能够直接ctrl+c/+v使用
配置中心的知识总结其实就是下面4个知识点
持久节点+watch机制+分布式锁+事件合并
复制代码
master和关于一些zookeeper官网的一些treasure的介绍搁置到下一篇···
next:从零开始的高并发(六)--- Zookeeper的经典应用场景3