当不少进程须要访问共享资源时,咱们能够经过zk来实现分布式锁。主要步骤是:
1.创建一个节点,假如名为:lock 。节点类型为持久节点(PERSISTENT)
2.每当进程须要访问共享资源时,会调用分布式锁的lock()或tryLock()方法得到锁,这个时候会在第一步建立的lock节点下创建相应的顺序子节点,节点类型为临时顺序节点(EPHEMERAL_SEQUENTIAL),经过组成特定的名字name+lock+顺序号。
3.在创建子节点后,对lock下面的全部以name开头的子节点进行排序,判断刚刚创建的子节点顺序号是不是最小的节点,假如是最小节点,则得到该锁对资源进行访问。
4.假如不是该节点,就得到该节点的上一顺序节点,并给该节点是否存在注册监听事件。同时在这里阻塞。等待监听事件的发生,得到锁控制权。
5.当调用完共享资源后,调用unlock()方法,关闭zk,进而能够引起监听事件,释放该锁。
实现的分布式锁是严格的按照顺序访问的并发锁。java
import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class DistributedLock implements Watcher { ZooKeeper zk = null; // zookeeper原生api去实现一个分布式锁 private String root = "/locks"; private String myZonode; // 表示当前获取到的锁名称-也就是节点名称 private String waitNode; // 表示当前等待的节点 private CountDownLatch latch; private static final int SESSION_TIMEOUT = 10000; // 超时时间 /** * 构造函数初始化 * * @param config * 表示zookeeper链接串 */ public DistributedLock(String config) { try { zk = new ZooKeeper(config, SESSION_TIMEOUT, this); Stat stat = zk.exists(root, false); // 判断是否是已经存在locks节点,不须要监听root节点 if (stat == null) { // 若是不存在,则建立根节点 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public void process(WatchedEvent event) { if (this.latch != null) { // 若是计数器不为空话话,释放计数器锁 this.latch.countDown(); } } /** * 获取锁的方法 */ public boolean lock(String name) { if (tryLock(name)) { return true; } try { return waitLock(waitNode, SESSION_TIMEOUT); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } /** * 释放锁操做的方法 */ public void unlock() { try { zk.delete(myZonode, -1); myZonode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } private boolean tryLock(String name) { String splitStr = name; // lock_0000000001 try { // 建立一个有序的临时节点,赋值给myznode myZonode = zk.create(root + "/" + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> subNodes = zk.getChildren(root, false); Collections.sort(subNodes); // 讲全部的子节点排序 if (myZonode.equals(root + "/" + subNodes.get(0))) { // 当前客户端建立的临时有序节点是locks下节点中的最小的节点,表示当前的客户端可以获取到锁 return true; } // 不然的话,监听比本身小的节点 locks/lock_0000000003 String subMyZnode = myZonode .substring((myZonode.lastIndexOf("/") + 1)); waitNode = subNodes.get(Collections.binarySearch(subNodes, subMyZnode) - 1);// 获取比当前节点小的节点 } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } private boolean waitLock(String lower, long waitTime) throws KeeperException, InterruptedException { Stat stat = zk.exists(root + "/" + lower, true); // 获取节点状态,并添加监听 if (stat != null) { this.latch = new CountDownLatch(1); // 实例化计数器,让当前的线程等待 this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; } public static int count =10; public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { Runnable runnable = new Runnable() { public void run() { try { DistributedLock distributeLockDemo = new DistributedLock( "127.0.0.1:2181"); boolean lock = distributeLockDemo.lock("test_"); if (lock) { System.out.println(count--); distributeLockDemo.unlock(); } } catch (Exception e) { e.printStackTrace(); } } }; executorService.execute(runnable); } executorService.shutdown(); } }