spring boot redis分布式锁

一. Redis 分布式锁的实现以及存在的问题

锁是针对某个资源,保证其访问的互斥性,在实际使用当中,这个资源通常是一个字符串。使用 Redis 实现锁,主要是将资源放到 Redis 当中,利用其原子性,当其余线程访问时,若是 Redis 中已经存在这个资源,就不容许以后的一些操做。spring boot使用 Redis 的操做主要是经过 RedisTemplate 来实现,通常步骤以下:html

  1. 将锁资源放入 Redis (注意是当key不存在时才能放成功,因此使用 setIfAbsent 方法):
redisTemplate.opsForValue().setIfAbsent("key", "value");
  1. 设置过时时间
redisTemplate.expire("key", 30000, TimeUnit.MILLISECONDS);
  1. 释放锁
redisTemplate.delete("key");

通常状况下,这样的实现就可以知足锁的需求了,可是若是在调用 setIfAbsent 方法以后线程挂掉了,即没有给锁定的资源设置过时时间,默认是永不过时,那么这个锁就会一直存在。因此须要保证设置锁及其过时时间两个操做的原子性,spring data的 RedisTemplate 当中并无这样的方法。可是在jedis当中是有这种原子操做的方法的,须要经过 RedisTemplate 的 execute 方法获取到jedis里操做命令的对象,代码以下:java

String result = redisTemplate.execute(new RedisCallback<String>() {
    @Override public String doInRedis(RedisConnection connection) throws DataAccessException {
        JedisCommands commands = (JedisCommands) connection.getNativeConnection(); return commands.set(key, "锁定的资源", "NX", "PX", expire);
    }
});

注意: Redis 从2.6.12版本开始 set 命令支持 NX 、 PX 这些参数来达到 setnx 、 setex 、 psetex 命令的效果,文档参见: http://doc.redisfans.com/string/set.htmlredis

NX: 表示只有当锁定资源不存在的时候才能 SET 成功。利用 Redis 的原子性,保证了只有第一个请求的线程才能得到锁,而以后的全部线程在锁定资源被释放以前都不能得到锁。spring

PX: expire 表示锁定的资源的自动过时时间,单位是毫秒。具体过时时间根据实际场景而定sql

这样在获取锁的时候就可以保证设置 Redis 值和过时时间的原子性,避免前面提到的两次 Redis 操做期间出现意外而致使的锁不能释放的问题。可是这样仍是可能会存在一个问题,考虑以下的场景顺序:架构

  • 线程T1获取锁
  • 线程T1执行业务操做,因为某些缘由阻塞了较长时间
  • 锁自动过时,即锁自动释放了
  • 线程T2获取锁
  • 线程T1业务操做完毕,释放锁(实际上是释放的线程T2的锁)

按照这样的场景顺序,线程T2的业务操做实际上就没有锁提供保护机制了。因此,每一个线程释放锁的时候只能释放本身的锁,即锁必需要有一个拥有者的标记,而且也须要保证释放锁的原子性操做。并发

所以在获取锁的时候,能够生成一个随机不惟一的串放入当前线程中,而后再放入 Redis 。释放锁的时候先判断锁对应的值是否与线程中的值相同,相同时才作删除操做。app

Redis 从2.6.0开始经过内置的 Lua 解释器,可使用 EVAL 命令对 Lua 脚本进行求值,文档参见: http://doc.redisfans.com/script/eval.htmldom

所以咱们能够经过 Lua 脚原本达到释放锁的原子操做,定义 Lua 脚本以下:异步

if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end

具体意思能够参考上面提供的文档地址

使用 RedisTemplate 执行的代码以下:

// 使用Lua脚本删除Redis中匹配value的key,能够避免因为方法执行时间过长而redis锁自动过时失效的时候误删其余线程的锁 // spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,因此只能拿到原redis的connection来执行脚本 Long result = redisTemplate.execute(new RedisCallback<Long>() { public Long doInRedis(RedisConnection connection) throws DataAccessException {
        Object nativeConnection = connection.getNativeConnection(); // 集群模式和单机模式虽然执行脚本的方法同样,可是没有共同的接口,因此只能分开执行 // 集群模式 if (nativeConnection instanceof JedisCluster) { return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
        } // 单机模式 else if (nativeConnection instanceof Jedis) { return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
        } return 0L;
    }
});

代码中分为集群模式和单机模式,而且二者的方法、参数都同样,缘由是spring封装的执行脚本的方法中( RedisConnection 接口继承于 RedisScriptingCommands 接口的 eval 方法),集群模式的方法直接抛出了不支持执行脚本的异常(虽然实际是支持的),因此只能拿到 Redis 的connection来执行脚本,而 JedisCluster 和 Jedis 中的方法又没有实现共同的接口,因此只能分开调用。

spring封装的集群模式执行脚本方法源码:

# JedisClusterConnection.java /**
 * (non-Javadoc)
 * @see org.springframework.data.redis.connection.RedisScriptingCommands#eval(byte[], org.springframework.data.redis.connection.ReturnType, int, byte[][])
 */ @Override public  T eval(byte[] script, ReturnType returnType, int numKeys, byte[]... keysAndArgs) { throw new InvalidDataAccessApiUsageException("Eval is not supported in cluster environment.");
}

至此,咱们就完成了一个相对可靠的 Redis 分布式锁,可是,在集群模式的极端状况下,仍是可能会存在一些问题,好比以下的场景顺序( 本文暂时不深刻开展 ):

  • 线程T1获取锁成功
  • Redis 的master节点挂掉,slave自动顶上
  • 线程T2获取锁,会从slave节点上去判断锁是否存在,因为Redis的master slave复制是异步的,因此此时线程T2可能成功获取到锁

为了能够之后扩展为使用其余方式来实现分布式锁,定义了接口和抽象类,全部的源码以下:

# DistributedLock.java 顶级接口 /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:05
 * @version 1.0.0
 */ public interface DistributedLock { public static final long TIMEOUT_MILLIS = 30000; public static final int RETRY_TIMES = Integer.MAX_VALUE; public static final long SLEEP_MILLIS = 500; public boolean lock(String key); public boolean lock(String key, int retryTimes); public boolean lock(String key, int retryTimes, long sleepMillis); public boolean lock(String key, long expire); public boolean lock(String key, long expire, int retryTimes); public boolean lock(String key, long expire, int retryTimes, long sleepMillis); public boolean releaseLock(String key);
}
# AbstractDistributedLock.java 抽象类,实现基本的方法,关键方法由子类去实现 /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:10:57
 * @version 1.0.0
 */ public abstract class AbstractDistributedLock implements DistributedLock {

    @Override public boolean lock(String key) { return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
    }

    @Override public boolean lock(String key, int retryTimes) { return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
    }

    @Override public boolean lock(String key, int retryTimes, long sleepMillis) { return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
    }

    @Override public boolean lock(String key, long expire) { return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
    }

    @Override public boolean lock(String key, long expire, int retryTimes) { return lock(key, expire, retryTimes, SLEEP_MILLIS);
    }

}
# RedisDistributedLock.java Redis分布式锁的实现 import java.util.ArrayList; import java.util.List; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.util.StringUtils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisCommands; /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:14
 * @version 1.0.0
 */ public class RedisDistributedLock extends AbstractDistributedLock { private final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class); private RedisTemplate<Object, Object> redisTemplate; private ThreadLocal<String> lockFlag = new ThreadLocal<String>(); public static final String UNLOCK_LUA; static {
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
        sb.append("then ");
        sb.append("    return redis.call(\"del\",KEYS[1]) ");
        sb.append("else ");
        sb.append("    return 0 ");
        sb.append("end ");
        UNLOCK_LUA = sb.toString();
    } public RedisDistributedLock(RedisTemplate<Object, Object> redisTemplate) { super(); this.redisTemplate = redisTemplate;
    }

    @Override public boolean lock(String key, long expire, int retryTimes, long sleepMillis) { boolean result = setRedis(key, expire); // 若是获取锁失败,按照传入的重试次数进行重试 while((!result) && retryTimes-- > 0){ try {
                logger.debug("lock failed, retrying..." + retryTimes);
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) { return false;
            }
            result = setRedis(key, expire);
        } return result;
    } private boolean setRedis(String key, long expire) { try { String result = redisTemplate.execute(new RedisCallback<String>() {
                @Override public String doInRedis(RedisConnection connection) throws DataAccessException {
                    JedisCommands commands = (JedisCommands) connection.getNativeConnection(); String uuid = UUID.randomUUID().toString();
                    lockFlag.set(uuid); return commands.set(key, uuid, "NX", "PX", expire);
                }
            }); return !StringUtils.isEmpty(result);
        } catch (Exception e) {
            logger.error("set redis occured an exception", e);
        } return false;
    }
    
    @Override public boolean releaseLock(String key) { // 释放锁的时候,有可能由于持锁以后方法执行时间大于锁的有效期,此时有可能已经被另一个线程持有锁,因此不能直接删除 try {
            List<String> keys = new ArrayList<String>();
            keys.add(key);
            List<String> args = new ArrayList<String>();
            args.add(lockFlag.get()); // 使用lua脚本删除redis中匹配value的key,能够避免因为方法执行时间过长而redis锁自动过时失效的时候误删其余线程的锁 // spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,因此只能拿到原redis的connection来执行脚本 Long result = redisTemplate.execute(new RedisCallback() { public Long doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); // 集群模式和单机模式虽然执行脚本的方法同样,可是没有共同的接口,因此只能分开执行 // 集群模式 if (nativeConnection instanceof JedisCluster) { return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    } // 单机模式 else if (nativeConnection instanceof Jedis) { return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                    } return 0L;
                }
            }); return result != null && result > 0;
        } catch (Exception e) {
            logger.error("release lock occured an exception", e);
        } return false;
    }
    
}

二. 基于 AOP 的 Redis 分布式锁

在实际的使用过程当中,分布式锁能够封装好后使用在方法级别,这样就不用每一个地方都去获取锁和释放锁,使用起来更加方便。

  • 首先定义个注解:
import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:10:36
 * @version 1.0.0
 */ @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface RedisLock { /** 锁的资源,redis的key*/ String value() default "default"; /** 持锁时间,单位毫秒*/ long keepMills() default 30000; /** 当获取失败时候动做*/ LockFailAction action() default LockFailAction.CONTINUE;
    
    public enum LockFailAction{ /** 放弃 */ GIVEUP, /** 继续 */ CONTINUE;
    } /** 重试的间隔时间,设置GIVEUP忽略此项*/ long sleepMills() default 200; /** 重试次数*/ int retryTimes() default 5;
}
  • 装配分布式锁的bean
import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.RedisTemplate; import com.itopener.lock.redis.spring.boot.autoconfigure.lock.DistributedLock; import com.itopener.lock.redis.spring.boot.autoconfigure.lock.RedisDistributedLock; /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:31
 * @version 1.0.0
 */ @Configuration @AutoConfigureAfter(RedisAutoConfiguration.class)
public class DistributedLockAutoConfiguration {
    
    @Bean @ConditionalOnBean(RedisTemplate.class)
    public DistributedLock redisDistributedLock(RedisTemplate redisTemplate){ return new RedisDistributedLock(redisTemplate);
    }
    
}
  • 定义切面(spring boot配置方式)
import java.lang.reflect.Method; import java.util.Arrays; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock; import com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock.LockFailAction; import com.itopener.lock.redis.spring.boot.autoconfigure.lock.DistributedLock; /**
 * @author fuwei.deng
 * @date 2017年6月14日 下午3:11:22
 * @version 1.0.0
 */ @Aspect @Configuration @ConditionalOnClass(DistributedLock.class) @AutoConfigureAfter(DistributedLockAutoConfiguration.class) public class DistributedLockAspectConfiguration { private final Logger logger = LoggerFactory.getLogger(DistributedLockAspectConfiguration.class); @Autowired private DistributedLock distributedLock; @Pointcut("@annotation(com.itopener.lock.redis.spring.boot.autoconfigure.annotations.RedisLock)") private void lockPoint(){
        
    } @Around("lockPoint()") public Object around(ProceedingJoinPoint pjp) throws Throwable{
        Method method = ((MethodSignature) pjp.getSignature()).getMethod();
        RedisLock redisLock = method.getAnnotation(RedisLock.class);
        String key = redisLock.value(); if(StringUtils.isEmpty(key)){
            Object[] args = pjp.getArgs();
            key = Arrays.toString(args);
        }
        int retryTimes = redisLock.action().equals(LockFailAction.CONTINUE) ? redisLock.retryTimes() : 0;
        boolean lock = distributedLock.lock(key, redisLock.keepMills(), retryTimes, redisLock.sleepMills()); if(!lock) {
            logger.debug("get lock failed : " + key); return null;
        } //获得锁,执行方法,释放锁 logger.debug("get lock success : " + key); try { return pjp.proceed();
        } catch (Exception e) {
            logger.error("execute locked method occured an exception", e);
        } finally {
            boolean releaseResult = distributedLock.releaseLock(key);
            logger.debug("release lock : " + key + (releaseResult ? " success" : " failed"));
        } return null;
    }
}
  • spring boot starter还须要在 resources/META-INF 中添加 spring.factories 文件
# Auto Configure org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.itopener.lock.redis.spring.boot.autoconfigure.DistributedLockAutoConfiguration,\ com.itopener.lock.redis.spring.boot.autoconfigure.DistributedLockAspectConfiguration

这样封装以后,使用spring boot开发的项目,直接依赖这个starter,就能够在方法上加 RedisLock 注解来实现分布式锁的功能了,固然若是须要本身控制,直接注入分布式锁的bean便可

@Autowired private DistributedLock distributedLock;

若是须要使用其余的分布式锁实现,继承 AbstractDistributedLock 后实现获取锁和释放锁的方法便可

欢迎欢迎学Java的朋友们加入java架构交流: 855835163 群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交代!

相关文章
相关标签/搜索