/**
 * Contract for a distributed lock.
 *
 * <p>Implementations decide how the lock is represented and how contention is
 * handled; this interface only names the two operations.
 */
public interface Lock {

    /** Acquires the lock. */
    void getLock();

    /** Releases the lock. */
    void unLock();
}
public abstract class ZookeeperAbstractLock implements Lock{ //zk链接地址 private static final String address = "127.0.0.1:2181"; protected static final String path = "/lock"; //zk链接客户端 protected ZkClient zkClient = new ZkClient(address); protected CountDownLatch countDownLatch = null; public void getLock() { if(tryLock()){ System.out.println("###获取锁成功###"); }else{ //等待 waitLock(); //从新获取锁 getLock(); } } public void unLock() { if(zkClient != null){ zkClient.close(); } } // 是否获取锁成功,成功返回true,失败返回false abstract Boolean tryLock(); // 等待锁 abstract void waitLock(); }
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock{ @Override Boolean tryLock() { try{ zkClient.createEphemeral(path); return Boolean.TRUE; }catch(Exception e){ return Boolean.FALSE; } } @Override void waitLock() { //使用事件监听,获取到节点被删除 IZkDataListener iZkDataListener = new IZkDataListener() { //当节点被删除的时候 public void handleDataDeleted(String dataPath) throws Exception { if(countDownLatch != null){ //信号量减一,唤醒 countDownLatch.countDown(); } } //当节点发生改变 public void handleDataChange(String dataPath, Object data) throws Exception { // TODO Auto-generated method stub } }; //注册节点信息 zkClient.subscribeDataChanges(path, iZkDataListener); //检测节点是否存在,若是存在则等待 if(zkClient.exists(path)){ //建立信号量 countDownLatch = new CountDownLatch(1); try{ //进行等待 countDownLatch.await(); }catch(Exception e){ } } //删除事件通知 zkClient.unsubscribeDataChanges(path, iZkDataListener); } }
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.7.0</version>
</dependency>
public class CuratorDistrLockTest {分布式
/** Zookeeper info */
private static final String ZK_ADDRESS = "127.0.0.1:2181";
private static final String ZK_LOCK_PATH = "/zktest";ide
public static void main(String[] args) throws InterruptedException {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
System.out.println("zk client start successfully!");ui
Thread t1 = new Thread(() -> {
doWithLock(client);
}, "t1");
Thread t2 = new Thread(() -> {
doWithLock(client);
}, "t2");code
t1.start();
t2.start();
}事件
private static void doWithLock(CuratorFramework client) {
InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
try {
if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " hold lock");
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " release lock");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}ip
}ci
}get