Redisson 分布式锁场景和使用

分布式锁的使用场景

在传统架构单机单服务中,对一些并发场景读取公共资源时,好比加减库存、银行卡消费等,经过同步或者加锁就能够进行资源控制。node

这里主要经过线程锁实现:经过给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程能够执行该方法或该代码段。线程锁只在同一JVM中有效果,由于线程锁的实如今根本上是依靠线程之间共享内存实现的,好比Synchronized、Lock等。redis

当应用从单进程多线程拆分为分布式系统集群部署的多进程多线程后,以前的解决方案就没法知足需求了,业界经常使用的解决方案一般是借助于一个第三方组件并利用它自身的排他性来达到多进程间的互斥、资源隔离。spring

  • 基于zk的临时顺序节点
  • 基于数据库的惟一主键
  • 基于redis的setnx setex

zk和DB在高并发的场景下可能会有性能问题,经过redis实现分布式锁不论是从实现难度和性能方面都比较合适。数据库

redis分布式锁方案比较

在对比redis分布式锁方案以前,先列举下分布式锁的特色:编程

  • 互斥性:任何一时刻只有一个线程获取到锁
  • 可重入性:同一线程在获取锁以后可重复获取该锁
  • 锁超时:设置超时时间,避免死锁
  • 安全性:锁只能被持有者删除

对于Redis分布式锁的实现方式,网上的文章大部分都是单纯使用setnx命令的基础上进行一个简单封装,且少有文章分析这样设计的缺陷。在这个博客满天飞,代码随便贴的时代,这样的局面无形之中给了你们一个假象,就是Redis分布式锁只能是以这样简单的形式存在,即使有缺陷也只能在业务代码里规避。那么为何不换位思考一下,即用稍微复杂点的设计来弥补它的不足,从而换取业务上的灵活性呢?再介绍redisson以前,咱们先了解一下单纯使用setnx命令封装的分布式锁有哪些不足。安全

  1. 不具有可重入性markdown

    在执行setnx命令时,一般采用业务上指定的名称做为key名,用时间或随机值做为value来实现。这样的实现方式不具有追踪请求线程的能力,同时也不具有统计重入次数的能力,甚至有些实现方式都不具有操做的原子性。当遇到业务上须要在多个地方用到一样一个锁的时候,很显然使用不具备可重入的锁会很容易发生死锁的现象。特别是在有递归逻辑的场景里,发生死锁的概率会更高。Java并发工具包里的Lock对象和Synchronized语块都具备可重入性,对于常用这些工具的人来讲,每每会很容易忽略setnx的这个缺陷。多线程

  2. 不支持续约架构

    在分布式环境中,为了保证锁的活性和避免程序宕机形成的死锁现象,分布式锁每每会引入一个失效时间,超过这个时间则认为自动解锁。这样的设计前提是开发人员对这个自动解锁时间的粒度有一个很好的把握,过短了可能会出现任务没作完锁就失效了,而太长了在出现程序宕机或业务节点挂掉时,其它节点须要等很长时间才能恢复,而难以保证业务的SLA(正常运行时间保障)。setnx的设计缺少一个延续有效期的续约机制,没法保证业务可以先工做作完再解锁,也不能确保在某个程序宕机或业务节点挂掉的时候,其它节点可以很快的恢复业务处理能力。并发

  3. 不具有阻塞的能力

    阻塞性是指的在有竞争的状况下,未获取到资源的线程会中止继续操做,直到成功获取到资源或取消操做。很显然setnx命令只提供了互斥的特性,却没有提供阻塞的能力。虽然在业务代码里能够引入自旋机制来进行再次获取,但这仅仅是把本来应该在锁里实现的功能搬到了业务代码里,经过增长业务代码的复杂程度来简化锁的实现彷佛显得有点南辕北辙。

Redisson的分布式锁在知足以上三个基本要求的同时还增长了线程安全的特色。利用Redis的Hash结构做为储存单元,将业务指定的名称做为key,将随机UUID和线程ID做为field,最后将加锁的次数做为value来储存。同时UUID做为锁的实例变量保存在客户端。将UUID和线程ID做为标签在运行多个线程同时使用同一个锁的实例时,仍然保证了操做的独立性,知足了线程安全的要求。

拥抱开源,不重复造轮子

redisson介绍

Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数据库提供的一系列优点,基于Java实用工具包中经常使用接口,为使用者提供了一系列具备分布式特性的经常使用工具类。使得本来做为协调单机多线程并发程序的工具包得到了协调分布式多机多线程并发系统的能力,大大下降了设计和研发大规模分布式系统的难度。同时结合各富特点的分布式服务,更进一步简化了分布式环境中程序相互之间的协做。

jedis客户端:采用同步编程模型的客户端,须要随时确保并发线程数与链接数一对一

redisson客户端:采用Netty异步编程框架,使用了与Redis服务端结构相似的事件循环(EventLoop)式的线程池,并结合链接池的方式弹性管理链接。最终作到了使用少许的链接就能够知足对大量线程的要求,从根本上缓解线程之间的竞争关系。

项目集成配置

引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.10.6</version>
</dependency>
复制代码

可经过JSON、YAML和Spring XML文件配置集群模式,这里经过yaml方式进行配置,增长文件redisson.yaml,以下:

clusterServersConfig:
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  reconnectionTimeout: 3000
  failedAttempts: 3
  password: null
  subscriptionsPerConnection: 5
  clientName: null
  loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
  slaveSubscriptionConnectionMinimumIdleSize: 1
  slaveSubscriptionConnectionPoolSize: 50
  slaveConnectionMinimumIdleSize: 32
  slaveConnectionPoolSize: 64
  masterConnectionMinimumIdleSize: 32
  masterConnectionPoolSize: 64
  readMode: "SLAVE"
  nodeAddresses:
  - "redis://127.0.0.1:7004"
  - "redis://127.0.0.1:7001"
  - "redis://127.0.0.1:7000"
  scanInterval: 1000
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.JsonJacksonCodec> {}
"transportMode":"NIO"
复制代码

在application.yml中增长redis配置

spring.redis.redisson:
    config: classpath:redisson.yaml
复制代码

结合apollo的配置

因为apollo会在客户端保存一份配置文件的备份,与classpath下的redisson.yaml是两个文件,因此没法经过以上方式加载,经过查看RedissonAutoConfiguration中RedissonClient的初始化代码,经过自定义Properties类,加载配置文件生成org.redisson.config.config进行RedissonClient的建立。

@Slf4j
@ConfigurationProperties(
    prefix = "spring.redisson"
)
@Data
public class RedissonProperties implements InitializingBean {

    private SentinelServersConfig sentinelServersConfig;
    private MasterSlaveServersConfig masterSlaveServersConfig;
    private SingleServerConfig singleServerConfig;
    private Map<String,Object> clusterServersConfig;
    private ReplicatedServersConfig replicatedServersConfig;
    private ConnectionManager connectionManager;
    private int threads = 16;
    private int nettyThreads = 32;
    private JSONObject codec;
    private ExecutorService executor;
    private boolean referenceEnabled = true;
    private TransportMode transportMode;
    private EventLoopGroup eventLoopGroup;
    private long lockWatchdogTimeout;
    private boolean keepPubSubOrder;
    private boolean decodeInExecutor;
    private boolean useScriptCache;
    private int minCleanUpDelay;
    private int maxCleanUpDelay;
    private JSONObject addressResolverGroupFactory;

    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String,String> nodeAddressesMap = (Map<String,String>)clusterServersConfig.get("nodeAddresses");
        clusterServersConfig.put("nodeAddresses",nodeAddressesMap.values());
        log.debug(JSON.toJSONString(this));
    }
}
复制代码
public class YamlPropertySourceFactory implements PropertySourceFactory {

    @Override
    public PropertySource<?> createPropertySource(@Nullable String name, EncodedResource resource) throws IOException {
        Properties propertiesFromYaml = loadYamlIntoProperties(resource);
        String sourceName = name != null ? name : resource.getResource().getFilename();
        return new PropertiesPropertySource(sourceName, propertiesFromYaml);
    }

    private Properties loadYamlIntoProperties(EncodedResource resource) throws FileNotFoundException {
        try {
            YamlPropertiesFactoryBean factory = new YamlPropertiesFactoryBean();
            factory.setResources(resource.getResource());
            factory.afterPropertiesSet();
            return factory.getObject();
        } catch (IllegalStateException e) {
            // for ignoreResourceNotFound
            Throwable cause = e.getCause();
            if (cause instanceof FileNotFoundException)
                throw (FileNotFoundException) e.getCause();
            throw e;
        }
    }
}
复制代码
@Configuration
@PropertySource(factory = YamlPropertySourceFactory.class, value = "classpath:redisson.yaml")
@EnableConfigurationProperties({RedissonProperties.class})
public class RedissonConfig {

    @Autowired
    private RedissonProperties redissonProperties;

    @Bean(
            destroyMethod = "shutdown"
    )
    public RedissonClient redisson() throws IOException {
        String redissonCondig = JSON.toJSONString(redissonProperties);
        Config config = Config.fromJSON(redissonCondig);
        return Redisson.create(config);
    }
}
复制代码

分布式锁使用案例

普通加锁

RLock lock = redisson.getLock("anyLock");
lock.lock();
复制代码

设置等待时间、释放时间加锁

// 加锁之后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁之后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}
复制代码

公平锁:当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程

RLock fairLock = redisson.getFairLock("anyLock");
// 最多见的使用方法
fairLock.lock();
复制代码

联合锁

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
复制代码

读写锁(读写互斥,读读不互斥)

redisson.getReadWriteLock("anyRWLock");
RLock lock = rwlock.readLock();
// 尝试加锁,最多等待100秒,上锁之后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
// 或
RLock lock = rwlock.writeLock();
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
复制代码

闭锁

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其余线程或其余JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
复制代码

redisson分布式锁还能够应用于保证幂等,控制mq消费等。

源码分析

加锁lua脚本

//KEYS[1]:锁的名称,getLock(name)的name
//ARGV[2]:id(UUID)+ ":" +threadId
//ARGV[1]:leaseTime

//检查锁是否存在,若是不存在,则直接设置hash对象和超时时间
if (redis.call('exists', KEYS[1]) == 0) then 
    redis.call('hset', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
//若是锁存在,则判断hash里的ARGV[2]是否与本线程相同,相同则重入,更新失效时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('hincrby', KEYS[1], ARGV[2], 1); 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return nil; 
end; 
//返回锁的失效时间
return redis.call('pttl', KEYS[1]);
复制代码

解锁lua脚本

//KEYS[1]:锁的名称,getLock(name)的name
//KEYS[2]:"redisson_lock__channel"+":{" + KEYS[1] + "}" || "redisson_lock__channel"+":" + KEYS[1]
//ARGV[1]:0L
//ARGV[2]:leaseTime
//ARGV[3]:id(UUID)+ ":" +threadId

//判断是不是加锁的线程,不是则直接返回
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
    return nil;
end; 
//获取加锁次数-1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
//若是加锁次数>0,则更新失效时间后返回;反之则删除锁后发布解锁消息
if (counter > 0) then 
    redis.call('pexpire', KEYS[1], ARGV[2]); 
    return 0; 
else 
    redis.call('del', KEYS[1]); 
    redis.call('publish', KEYS[2], ARGV[1]); 
    return 1; 
end; 
return nil;
复制代码

watchdog

//异步执行加锁lua
RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
    if (e == null) {
        if (ttlRemaining) {
            //新建子线程定时设置失效时间
            this.scheduleExpirationRenewal(threadId);
        }
    }
});
private void renewExpiration() {
    RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        //建立一个HashedWheelTimeout实例,利用HashedWheelTimer实现定时任务
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                RedissonLock.this.renewExpiration();
                            }
                        });
                    }
                }
            }
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
}

//定时任务执行的脚本
//KEYS[1]:锁的名称,getLock(name)的name
//ARGV[1]:leaseTime
//ARGV[2]:id(UUID)+ ":" +threadId
//判断是不是加锁的线程,更新失效时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('pexpire', KEYS[1], ARGV[1]); 
    return 1;
end; 
return 0;

复制代码
相关文章
相关标签/搜索