05.Curator分布式锁

    锁: 分布式的锁全局同步,这意味着任何一个时间点不会有两个客户端都拥有相同的锁。

1.可重入锁Shared Reentrant Lock

    首先咱们先看一个全局可重入的锁( 能够屡次获取,不会被阻塞 )。Shared意味着锁是全局可见的,客户端均可以请求锁。Reentrant和JDK的ReentrantLock相似,意味着同一个客户端在拥有锁的同时,能够屡次获取,不会被阻塞。
1.可重入锁相关类介绍
     它是由类InterProcessMutex来实现。 它的主要方法:
     
     
     
     
// 构造方法public InterProcessMutex(CuratorFramework client, String path)public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)// 经过acquire得到锁,并提供超时机制:public void acquire() throws Exceptionpublic boolean acquire(long time, TimeUnit unit) throws Exception// 撤销锁public void makeRevocable(RevocationListener<InterProcessMutex> listener)public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
错误处理: 仍是强烈推荐你使用ConnectionStateListener处理链接状态的改变。当链接LOST时你再也不拥有锁。
2.编写示例程序
    首先让咱们建立一个模拟的共享资源, 这个资源指望只能单线程的访问,不然会有并发问题。
    
    
    
    
public class FakeLimitedResource{ private final AtomicBoolean inUse = new AtomicBoolean(false); // 模拟只能单线程操做的资源 public void use() throws InterruptedException { if (!inUse.compareAndSet(false, true)) { // 在正确使用锁的状况下,此异常不可能抛出 throw new IllegalStateException("Needs to be used by one client at a time"); } try { Thread.sleep((long) (3 * Math.random())); } finally { inUse.set(false); } }}
    而后建立一个ExampleClientThatLocks类,它负责请求锁,使用资源,释放锁这样一个完整的访问过程。
    
    
    
    
public class ExampleClientThatLocks{ private final InterProcessMutex lock; private final FakeLimitedResource resource; private final String clientName; public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) { this.resource = resource; this.clientName = clientName; lock = new InterProcessMutex(client, lockPath); } public void doWork(long time, TimeUnit unit) throws Exception { if (!lock.acquire(time, unit)) { throw new IllegalStateException(clientName + " 不能获得互斥锁"); } try { System.out.println(clientName + " 已获取到互斥锁"); resource.use(); // 使用资源 Thread.sleep(1000 * 1); } finally { System.out.println(clientName + " 释放互斥锁"); lock.release(); // 老是在finally中释放 } }}
    最后建立主程序来测试:
    
    
    
    
public class InterProcessMutexExample{ private static final int QTY = 5; private static final int REPETITIONS = QTY * 10; private static final String PATH = "/examples/locks"; public static void main(String[] args) throws Exception { final FakeLimitedResource resource = new FakeLimitedResource(); final List<CuratorFramework> clientList = new ArrayList<CuratorFramework>(); for (int i = 0; i < QTY; i++) { CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3)); client.start(); clientList.add(client); } System.out.println("链接初始化完成!"); ExecutorService service = Executors.newFixedThreadPool(QTY); for (int i = 0; i < QTY; ++i) { final int index = i; Callable<Void> task = new Callable<Void>() { @Override public Void call() throws Exception { try { final ExampleClientThatLocks example = new ExampleClientThatLocks(clientList.get(index), PATH, resource, "Client " + index); for (int j = 0; j < REPETITIONS; ++j) { example.doWork(10, TimeUnit.SECONDS); } } catch (Throwable e) { e.printStackTrace(); } finally { CloseableUtils.closeQuietly(clientList.get(index)); } return null; } }; service.submit(task); } service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); System.out.println("OK!"); }}
代码也很简单,生成5个client,每一个client重复执行10次 请求锁--访问资源--释放锁的过程。每一个client都在独立的线程中。
结果能够看到,锁是随机的被每一个实例排他性的使用。
既然是可重入锁,你能够在一个线程中屡次调用acquire,在线程拥有锁时它老是返回true。
注意:你不该该在多个线程中用同一个InterProcessMutex, 你能够在每一个线程中都生成一个InterProcessMutex实例,它们的path都同样,这样它们能够共享同一个锁。
3.示例运行结果
    运行结果控制台以下:
    
    
    
    
链接初始化完成!Client 4 已获取到互斥锁Client 4 释放互斥锁Client 3 已获取到互斥锁Client 3 释放互斥锁......Client 2 已获取到互斥锁Client 2 释放互斥锁OK!
    运行时查看Zookeeper节点信息以下:

2.不可重入锁Shared Lock

    这个锁和上面的相比,就是少了Reentrant的功能,也就意味着它不能在同一个线程中重入。 这个类是InterProcessSemaphoreMutex 使用方法和上面的类相似
     首先咱们将上面的例子修改一下,测试一下它的重入。 修改ExampleClientThatLocks.doWork,连续两次acquire:
    
    
    
    
public void doWork(long time, TimeUnit unit) throws Exception{ if (!lock.acquire(time, unit)) { throw new IllegalStateException(clientName + " 不能获得互斥锁"); } System.out.println(clientName + " 已获取到互斥锁"); if (!lock.acquire(time, unit)) { throw new IllegalStateException(clientName + " 不能获得互斥锁"); } System.out.println(clientName + " 再次获取到互斥锁"); try { resource.use(); // 使用资源 Thread.sleep(1000 * 1); } finally { System.out.println(clientName + " 释放互斥锁"); lock.release(); // 老是在finally中释放 lock.release(); // 获取锁几回 释放锁也要几回 }}
注意:咱们也须要调用release两次。这和JDK的ReentrantLock用法一致。若是少调用一次release,则此线程依然拥有锁。
上面的代码没有问题,咱们能够屡次调用acquire,后续的acquire也不会阻塞。
可是将上面的InterProcessMutex换成不可重入锁InterProcessSemaphoreMutex,若是再运行上面的代码,结果就会发现线程被阻塞在第二个acquire上,直到超时。也就是此锁不是可重入的。

3.可重入读写锁Shared Reentrant Read Write Lock

    相似JDK的ReentrantReadWriteLock。 一个读写锁管理一对相关的锁。一个负责读操做,另一个负责写操做。读操做在写锁没被使用时可同时由多个进程使用,而写锁在使用时不容许读(阻塞)。
    此锁是可重入的。一个拥有写锁的线程可重入读锁,可是读锁却不能进入写锁。 这也意味着写锁能够降级成读锁, 好比请求写锁 --->读锁 ---->释放写锁。从读锁升级成写锁是不行的。
1.可重入读写锁相关类介绍
    可重入读写锁 主要由两个类实现: InterProcessReadWriteLock、InterProcessMutex 使用时首先建立一个InterProcessReadWriteLock实例,而后再根据你的需求获得读锁或者写锁,读写锁的类型是InterProcessMutex

2.编写示例程序
    示例程序仍使用上面的FakeLimitedResource、InterProcessMutexExample类
    
    
    
    
public class ExampleClientReadWriteLocks{ private final InterProcessReadWriteLock lock; private final InterProcessMutex readLock; private final InterProcessMutex writeLock; private final FakeLimitedResource resource; private final String clientName; public ExampleClientReadWriteLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) { this.resource = resource; this.clientName = clientName; lock = new InterProcessReadWriteLock(client, lockPath); readLock = lock.readLock(); writeLock = lock.writeLock(); } public void doWork(long time, TimeUnit unit) throws Exception { // 注意只能先获得写锁再获得读锁,不能反过来!!! if (!writeLock.acquire(time, unit)) { throw new IllegalStateException(clientName + " 不能获得写锁"); } System.out.println(clientName + " 已获得写锁"); if (!readLock.acquire(time, unit)) { throw new IllegalStateException(clientName + " 不能获得读锁"); } System.out.println(clientName + " 已获得读锁"); try { resource.use(); // 使用资源 Thread.sleep(1000 * 1); } finally { System.out.println(clientName + " 释放读写锁"); readLock.release(); writeLock.release(); } }}
    在这个类中咱们首先请求了一个写锁,而后降级成读锁。执行业务处理,而后释放读写锁。修改 InterProcessMutexExample类中的 ExampleClientThatLocks ExampleClientReadWriteLocks 而后运行示例。
3. 示例运行结果
    运行结果控制台:
    
    
    
    
链接初始化完成!Client 1 已获得写锁Client 1 已获得读锁Client 1 释放读写锁......Client 3 已获得写锁Client 3 已获得读锁Client 3 释放读写锁OK!
    此时查看Zookeeper数据节点以下:

4.信号量Shared Semaphore

    一个计数的信号量相似JDK的Semaphore。JDK中Semaphore维护的一组许可(permits),而Cubator中称之为 租约(Lease)
    有两种方式能够决定semaphore的最大租约数。第一种方式是有用户给定的path决定。第二种方式使用SharedCountReader类。
    若是不使用SharedCountReader,没有内部代码检查进程是否假定有10个租约而进程B假定有20个租约。 因此全部的实例必须使用相同的numberOfLeases值.
1.信号量实现类说明
主要类有:
  • InterProcessSemaphoreV2 - 信号量实现类
  • Lease - 租约(单个信号)
  • SharedCountReader - 计数器,用于计算最大租约数量
    此次调用acquire会返回一个租约对象。客户端必须在finally中close这些租约对象,不然这些租约会丢失掉。可是,若是客户端session因为某种缘由好比crash丢掉,那么这些客户端持有的租约会自动close,这样其它客户端能够继续使用这些租约。
租约还能够经过下面的方式返还:
    
    
    
    
public void returnLease(Lease lease)public void returnAll(Collection<Lease> leases)
    注意一次你能够请求多个租约,若是Semaphore当前的租约不够,则请求线程会被阻塞。同时还提供了超时的重载方法。
    
    
    
    
public Lease acquire() throws Exceptionpublic Collection<Lease> acquire(int qty) throws Exceptionpublic Lease acquire(long time, TimeUnit unit) throws Exceptionpublic Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
2.编写示例程序
    
    
    
    
public class InterProcessSemaphoreExample{ private static final int MAX_LEASE = 10; private static final String PATH = "/examples/locks"; public static void main(String[] args) throws Exception { FakeLimitedResource resource = new FakeLimitedResource(); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3)); client.start(); InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE); Collection<Lease> leases = semaphore.acquire(5); System.out.println("获取租约数量:" + leases.size()); Lease lease = semaphore.acquire(); System.out.println("获取单个租约"); resource.use(); Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS); System.out.println("获取租约,若是为空则超时: " + leases2); System.out.println("释放租约"); semaphore.returnLease(lease); System.out.println("释放集合中的全部租约"); semaphore.returnAll(leases); client.close(); System.out.println("OK!"); }}
首先咱们先得到了5个租约, 接着请求了一个租约,由于semaphore还有5个租约,因此请求能够知足,返回一个租约,还剩4个租约。
而后再请求一个租约,由于租约不够,阻塞到超时,仍是没能知足,返回结果为null。
3.示例运行结果
    运行结果控制台以下:
      
      
      
      
获取租约数量:5获取单个租约获取租约,若是为空则超时: null释放租约释放集合中的全部租约OK!
     此时查看Zookeeper数据节点以下:

注意: 上面所讲的4种锁都是公平锁(fair)。从ZooKeeper的角度看,每一个客户端都按照请求的顺序得到锁。至关公平。

5.多锁对象 Multi Shared Lock

    Multi Shared Lock是一个锁的容器。当调用acquire,全部的锁都会被acquire,若是请求失败,全部的锁都会被release。一样调用release时全部的锁都被release(失败被忽略)。 基本上,它就是组锁的表明,在它上面的请求释放操做都会传递给它包含的全部的锁。
1.主要类说明
主要涉及两个类:
  • InterProcessMultiLock - 对所对象实现类
  • InterProcessLock - 分布式锁接口类
它的构造函数须要包含的锁的集合,或者一组ZooKeeper的path。用法和Shared Lock相同。
   
   
   
   
public InterProcessMultiLock(CuratorFramework client, List<String> paths)public InterProcessMultiLock(List<InterProcessLock> locks)
2.编写示例程序
    
    
    
    
public class InterProcessMultiLockExample{ private static final String PATH1 = "/examples/locks1"; private static final String PATH2 = "/examples/locks2"; public static void main(String[] args) throws Exception { FakeLimitedResource resource = new FakeLimitedResource(); CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3)); client.start(); InterProcessLock lock1 = new InterProcessMutex(client, PATH1); // 可重入锁 InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2); // 不可重入锁 InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2)); if (!lock.acquire(10, TimeUnit.SECONDS)) { throw new IllegalStateException("不能获取多锁"); } System.out.println("已获取多锁"); System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess()); System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess()); try { resource.use(); // 资源操做 } finally { System.out.println("释放多个锁"); lock.release(); // 释放多锁 } System.out.println("是否有第一个锁: " + lock1.isAcquiredInThisProcess()); System.out.println("是否有第二个锁: " + lock2.isAcquiredInThisProcess()); client.close(); System.out.println("OK!"); }}
新建一个InterProcessMultiLock,包含一个重入锁和一个非重入锁。 调用acquire后能够看到线程同时拥有了这两个锁。 调用release看到这两个锁都被释放了。
注意: 再重申一遍,强烈推荐使用ConnectionStateListener监控链接的状态。
3.示例运行结果
    运行结果控制台以下:
   
   
   
   
已获取多锁是否有第一个锁: true是否有第二个锁: true释放多个锁是否有第一个锁: false是否有第二个锁: falseOK!
     此时查看Zookeeper数据节点以下:

-------------------------------------------------------------------------------------------------------------------------------

相关文章
相关标签/搜索