In a distributed system, mutually exclusive access to a shared resource can be handled with a distributed lock. It is analogous to a single-machine Lock and comes in two flavors: an exclusive lock and a read-write lock.
Characteristics:
Doesn't this look a lot like the ZooKeeper master election covered earlier?
So the following ZooKeeper characteristics can also satisfy a distributed lock:
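To make that concrete, here is a rough sketch of the underlying idea using nothing but ZooKeeper primitives (the connect string localhost:2181 and the parent znode /lock are made up for this example): every contender creates an ephemeral sequential child, the one with the smallest sequence number holds the lock, and the others would watch the node just in front of them.

import java.util.Collections;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class NaiveZkLockSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // 1. Every contender creates an ephemeral sequential child under the lock node;
        //    if this client dies, the node (and therefore the lock) goes away with it.
        String ourPath = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath("/lock/contender-");
        String ourNode = ourPath.substring(ourPath.lastIndexOf('/') + 1);

        // 2. The contender whose node has the smallest sequence number owns the lock;
        //    the others would watch the node just in front of them (watching is omitted here).
        List<String> children = client.getChildren().forPath("/lock");
        Collections.sort(children);
        boolean ownsLock = children.get(0).equals(ourNode);
        System.out.println(ourNode + (ownsLock ? " owns the lock" : " waits behind " + children.get(0)));

        client.close();
    }
}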
The acquire and release methods of InterProcessMutex were already covered in the ZooKeeper master election article.
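As a quick refresher, here is a minimal sketch of the usual pattern (the lock path /demoLock and the helper method are made up; an already started CuratorFramework is assumed). The timed acquire(time, unit) overload returns false instead of blocking forever, and release() belongs in a finally block:

import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class MutexUsageSketch {

    // Hypothetical helper: protects one critical section with an InterProcessMutex.
    static void doWithLock(CuratorFramework client) throws Exception {
        InterProcessMutex mutex = new InterProcessMutex(client, "/demoLock");
        // wait at most 3 seconds for the lock instead of blocking forever
        if (mutex.acquire(3, TimeUnit.SECONDS)) {
            try {
                // ... operate on the shared resource ...
            } finally {
                mutex.release(); // always release in finally
            }
        }
    }
}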
DistributeLock
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class DistributeLock {

    static CuratorFramework client = CuratorConnect.getCuratorClient2();
    // Curator lock paths must start with "/"
    private static final String path = "/distributeLock";

    public static void distributeLock() {
        InterProcessMutex mutex = new InterProcessMutex(client, path);
        CountDownLatch countDownLatch = new CountDownLatch(50);
        for (int i = 0; i < 50; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        // let all 50 threads start competing at (roughly) the same time
                        countDownLatch.countDown();
                        countDownLatch.await();
                        mutex.acquire();
                        System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + " acquired the lock");
                        TimeUnit.SECONDS.sleep(1);
                        mutex.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    public static void main(String[] args) {
        DistributeLock.distributeLock();
    }
}
Part of the output looks like this:
As you can see, the lock is acquired at one-second intervals. The InterProcessMutex source code has already been analyzed, so it is not covered again here.
ReadWriteLock
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;

public class ReadWriteLock {

    static CuratorFramework client = CuratorConnect.getCuratorClient2();
    private static final String path = "/readWriteLock";

    public static void readWriteLock() {
        InterProcessReadWriteLock mutex = new InterProcessReadWriteLock(client, path);
        CountDownLatch countDownLatch = new CountDownLatch(50);
        for (int i = 0; i < 50; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        countDownLatch.countDown();
                        countDownLatch.await();
                        // roughly 1 in 9 threads asks for the write lock, the rest ask for the read lock
                        int r = new Random().nextInt();
                        if (r % 9 == 0) {
                            mutex.writeLock().acquire();
                            System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + " acquired the write lock");
                            TimeUnit.SECONDS.sleep(1);
                            mutex.writeLock().release();
                        } else {
                            mutex.readLock().acquire();
                            System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + " acquired the read lock");
                            TimeUnit.SECONDS.sleep(1);
                            mutex.readLock().release();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    public static void main(String[] args) {
        ReadWriteLock.readWriteLock();
    }
}
Part of the output looks like this:
As you can see, the reads come through with no gap between them, while the write on the fourth line of the output waited one second.

Source code analysis

The InterProcessReadWriteLock constructor builds two internal mutexes over the same base path: a write mutex with maxLeases = 1 and a read mutex with maxLeases = Integer.MAX_VALUE.
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
    lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);

    writeMutex = new InternalInterProcessMutex(
        client,
        basePath,
        WRITE_LOCK_NAME,
        lockData,
        1,
        new SortingLockInternalsDriver() {
            @Override
            public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
                // the write lock behaves exactly like the exclusive lock
                return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
            }
        }
    );

    readMutex = new InternalInterProcessMutex(
        client,
        basePath,
        READ_LOCK_NAME,
        lockData,
        Integer.MAX_VALUE,
        new SortingLockInternalsDriver() {
            @Override
            public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
                return readLockPredicate(children, sequenceNodeName);
            }
        }
    );
}
readLockPredicate
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
    // if the current thread already holds the write lock, grant the read lock immediately
    if ( writeMutex.isOwnedByCurrentThread() ) {
        return new PredicateResults(null, true);
    }

    int index = 0;
    int firstWriteIndex = Integer.MAX_VALUE;
    int ourIndex = -1;
    // iterate over all child nodes
    for ( String node : children ) {
        // if the node name contains __WRIT__, record the position of the earliest write node
        if ( node.contains(WRITE_LOCK_NAME) ) {
            firstWriteIndex = Math.min(index, firstWriteIndex);
        }
        // record the position of the current node
        else if ( node.startsWith(sequenceNodeName) ) {
            ourIndex = index;
            break;
        }
        ++index;
    }

    StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);

    // if the current node comes before the earliest write node, the read lock is granted;
    // otherwise, watch that write node
    boolean getsTheLock = (ourIndex < firstWriteIndex);
    String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
    return new PredicateResults(pathToWatch, getsTheLock);
}
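The writeMutex.isOwnedByCurrentThread() branch at the top is what makes lock downgrading possible: a thread that already holds the write lock is granted the read lock immediately, so it can take the read lock and then drop the write lock (going the other way, from read to write, is not supported). A minimal sketch, assuming an already started CuratorFramework:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;

public class DowngradeSketch {

    // Hypothetical helper: downgrade from the write lock to the read lock.
    static void downgrade(CuratorFramework client) throws Exception {
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/readWriteLock");
        lock.writeLock().acquire();        // hold the write lock first
        try {
            lock.readLock().acquire();     // granted at once by the isOwnedByCurrentThread() branch
        } finally {
            lock.writeLock().release();    // drop the write lock, keep reading under the read lock
        }
        try {
            // ... read the shared resource ...
        } finally {
            lock.readLock().release();
        }
    }
}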
For a read request, the driver checks whether any write request comes before it in the children list: if not, the read lock is granted; if there is one, it watches that earliest write node.
A write request simply goes through the exclusive-lock flow.
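To make the read-side decision concrete, here is a small standalone toy that mirrors the predicate above over made-up child node names (the __READ__/__WRIT__ markers follow Curator's naming). Our read node sits behind a write node, so it does not get the lock and would watch that write node:

import java.util.Arrays;
import java.util.List;

public class ReadLockPredicateSketch {

    public static void main(String[] args) {
        // hypothetical, already sorted children of the lock's base path
        List<String> children = Arrays.asList(
                "_c_a-__READ__0000000001",
                "_c_b-__WRIT__0000000002",  // earliest write request
                "_c_c-__READ__0000000003"); // our read request
        String ourNode = "_c_c-__READ__0000000003";

        int index = 0, firstWriteIndex = Integer.MAX_VALUE, ourIndex = -1;
        for (String node : children) {
            if (node.contains("__WRIT__")) {
                firstWriteIndex = Math.min(index, firstWriteIndex);
            } else if (node.startsWith(ourNode)) {
                ourIndex = index;
                break;
            }
            ++index;
        }

        boolean getsTheLock = ourIndex < firstWriteIndex;
        // prints: watch _c_b-__WRIT__0000000002
        System.out.println(getsTheLock ? "read lock granted" : "watch " + children.get(firstWriteIndex));
    }
}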