在咱们平常开发中,不免会遇到要加锁的情景。例如扣除产品库存,首先要从数据库中取出库存,进行库存判断,再减去库存。这一波操做明显不符合原子性,若是代码块不加锁,很容易由于并发致使超卖问题。我们的系统若是是单体架构,那咱们使用本地锁就能够解决问题。若是是分布式架构,就须要使用分布式锁。node
SETNX key value EXPIRE key seconds DEL key if (setnx("item_1_lock", 1)) { expire("item_1_lock", 30); try { ... 逻辑 } catch { ... } finally { del("item_1_lock"); } }
这种方法看起来能够解决问题,可是有必定的风险,由于 SETNX 和 EXPIRE 这波操做是非原子性的,若是 SETNX 成功以后,出现错误,致使 EXPIRE 没有执行,致使锁没有设置超时时间造成死锁。python
针对这种状况,咱们可使用 lua 脚原本保持操做原子性,保证 SETNX 和 EXPIRE 两个操做要么都成功,要么都不成功。linux
if (redis.call('setnx', KEYS[1], ARGV[1]) < 1) then return 0; end; redis.call('expire', KEYS[1], tonumber(ARGV[2])); return 1;
经过这样的方法,咱们初步解决了竞争锁的原子性问题,虽然其余功能还未实现,可是应该不会形成死锁。redis
SET key value NX EX 30 DEL key if (set("item_1_lock", 1, "NX", "EX", 30)) { try { ... 逻辑 } catch { ... } finally { del("item_1_lock"); } }
改进后的方法不须要借助 lua 脚本就解决了 SETNX 和 EXPIRE 的原子性问题。如今咱们再仔细琢磨琢磨,若是 A 拿到了锁顺利进入代码块执行逻辑,可是因为各类缘由致使超时自动释放锁。数据库
在这以后 B 成功拿到了锁进入代码块执行逻辑,但此时若是 A 执行逻辑完毕再来释放锁,就会把 B 刚得到的锁释放了。就比如用本身家的钥匙开了别家的门,这是不可接受的。架构
为了解决这个问题咱们能够尝试在 SET 的时候设置一个锁标识,而后在 DEL 的时候验证当前锁是否为本身的锁。并发
String value = UUID.randomUUID().toString().replaceAll("-", ""); if (set("item_1_lock", value, "NX", "EX", 30)) { try { ... 逻辑 } catch { ... } finally { ... lua 脚本保证原子性 } } if (redis.call('get', KEYS[1]) == ARGV[1]) then return redis.call('del', KEYS[1]) else return 0 end
到这里,咱们终于解决了竞争锁的原子性问题和误删锁问题。可是锁通常还须要支持可重入、循环等待和超时自动续约等功能点。下面咱们学习使用一个很是好用的包来解决这些问题。dom
Redission 的锁,实现了可重入和超时自动续约功能,它都帮咱们封装好了,咱们只要按照本身的需求调用它的 API 就能够轻松实现上面所提到的几个功能点。详细功能能够查看 Redisson 文档分布式
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.2</version> </dependency>
implementation 'org.redisson:redisson:3.13.2'
用 Maven 或者 Gradle 构建,目前最新版本为 3.13.2,也能够在这里 Redisson 找到你须要的版本。学习
RedissonClient redissonClient = Redisson.create(); RLock lock = redissonClient.getLock("lock"); boolean res = lock.lock(); if (res) { try { ... 逻辑 } finally { lock.unlock(); } }
Redisson 将底层逻辑所有作了一个封装 📦,咱们无需关心具体实现,几行代码就能使用一把完美的锁。下面咱们简单折腾折腾源码 。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); if (ttl == null) { break; } if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); } }
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; } protected RFuture<Boolean> unlockInnerAsync(long threadId) { return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;", Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
使用 Redis 作分布式锁来解决并发问题仍存在一些困难,也有不少须要注意的点,咱们应该正确评估系统的体量,不能为了使用某项技术而用。要彻底解决并发问题,仍须要在数据库层面作功夫。
福利:豆花同窗为你们精心整理了一份关于linux和python的学习资料大合集!有须要的小伙伴们,关注豆花我的公众号:python头条!回复关键词“资料合集”便可免费领取!