zookeeper之分布式锁

分布式锁

在分布式系统中,对一个共享资源的互斥访问操做,就能够用分布式锁来解决,相似于单机的Lock,分为排他锁和读写锁。node

排他锁

特色:segmentfault

  • 只能一个服务器对共享资源操做
  • 拥有操做资源的服务器释放资源,其余服务器能够知道,并去抢资源

是否是和以前的zookeeper之master选举很像。
因此zookeeper的如下特性,也是能够知足分布式锁服务器

  • 强一致性:知足只能一个服务器抢到共享资源
  • 临时节点:释放资源,临时节点随着会话的关闭而消失
  • Watcher:监听节点的变化,资源被释放,就会被监听到

InterProcessMutexacquirerelease方法,在zookeeper之master选举提过。
DistributeLockdom

public class DistributeLock { 
    static CuratorFramework client = CuratorConnect.getCuratorClient2();  
    private static final String path = "distributeLock";  
  
    public static void distributeLock() {  
        InterProcessMutex mutex = new InterProcessMutex(client, path);  
        CountDownLatch countDownLatch = new CountDownLatch(50);  
        for (int i = 0; i < 50; i++) {  
            new Thread(new Runnable() {
                public void run() {
                    try {
                        countDownLatch.countDown();  
                        countDownLatch.await();  
                        mutex.acquire();  
                        System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + "获取到锁");  
                        TimeUnit.SECONDS.sleep(1);  
                        mutex.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }  
  
    public static void main(String[] args) {  
        DistributeLock.distributeLock();  
    }  
}

运行结果以下(部分):
image.png
能够看出,都是间隔一秒。源码已经解析过了,这边不在讲解。分布式

读写锁

ReadWriteLockide

public class ReadWriteLock {  
    static CuratorFramework client \= CuratorConnect.getCuratorClient2();  
    private static final String path \= "/readWriteLock";  
  
    public static void readWriteLock() {  
        InterProcessReadWriteLock mutex = new InterProcessReadWriteLock(client, path);  
        CountDownLatch countDownLatch = new CountDownLatch(50);  
        for (int i = 0; i < 50; i++) {  
            new Thread(new Runnable() {  
 @Override  public void run() {  
  try {  
 countDownLatch.countDown(); countDownLatch.await();  int i = new Random().nextInt();  
  if (i % 9 \== 0) {  
 mutex.writeLock().acquire(); System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + "获取到写锁");  
 TimeUnit.SECONDS.sleep(1);  
 mutex.writeLock().release(); }else{  
 mutex.readLock().acquire(); System.out.println(new SimpleDateFormat("HH:mm:ss SSS").format(new Date()) + "获取到读锁");  
 TimeUnit.SECONDS.sleep(1);  
 mutex.readLock().release(); } } catch (InterruptedException e) {  
 e.printStackTrace(); } catch (Exception e) {  
 e.printStackTrace(); } } }).start();  
        }  
    }  
  
    public static void main(String\[\] args) {  
        ReadWriteLock.readWriteLock();  
    }  
}

运行结果以下(部分):
image.png
能够看出,读的时候,没有间隔,第四行写的时候,间隔了一秒。源码分析

源码分析

public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)  
{  
    lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);  
   
    writeMutex = new InternalInterProcessMutex  
    (  
        client,  
        basePath,  
        WRITE_LOCK_NAME,  
        lockData,  
        1,  
        new SortingLockInternalsDriver()  
        {  
            @Override  
  public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception  
            {  
                return super.getsTheLock(client, children, sequenceNodeName, maxLeases); // 写锁,跟排他锁是同样的 
            }  
        }  
    );  
  
    readMutex = new InternalInterProcessMutex  
    (  
        client,  
        basePath,  
        READ_LOCK_NAME,  
        lockData,  
        Integer.MAX_VALUE,  
        new SortingLockInternalsDriver()  
        {  
            @Override  
  public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception  
            {  
                return readLockPredicate(children, sequenceNodeName);  
            }  
        }  
    );  
}

readLockPredicateui

private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception  
{  
    // 是当前写锁,返回
    if ( writeMutex.isOwnedByCurrentThread() )  
    {  
        return new PredicateResults(null, true);  
    }  
  
    int index = 0;  
    int firstWriteIndex = Integer.MAX_VALUE;  
    int ourIndex = -1;  
    // 遍历全部子节点
    for ( String node : children )  
    {  
        // 若是节点有包含__WRIT__,获取最小写的节点位置
        if ( node.contains(WRITE_LOCK_NAME) )  
        {  
            firstWriteIndex = Math.min(index, firstWriteIndex);  
        }  
        // 获取当前节点的位置
        else if ( node.startsWith(sequenceNodeName) )  
        {  
            ourIndex = index;  
            break;  
        }  
  
        ++index;  
    }  
  
    StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);  
    // 当前的节点位置,若是在最小的写节点以前,就获取到写,若是没有,就监听这个写节点
    boolean getsTheLock = (ourIndex < firstWriteIndex);  
    String      pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);  
    return new PredicateResults(pathToWatch, getsTheLock);  
}

若是是读请求,就会判断前面的节点是否有写请求,若是没有,就获取到锁,若是有,就监听写节点。
若是是写请求,就走排他锁的流程。spa

相关文章
相关标签/搜索