ZooKeeper实现分布式锁

1.基于ZooKeeper分布式锁的流程java

在zookeeper指定节点(locks)下建立临时顺序节点node_n
获取locks下全部子节点children
对子节点按节点自增序号从小到大排序
判断本节点是否是第一个子节点,如果,则获取锁;若不是,则监听比该节点小的那个节点的删除事件
若监听事件生效,则回到第二步从新进行判断,直到获取到锁

2.实现node

zookeeper系列(五)实战分布式锁apache

3.简单实现segmentfault

package zookeeper;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class TestLock implements Watcher {

	private ZooKeeper zk=null;
	private String config;
	private String root="/locks";
	private String lock="/lock_";
	private String currentPath;
	// 计数器
    private CountDownLatch countDownLatch;
    
	@Override
	public void process(WatchedEvent event) {
		System.out.println(event.getPath()+"-"+event.getType()+"-"+event.getState());
	}
	
	/**
	 * @param config 路径
	 * @param root  根目录
	 * @throws IOException
	 * @throws InterruptedException 
	 * @throws KeeperException 
	 */
	public TestLock(String config) throws IOException, KeeperException, InterruptedException {
		if(null==config) {
			return;
		}
		this.config=config;
		zk=new ZooKeeper(config,5000,this);
		Stat st=zk.exists(root, false);
		if(null==st) {
			zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
		}
	}
	
	/**
	 * 获取锁
	 * @return
	 * @throws InterruptedException 
	 * @throws KeeperException 
	 */
	public synchronized boolean  getLock() throws KeeperException, InterruptedException {
		currentPath=zk.create(root+lock,new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
		System.out.println("当前路径:"+currentPath.substring(currentPath.lastIndexOf('/')+1));
		while(true) {
        List<String>children=zk.getChildren(root, false);
        //排序
        Collections.sort(children);
        //获取位置
        System.out.println(children.toString());
        int index=children.indexOf(currentPath.substring(currentPath.lastIndexOf("/")+1));
        //不是在开头位置
        if(index!=0) {
        	System.out.println(root+"/"+children.get(0));
        	new ZkClient(this.config).subscribeDataChanges(root+"/"+children.get(0),new IZkDataListener(){
        		// 当修改当前节点的数据
        		public void handleDataChange(String arg0, Object arg1)
        		throws Exception {
        			System.out.println("---");
        		}
				@Override
				public void handleDataDeleted(String arg0) throws Exception {
					System.out.println("线程:"+Thread.currentThread().getName()+"释放锁:"+currentPath);
					 countDownLatch.countDown();
				}	
        	});
        	this.countDownLatch=new CountDownLatch(1);
        	this.countDownLatch.await();
        	this.countDownLatch=null;        	
        }else {
        	System.out.println("线程:"+Thread.currentThread().getName()+"获取锁:"+currentPath);
        	break;
         }
        }
		return false;
	}

	public void unlock() throws InterruptedException, KeeperException {
		this.zk.delete(currentPath, -1);
		System.out.println("线程:"+Thread.currentThread().getName()+"释放锁:"+currentPath);
	}
	
	public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
		Runnable runnable=new Runnable() {
			@Override
			public void run() {
				TestLock tl;
				try {
					tl = new TestLock("127.0.0.1:2181");
					tl.getLock();
					tl.unlock();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (KeeperException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		};
		
//		 for (int i = 0; i < 1; i++) {
//	            Thread t = new Thread(runnable);
//	            t.start();
//	        }
		
		  Thread t = new Thread(runnable);
          t.start();
	}
	
}