聊聊分布式锁的实现(一)

在并发编程中最经常使用的手段就是锁了,好比基于JVM底层的synchronized、基于AQS的ReentrantLock;但是这些锁都只是局限于单机,本篇给你们介绍常见的分布式锁。redis

为何须要分布式锁

假如你的某个业务中存在某个需求:若是查询不存在则生成一条记录插入。那么你可能会这样写代码:select --> if not exist --> insert; 这个时候你可能考虑到若是两条线程同时select都拿不到结果会致使插入两条记录,这个时候你可能会在这个操做上加锁保证线程安全,固然了具体的不一样业务处理方式也有多种。若是是在分布式集群环境中那么该如何保证这个线程安全呢,这个时候你可使用分布式锁来解决这个问题。

基于redis实现的分布式锁

1、setnx key value

基于redis实现的分布式锁咱们可使用setnx命令,这个命令的做用是若是指定的key不存在时则set一对k-v,这样同一时刻就只能有一个请求能够set这个key达到加锁的目的,可是这个命令不能同时设置过时时间,这样可能会致使死锁。如图一,请求A和请求B在T1时刻同时发起setnx命令,请求A成功了,而后在T2设置key的过时时间,若是在这以前请求A所在的服务忽然挂了,那这个key就一直存在,这个时候其余请求就没法加锁。算法

图一:setnx实现锁

2、set key value [expiration EX seconds|PX milliseconds] [NX|XX]

使用这个命令能够在设置一个key的同时设置key的过时时间,NX是当key不存在时进行操做,XX是当key存在时进行操做。业务执行完成以后,这个时候须要手动释放这个锁;那么如何保证释放锁的安全性呢?首先要确保释放的锁是本身的,咱们能够利用key对应的value来判断当前这个key是否是本身设置的,这样就能保证释放的锁是本身的;编程

private Boolean lock(String key, String value) {
    return stringRedisTemplate.opsForValue().setIfAbsent(key, value, 10L, TimeUnit.SECONDS);
}
    
private Boolean unLock(String key, String value) {
    String cacheValue = stringRedisTemplate.opsForValue().get(key);
    if (!value.equals(cacheValue)) {
        return false;
    }
    return stringRedisTemplate.delete(key);
}
复制代码

那上面这段代码就能保证释放锁的安全性吗?这个方法存在的问题在于在判断了key对应的value与本身的value相等以后,若是这个时候key不争气的恰好到期失效了,其余线程获取了这个锁,那么下面的delete key操做就将其余线程的锁释放掉了。怎么就那么多幺蛾子…… 那么如何保证释放锁的原子性呢?安全

3、Lua脚本保证释放锁的原子性

Lua脚本我不过多的介绍,有兴趣的同窗能够去了解,直接上代码bash

private Boolean luaUnLock(String key, String value) {
    ScriptSource lua = new ResourceScriptSource(new ClassPathResource("redisUnLock.lua"));
    DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
    redisScript.setScriptSource(lua);
    redisScript.setResultType(Boolean.class);
    return stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
}
复制代码

redisUnLock.lua数据结构

if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
复制代码

这段脚本比较简单,就是比较参数key对应的value是否与参数value相等,相等则删除这个key,在redis中lua脚本可以保证原子性。那么问题叒来了!这样就保证了这个分布式锁的安全性吗?如今这个分布式锁的问题在于存在业务时间过长致使锁过时被其余线程获取的状况,此时须要检测续租锁来避免这个问题。并发

图二:锁过时致使业务错误

4、redisson的watch dog实现续租锁

4.一、Demo演示

那么如何续租呢,主要思路就是用一个线程检测当前这个业务是否执行完,锁还有多久过时;若是锁即将失效时业务尚未执行完那么就给这个锁从新设置过时时间。这里咱们使用redisson的实现,毕竟本身实现的轮子没那么靠谱😅。异步

public class RedissonLockerImpl implements RedissonLocker {
    @Resource
    private RedissonClient redissonClient;
    
    @Override
    public void lock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.lock();
    }
    
    @Override
    public void unlock(String lockKey) {
        RLock lock = redissonClient.getLock(lockKey);
        lock.unlock();
    }
}

public void test() {
    CountDownLatch count = new CountDownLatch(2);
    String lockKey = "LOCK_KEY";
    CustomizeThreadPool.threadPool.execute(() -> {
        try {
            count.await();
            redissonLocker.lock(lockKey);
            log.info("线程1获取锁");
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            redissonLocker.unlock(lockKey);
            log.info("线程1释放锁");
        }
    });
    count.countDown();
    CustomizeThreadPool.threadPool.execute(() -> {
        try {
            count.await();
            redissonLocker.lock(lockKey);
            log.info("线程2获取锁");
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            redissonLocker.unlock(lockKey);
            log.info("线程2释放锁");
        }
    });
    count.countDown();
}
复制代码

public void test2() {
    String lockKey = "LOCK_KEY";
    CustomizeThreadPool.threadPool.execute(() -> {
        redissonLocker.lock(lockKey);
        redissonLocker.lock(lockKey);
        try {
            TimeUnit.SECONDS.sleep(25);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            redissonLocker.unlock(lockKey);
        }
    });
}
复制代码
Java线程状态间的转换

怎么样,这个API是否是至关简洁呢,可能有同窗看到以前的代码会问你这里加锁的时候为何没有给锁设置过时时间呢?你加锁key对应的value呢?经过这两个例子咱们能够看到加锁key对应的value是一个hash结构,第一个属性对应的就是咱们所说的value,用来判断是不是本身加的锁;第二个属性对应的实际上是加锁的次数,这和Java中的ReentrantLock同样是可重入锁,因此第二个例子里只作了一次unLock没办法释放锁。至于这个key对应的value和锁过时时间在下面的源码分析介绍。分布式

4.二、源码分析
package org.redisson.config;
import ..........
public class Config {
    // 其余源码省略
    private long lockWatchdogTimeout;
    public Config() {
        // 默认的锁过时时间 
        this.lockWatchdogTimeout = 30000L;
    }
    public Config(Config oldConf) {
        this.lockWatchdogTimeout = 30000L;
        // 若是有读取配置文件修改的锁过时时间
        this.setLockWatchdogTimeout(oldConf.getLockWatchdogTimeout());
    }
}
// 不带过时时间加锁
public void lock() {
    try {
        this.lockInterruptibly();
    } catch (InterruptedException var2) {
        Thread.currentThread().interrupt();
    }
}

public void lockInterruptibly() throws InterruptedException {
    this.lockInterruptibly(-1L, (TimeUnit)null);
}

public void lock(long leaseTime, TimeUnit unit) {
    try {
        this.lockInterruptibly(leaseTime, unit);
    } catch (InterruptedException var5) {
        Thread.currentThread().interrupt();
    }
}

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 获取当前线程的ID
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁返回过时时间
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);
    // 若是加锁失败
    if (ttl != null) {
        // 订阅解锁队列
        RFuture<RedissonLockEntry> future = this.subscribe(threadId);
        this.commandExecutor.syncSubscription(future);
        try {
            while(true) {
                // 尝试加锁
                ttl = this.tryAcquire(leaseTime, unit, threadId);
                // 加锁成功则返回
                if (ttl == null) {
                    return;
                }
                if (ttl >= 0L) {
                    // 加锁失败阻塞
                    this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    this.getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            // 取消订阅解锁消息
            this.unsubscribe(future, threadId);
        }
    }
}

//尝试加锁
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}


private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // 设置了默认的过时时间
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 若是调用的是没有过时时间的lock,则默认时间为lockWatchdogTimeout
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            public void operationComplete(Future<Long> future) throws Exception {   
                if (future.isSuccess()) {
                    Long ttlRemaining = (Long)future.getNow();
                    // 加锁成功以后开始一个调度任务
                    if (ttlRemaining == null) {
                        RedissonLock.this.scheduleExpirationRenewal(threadId);
                    }
                }
            }
        });
        return ttlRemainingFuture;
    }
}

// 调用lua脚本异步加锁,value由getLockName()生成,uuid+threadId
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    // 若是当前不存在key则加锁,若是当前存在而且是本身的则加锁次数加一
    return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}

// 锁续租定时任务
private void scheduleExpirationRenewal(final long threadId) {
    if (!expirationRenewalMap.containsKey(this.getEntryName())) {
        // 添加一个回调任务,每1/3锁过时时间执行一次
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                // 异步重置锁过时时间
                RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                future.addListener(new FutureListener<Boolean>() {
                    public void operationComplete(Future<Boolean> future) throws Exception {
                       // 从加锁集合移除 RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());
                        if (!future.isSuccess()) {
                            RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", future.cause());
                        } else {
                            // 成功重置锁时间以后再次调用
                            if ((Boolean)future.getNow()) {
                                RedissonLock.this.scheduleExpirationRenewal(threadId);
                            }
                        }
                    }
                });
            }
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        // 保证任务不会被重复建立,取消任务
        if (expirationRenewalMap.putIfAbsent(this.getEntryName(), new RedissonLock.ExpirationEntry(threadId, task)) != null) {
            task.cancel();
        }

    }
}
    
复制代码

从源码中咱们能够了解redisson实现分布式锁的大体流程;当咱们没有设置锁过时时间的时候,redisson会使用lockWatchdogTimeout时间(默认为30s)设置为锁过时时间;redisson设置的redis数据结构是一个hash,其中一个属性是锁的值,由uuid和当前线程id组成,另外一个属性是加锁次数用来实现可重入性;当没有设置锁过时时间的时候,redisson会每隔1/3锁过时时间将锁过时时间重置为初始值(默认30s时,当过时时间还有20s就会从新设置过时时间为30s)直到释放锁;若是设置了过时时间则不会有锁续租的功能。加锁的时候若是当前key不存在则直接设置key,若是存在而且是本身的则将加锁次数加一。加锁失败则订阅释放锁redis channel,线程进入阻塞。释放锁先判断当前是不是本身的锁,若是是则将当前加锁次数减一,若是减一以后为0则删除key,若是有续租任务则取消续租任务,向redis channel中发一条消息唤醒被阻塞的线程获取锁。 ide

总结

原本想在这一篇把基于zookeeper的实现也介绍一下,可是由于篇幅缘由,因此决定放在下一篇。讲到这里你以为这个分布式锁实现方案是否完美呢?其实仍是存在问题的,这种是单机redis下实现的,并不能保证高可用性;若是拿到锁的client经历STW这种状况且停顿太长超过了锁过时时间,此时锁已经被另外一个client所获得,原先的client没有感知到锁过时,那么就会发生错误。在集群模式下redis官方提出了redLock算法,redisson有它的实现,可是业界大佬们对这个算法存在质疑,有兴趣的小伙伴能够本身去了解。
相关文章
相关标签/搜索