主要介绍下原生zookeeper客户端API使用、zkClient工具包和分布式锁的简单实现。java
导入依赖jarapache
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>复制代码
实现对zookeeper的基本操做vim
复制代码
private static String connection = "127.0.0.1:2181";//链接信息
private static String rootPath ="/study";//目录
/*建立一个Watcher*/
private static Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("path:"+event.getPath()+",type:"+event.getType());
if(event.getType().equals(Event.EventType.NodeChildrenChanged)){//子节点变化触发
System.out.println("子节点变化触发");
}else if(event.getType().equals(Event.EventType.NodeCreated)){//建立节点触发
System.out.println("建立节点触发");
}else if(event.getType().equals(Event.EventType.NodeDataChanged)){//节点数据变化
System.out.println("节点数据变化");
}else if(event.getType().equals(Event.EventType.NodeDeleted)){//节点删除
System.out.println("节点删除");
}else {
System.out.println("其余");
}
}
};
public static void main(String[] args) throws Exception {
ZooKeeper zooKeeper = new ZooKeeper(connection,2000,watcher);
//没有建立父节点时抛出异常 KeeperException$NoNodeException
/*zooKeeper.create(firstPath+"/test","this is my first create".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);*/
String firstPath = rootPath;
//判断节点是否存在,两个参数:一、节点路径;二、是否监控(Watcher即初始化ZooKeeper时传入的Watcher)
if(zooKeeper.exists(firstPath,true) != null){
//删除节点
zooKeeper.delete(firstPath,-1);
}
if(zooKeeper.exists(firstPath,true) == null){
//建立一个持久节点节点 ,四个参数:一、节点路径;二、节点数据;三、节点权限;四、建立模式
String path = zooKeeper.create(firstPath,"this is my first create".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
Stat stat = new Stat();
//获取该节点下的数据,三个参数:一、节点路径;二、书否监控该节点;三、版本等信息能够经过一个Stat对象来指定
String data = new String(zooKeeper.getData(firstPath,false,stat));
System.out.println("data:"+data);
System.out.println(stat.toString());
//注册watcher
zooKeeper.register(watcher);
//修改节点数据 ,version -1 匹配全部版本
zooKeeper.setData(firstPath,"update date".getBytes(),-1);
System.out.println("修改后:"+new String(zooKeeper.getData(firstPath,watcher,null)));
//建立子节点,并设置watcher
String childrenPath = firstPath+"/first";
//获取孩子节点,并注册watcher监听Event.EventType.NodeChildrenChanged事件
zooKeeper.getChildren(firstPath,true);
zooKeeper.exists(childrenPath,watcher);
zooKeeper.create(childrenPath,"this is a child".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);//触发连个watcher
//获取孩子节点,并注册watcher监听Event.EventType.NodeChildrenChanged事件
List<String> children = zooKeeper.getChildren(firstPath,true);
System.out.println(children);
zooKeeper.exists(childrenPath,watcher);
zooKeeper.setData(childrenPath,"modify child data".getBytes(),-1);
//判断子节点是否存在,并在子节点注册watcher
zooKeeper.exists(childrenPath,watcher);
zooKeeper.delete(childrenPath,-1);//触发了了两个watcher,
//关闭链接
zooKeeper.close();
}复制代码
须要注意的是原生的API建立节点只能在父目录存在时才能建立,删除时也不能递归删除。因为watch机制只触发一次的特性,若是想一直监听节点变化就要不断的注册watch,这对于开发者而言是不太友好的。
bash
导入依赖jar
分布式
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>复制代码
要使用zkClient必须实现它的序列化接口ZkSerializeride
public class MyZkSerializer implements ZkSerializer {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
String d = (String) data;
try {
return d.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
} 复制代码
代码示例:工具
public static void main(String[] args) {
// 建立一个zk客户端
ZkClient client = new ZkClient("localhost:2181");
String path = "/zkClient";
client.setZkSerializer(new MyZkSerializer());
//建立节点
client.create(path, "123", CreateMode.PERSISTENT);
// 递归建立节点(持久节点)
client.createPersistent("/key1/key2/key3",true);
//建立对子节点的监听
IZkChildListener iZkChildListener = new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath+"子节点发生变化:"+currentChilds);
}
};
//注册watcher,每次触发了watcher事件以后会自动注册,
client.subscribeChildChanges(path,iZkChildListener);
//取消注册
// client.unsubscribeChildChanges(path,iZkChildListener);
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(dataPath+"节点被删除");
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println(dataPath+"发生变化:"+data);
}
};
client.subscribeDataChanges(path, iZkDataListener);
// if(client.exists(path)){//当前节点是否存在
// client.deleteRecursive(path);//删除节点,能够递归删除目录下全部子节点
// }
//获取数据
Object data = client.readData(path);
System.out.println(data);
//获取子节点列表
List<String> children = client.getChildren(path);
System.out.println(children);
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}复制代码
分布式锁实现的基于客户端断开链接临时节点自动删除这一特性上实现。ui
实现思路有两种:this
1.建立同一个临时节点,建立成功的获取锁,其余服务对这一个临时节点注册watcher,一旦节点被删除全部注册了watcher的服务都将受到通知进行抢锁。spa
2.利用临时顺序节点,每个服务建立一个临时顺序节点,抢锁时由当前排序最小的获取锁。其余的注册前一个节点的watcher事件,监听节点删除事件,前一个节点删除则再次发起抢锁操做。
下面一段代码利用临时顺序节点实现的锁
package com.top.learn;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class ZKDistributed implements Lock {
private String lockPath;
private ZkClient client;
/*
* 利用临时顺序节点来实现分布式锁
* 获取锁:取排队号(建立本身的临时顺序节点),而后判断本身是不是最小号,如是,则得到锁;不是,则注册前一节点的watcher,阻塞等待
* 释放锁:删除本身建立的临时顺序节点
*/
//存放当前节点路径
ThreadLocal<String> currentPath = new ThreadLocal<>();
//存放前一个节点路劲
ThreadLocal<String> beforePath = new ThreadLocal<>();
public ZKDistributed(String connection,String path){
//这里能够作一些参数校验
this.lockPath = path;
client = new ZkClient(connection);
if(!client.exists(path)){
client.createPersistent(path,true);
}
}
@Override
public boolean tryLock() {
String path = currentPath.get();
if(path == null || !client.exists(path)){//未建立过节点
//建立一个临时顺序节点
path = client.createEphemeralSequential(lockPath + "/","locked");
currentPath.set(path);
}
//获取全部的自节点
List<String> children = client.getChildren(lockPath);
//排序
Collections.sort(children);
if(currentPath.get().equals(lockPath + "/" + children.get(0))){
//当前是最小节点,获取锁
return true;
}else {
int index = children.indexOf(currentPath);
beforePath.set(children.get(index-1)); //获取上一个节点路径
}
return false;
}
@Override
public void lock() {
while(!tryLock()){
// 阻塞等待
waitForLock();
// 再次尝试加锁
lock();
}
}
private void waitForLock() {
CountDownLatch countDownLatch = new CountDownLatch(1);
IZkDataListener dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
//节点删除,取消阻塞
countDownLatch.countDown();
}
};
//注册对前一个节点的监听
client.subscribeDataChanges(beforePath.get(),dataListener);
try {
if(client.exists(beforePath.get())){//前一个节点存在才等待
countDownLatch.await();
}
}catch (Exception e){
}
//取消watcher
client.unsubscribeDataChanges(beforePath.get(),dataListener);
}
@Override
public void unlock() {
client.deleteRecursive(currentPath.get());
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
} 复制代码
上面一段代码没有对锁进行可重入实现。