Redis分布式锁(二):支持锁的续期,避免锁超时后致使多个线程得到锁

使用现状

Redis分布式锁的基础内容,咱们已经在Redis分布式锁:基于AOP和Redis实现的简易版分布式锁这篇文章中讲过了,也在文章中示范了正常的加锁和解锁方法。redis

分布式锁在以前的项目中一直运行良好,没有辜负咱们的指望。bash

发现问题

但在最近查线上日志的时候偶然发现,有一个业务场景下,分布式锁偶尔会失效,致使有多个线程同时执行了相同的代码。dom

咱们通过初步排查,定位到是由于在这段代码中间调用了第三方的接口致使。分布式

由于业务代码耗时过长,超过了锁的超时时间,形成锁自动失效,而后另一个线程意外的持有了锁。因而就出现了多个线程共同持有锁的现象。ide

解决方案

问题既然已经出现了,那么接下来咱们就应该考虑解决方案了。函数

咱们也曾经想过,是否能够经过合理地设置LockTime(锁超时时间)来解决这个问题?post

但LockTime的设置本来就很不容易。LockTime设置太小,锁自动超时的几率就会增长,锁异常失效的几率也就会增长,而LockTime设置过大,万一服务出现异常没法正常释放锁,那么出现这种异常锁的时间也就越长。咱们只能经过经验去配置,一个能够接受的值,基本上是这个服务历史上的平均耗时再增长必定的buff。ui

既然这条路走不通了,那么还有其余路能够走么?this

固然仍是有的,咱们能够先给锁设置一个LockTime,而后启动一个守护线程,让守护线程在一段时间后,从新去设置这个锁的LockTime。spa

看起来很简单是否是?

但在实际操做中,咱们要注意如下几点:
一、和释放锁的状况一致,咱们须要先判断锁的对象是否没有变。不然会形成不管谁持有锁,守护线程都会去从新设置锁的LockTime。不该该续的不能瞎续。
二、守护线程要在合理的时间再去从新设置锁的LockTime,不然会形成资源的浪费。不能动不动就去续。
三、若是持有锁的线程已经处理完业务了,那么守护线程也应该被销毁。不能主人都挂了,守护者还在那里继续浪费资源。

代码实现

咱们首先先生成一个内部类去实现Runnable,做为守护线程的参数。

public class SurvivalClamProcessor implements Runnable {

    private static final int REDIS_EXPIRE_SUCCESS = 1;

    SurvivalClamProcessor(String field, String key, String value, int lockTime) {
        this.field = field;
        this.key = key;
        this.value = value;
        this.lockTime = lockTime;
        this.signal = Boolean.TRUE;
    }

    private String field;

    private String key;

    private String value;

    private int lockTime;

    //线程关闭的标记
    private volatile Boolean signal;

    void stop() {
        this.signal = Boolean.FALSE;
    }

    @Override
    public void run() {
        int waitTime = lockTime * 1000 * 2 / 3;
        while (signal) {
            try {
                Thread.sleep(waitTime);
                if (cacheUtils.expandLockTime(field, key, value, lockTime) == REDIS_EXPIRE_SUCCESS) {
                    if (logger.isInfoEnabled()) {
                        logger.info("expandLockTime 成功,本次等待{}ms,将重置锁超时时间重置为{}s,其中field为{},key为{}", waitTime, lockTime, field, key);
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("expandLockTime 失败,将致使SurvivalClamConsumer中断");
                    }
                    this.stop();
                }
            } catch (InterruptedException e) {
                if (logger.isInfoEnabled()) {
                    logger.info("SurvivalClamProcessor 处理线程被强制中断");
                }
            } catch (Exception e) {
                logger.error("SurvivalClamProcessor run error", e);
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("SurvivalClamProcessor 处理线程已中止");
        }
    }
}
复制代码

其中expandLockTime是经过Lua脚本实现的。延长锁超时的脚本语句和释放锁的Lua脚本相似。

String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return '0' end";

复制代码

在以上代码中,咱们将waitTime设置为Math.max(1, lockTime * 2 / 3),即守护线程许须要等待waitTime后才能够去从新设置锁的超时时间,避免了资源的浪费。

同时在expandLockTime时候也去判断了当前持有锁的对象是否一致,避免了胡乱重置锁超时时间的状况。

而后咱们在得到锁的代码以后,添加以下代码:

SurvivalClamProcessor survivalClamProcessor 
	= new SurvivalClamProcessor(lockField, lockKey, randomValue, lockTime);
Thread survivalThread = new Thread(survivalClamProcessor);
survivalThread.setDaemon(Boolean.TRUE);
survivalThread.start();
Object returnObject = joinPoint.proceed(args);
survivalClamProcessor.stop();
survivalThread.interrupt();
return returnObject;
复制代码

这段代码会先初始化守护线程的内部参数,而后经过start函数启动线程,最后在业务执行完以后,设置守护线程的关闭标记,最后经过interrupt()去中断sleep状态,保证线程及时销毁。

后续

本文讲解了如何经过启动一个守护线程去重置锁超时时间,也同时介绍了在实现过程的注意点。随带着也科普了一下线程销毁的正确方式。

那么关于分布式锁还有下文么?我也不知道,权当是有吧,可能下一期会讲讲如何经过其余方式(除Redis以外的)去实现分布式锁,也多是讲一下Redis分布式锁的其余问题和解决方案。

好了,咱们下一期再见,欢迎你们一块儿留言讨论。同时也欢迎点赞,欢迎送小星星~

相关文章
相关标签/搜索