分布式部署的应用集群中保证数据更新的互斥性,且程序出现异常时,锁可以自动释放,避免死锁发生。java
为了保证分布式部署的应用集群中同一时间只有一个客户端对共享资源进行操做。根据锁的用途再细分:node
这也是分布式锁的关键技术。git
问题:假设设置失效时间10秒,若是因为某些缘由致使10秒还没执行完任务,这时候锁自动失效,致使其余线程也会拿到分布式锁,怎么处理? 答:Redisson内部提供了一个监控锁的看门狗,它的做用是在Redisson实例被关闭前,不断的延长锁的有效期。github
具体使用能够参考Redisson官方文档 这里贴上我简单使用的例子:redis
我使用的是springboot,因此直接用了redisson提供的集成包。算法
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.4</version> </dependency>
我用的redis是官方cluster的3主3从。spring
# common spring boot settings #spring.redis.database= #spring.redis.host= #spring.redis.port= spring.redis.password=XXXXXXXXXX #spring.redis.ssl= #spring.redis.timeout= spring.redis.cluster.nodes=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx #spring.redis.sentinel.master= #spring.redis.sentinel.nodes= # Redisson settings #path to redisson.yaml or redisson.json #spring.redis.redisson.config=classpath:redisson.yaml
public void contextLoads() { //简单使用测试 // RBucket<String> bucket = redisson.getBucket("bucket"); // bucket.set("test"); // String obj = bucket.get(); // System.out.println(obj); // 得到锁对象实例 RLock lock = redisson.getLock("lock"); // 获取分布式锁,采用默认超时时间30秒 // 若是负责储存这个分布式锁的Redisson节点宕机之后, // 并且这个锁正好处于锁住的状态时, // 这个锁会出现锁死的状态。 // 为了不这种状况的发生,Redisson内部提供了一个监控锁的看门狗, // 它的做用是在Redisson实例被关闭前,不断的延长锁的有效期。 // 默认状况下,看门狗的检查锁的超时时间是30秒钟, // 也能够经过修改Config.lockWatchdogTimeout来另行指定 lock.lock(); try { Thread.sleep(80000); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 释放锁 lock.unlock(); } // 加锁之后10秒钟自动解锁 // 无需调用unlock方法手动解锁 // 这种指定了超时时间的锁不会走看门狗逻辑, // 即会发生任务没有执行完成时,锁超时了,其余进程会获取到这个分布式锁。 // 尽可能使用第一种方式,走看门狗逻辑。 // lock.lock(40, TimeUnit.SECONDS); // try { // Thread.sleep(80000); // } catch (InterruptedException e) { // e.printStackTrace(); // } }
这里再看一看具体获取锁和释放锁的核心逻辑:json
首先,调用了RedissonLock中的Lock方法:安全
@Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } }
注意这里第一个入参为-1。 进入lock方法:springboot
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); //获取锁逻辑 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { //获取成功,返回 return; } //订阅锁 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { //持续获取锁 while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { getEntry(threadId).getLatch().acquire(); } else { getEntry(threadId).getLatch().acquireUninterruptibly(); } } } } finally { //最终取消订阅获取锁 unsubscribe(future, threadId); } }
再看tryAcquire方法:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //获取锁 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //看门狗逻辑 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
由于leaseTime为-1,因此首先异步的获取锁,以后会走看门狗逻辑。 先看获取锁的操做:tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " "redis.call('hset', KEYS[1], ARGV[2], 1); " "redis.call('pexpire', KEYS[1], ARGV[1]); " "return nil; " "end; " "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " "redis.call('hincrby', KEYS[1], ARGV[2], 1); " "redis.call('pexpire', KEYS[1], ARGV[1]); " "return nil; " "end; " "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
Redisson 使用 EVAL 命令执行上面的 Lua 脚原本完成获取锁的操做:
@Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { if (e != null) { cancelExpirationRenewal(threadId); result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " id " thread-id: " threadId); result.tryFailure(cause); return; } cancelExpirationRenewal(threadId); result.trySuccess(null); }); return result; }
上面opStatus为null时,会抛出异常,必须由加锁的线程释放锁。 再来看核心方法:unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " "return nil;" "end; " "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " "if (counter > 0) then " "redis.call('pexpire', KEYS[1], ARGV[2]); " "return 0; " "else " "redis.call('del', KEYS[1]); " "redis.call('publish', KEYS[2], ARGV[1]); " "return 1; " "end; " "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
依然使用 EVAL 命令执行 Lua 脚原本释放锁:
上面的代码解析文本源自:Redisson 分布式锁实现分析(一)
Redis做者antirez基于分布式环境下提出了一种更高级的分布式锁的实现方式:Redlock。 Redisson中也实现了这种算法,具体能够参考看8.4章节 这里简单描述一下这种算法: 假设有5个互不链接的Redis集群
贴一段Redisson的小例子:
RLock lock1 = redissonInstance1.getLock("lock1"); RLock lock2 = redissonInstance2.getLock("lock2"); RLock lock3 = redissonInstance3.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 同时加锁:lock1 lock2 lock3 // 红锁在大部分节点上加锁成功就算成功。 lock.lock(); ... lock.unlock();
RedissonRedLock继承自RedissonMultiLock,具体源码就再也不继续分析了。
Zookeeper是一个一致性的文件系统,保证了其每一个节点的惟一性。 有4种节点类型:
顺序号是单调递增的计数器,由父节点维护。
分布式锁就是利用了Zookeeper的临时顺序标号目录节点的原理来实现。Locks主节点下面的ID最小的节点得到锁的权限,其余客户端来获取锁时,发现本身不是最靠前的,会监视他的前一个锁节点,当锁释放时,相应的节点被删除,会通知这个等待的客户端,让其获取锁的权利,至关于造成了一个等待队列。
Zookeeper分布式锁的优势就是有现成的框架能够拿来就用,由于有等待队列,枪锁的效率也会高。缺点是由于Zookeeper是相似文件系统的数据结构,因此删除和新增节点的效率会比较低。
《Redis官方文档》用Redis构建分布式锁 8. 分布式锁和同步器 Redisson 分布式锁实现分析(一) Redis 分布式锁的前世此生 Redlock:Redis分布式锁最牛逼的实现