spring mvc + redis 实现分布式锁(注解实现可自动重试)

说明 

分布式锁通常有三种实现方式:1. 数据库乐观锁;2. 基于 Redis 的分布式锁;3. 基于 ZooKeeper 的分布式锁。本文介绍基于 Redis 实现分布式锁。 html

关于实现分布式锁的三种方式,能够参考网上的一些关于    分布式锁简单入门以及三种实现方式介绍 (由于不少这种文章,我也不一一列举了)redis

本文中的分布式锁经过注解的方式实现,能够自定义重试次数,锁超时时间等。  spring

可靠性 

  • 首先,为了确保分布式锁可用,咱们至少要确保锁的实现同时知足如下四个条件: 
  • 互斥性。在任意时刻,只有一个客户端能持有锁。 
  • 不会发生死锁。即便有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其余客户端能加锁。 
  • 具备容错性。只要大部分的 Redis 节点正常运行,客户端就能够加锁和解锁。 
  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端本身不能把别人加的锁给解了。 

并发问题 

在没有使用分布式锁以前,若是有两个线程并发操做同一条数据,可能会出现并发问题(脏读、不可重复读、幻读)。 数据库

 举个例子,下面是一段将数据的值 +1 的代码: 缓存

public void addNum() {
    try {
        // 查询数据
        TE te = teManager.findOne(1L);
        System.out.println("start:" + te.getNum());
        // 模拟耗时操做
        Thread.sleep(2000);
        // 值加1
        te.setNum(te.getNum() + 1);
        // 保存数据
        teManager.save(te);
        System.out.println("end:" + te.getNum());
    } catch (Exception e) {

    }
}复制代码

有两个线程同时访问: bash

15:56:06,241 INFO  [stdout] (default task-39) start:0
15:56:07,548 INFO  [stdout] (default task-40) start:0
15:56:08,242 INFO  [stdout] (default task-39) end:1
15:56:09,555 INFO  [stdout] (default task-40) end:1复制代码

能够看到 task-39 和 task-40 两个线程读取到的值均是 0,执行两次后,值为 1 ,并非想要的 2。 多线程

具体执行的状况以下: 并发

若是是单机部署,那么能够用多线程的 18 般武艺来解决并发问题,好比加锁等,改动以下:app

public synchronized void addNum() {
    try {
        // 查询数据
        TE te = teManager.findOne(1L);
        System.out.println("start:" + te.getNum());
        // 模拟耗时操做
        Thread.sleep(2000);
        // 值加1
        te.setNum(te.getNum() + 1);
        // 保存数据
        teManager.save(te);
        System.out.println("end:" + te.getNum());
    } catch (Exception e) {

    }
}复制代码

加了一个关键字 synchronized 就能解决单机下的并发问题,结果以下:  dom

16:09:49,539 INFO  [stdout] (default task-46) start:0
16:09:51,541 INFO  [stdout] (default task-46) end:1
16:09:51,592 INFO  [stdout] (default task-47) start:1
16:09:53,597 INFO  [stdout] (default task-47) end:2复制代码


若是集群部署的话,这种方式就没法解决了(每台机器的 JVM 没法共享,没法加锁),只能使用分布式锁。 

分布式锁的实现 

pom.xml: 

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-redis</artifactId>
    <version>1.8.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.10.2</version>
</dependency>

<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjrt</artifactId>
    <version>1.8.9</version>
</dependency>
<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>1.8.9</version>
</dependency>复制代码

首先定义一个接口,提供加锁、解锁两个方法。 

public interface IDistributedLock {
    public static final long TIMEOUT_MILLIS = 5000;

    public static final int RETRY_TIMES = Integer.MAX_VALUE;

    public static final long SLEEP_MILLIS = 500;

    public boolean lock(String key);

    public boolean lock(String key, int retryTimes);

    public boolean lock(String key, int retryTimes, long sleepMillis);

    public boolean lock(String key, long expire);

    public boolean lock(String key, long expire, int retryTimes);

    public boolean lock(String key, long expire, int retryTimes, long sleepMillis);

    public boolean releaseLock(String key);
}复制代码

定义一个抽象类,实现该接口: 

public abstract class AbstractDistributedLockImpl implements IDistributedLock {

    @Override
    public boolean lock(String key) {
        return lock(key, TIMEOUT_MILLIS, RETRY_TIMES, SLEEP_MILLIS);
    }

    @Override
    public boolean lock(String key, int retryTimes) {
        return lock(key, TIMEOUT_MILLIS, retryTimes, SLEEP_MILLIS);
    }

    @Override
    public boolean lock(String key, int retryTimes, long sleepMillis) {
        return lock(key, TIMEOUT_MILLIS, retryTimes, sleepMillis);
    }

    @Override
    public boolean lock(String key, long expire) {
        return lock(key, expire, RETRY_TIMES, SLEEP_MILLIS);
    }

    @Override
    public boolean lock(String key, long expire, int retryTimes) {
        return lock(key, expire, retryTimes, SLEEP_MILLIS);
    }

}复制代码

具体实现: 

@Component
public class RedisDistributedLock extends AbstractDistributedLockImpl {

    private RedisTemplate<Object, Object> redisTemplate;

    private ThreadLocal<String> lockFlag = new ThreadLocal<>();

    private static final String UNLOCK_LUA;

    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    static {
        /**
         * Redis 从2.6.0开始经过内置的 Lua 解释器,可使用 EVAL 命令对 Lua 脚本进行求值,文档参见: http://doc.redisfans.com/script/eval.html
         */
        UNLOCK_LUA = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    }

    public RedisDistributedLock(RedisTemplate<Object, Object> redisTemplate) {
        super();
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean lock(String key, long expire, int retryTimes, long sleepMillis) {
        boolean result = setRedis(key, expire);
        // 若是获取锁失败,按照传入的重试次数进行重试
        while ((!result) && retryTimes-- > 0) {
            try {
                System.out.println("lock failed, retrying..." + retryTimes);
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                return false;
            }
            result = setRedis(key, expire);
        }
        return result;
    }

    /**
     * 在获取锁的时候就可以保证设置 Redis 值和过时时间的原子性,避免前面提到的两次 Redis 操做期间出现意外而致使的锁不能释放的问题。可是这样仍是可能会存在一个问题,考虑以下的场景顺序:
     * <p>
     * 1. 线程T1获取锁
     * 2. 线程T1执行业务操做,因为某些缘由阻塞了较长时间
     * 3. 锁自动过时,即锁自动释放了
     * 4. 线程T2获取锁
     * 5. 线程T1业务操做完毕,释放锁(实际上是释放的线程T2的锁)
     * 6. 按照这样的场景顺序,线程T2的业务操做实际上就没有锁提供保护机制了。因此,每一个线程释放锁的时候只能释放本身的锁,即锁必需要有一个拥有者的标记,而且也须要保证释放锁的原子性操做。
     * <p>
     * 所以在获取锁的时候,能够生成一个随机不惟一的串放入当前线程中,而后再放入 Redis 。释放锁的时候先判断锁对应的值是否与线程中的值相同,相同时才作删除操做
     *
     * @param key redis key
     * @return 是否释放锁成功
     */
    @Override
    public boolean releaseLock(String key) {
        // 释放锁的时候,有可能由于持锁以后方法执行时间大于锁的有效期,此时有可能已经被另一个线程持有锁,因此不能直接删除
        try {
            List<String> keys = new ArrayList<>();
            keys.add(key);
            List<String> args = new ArrayList<>();
            args.add(lockFlag.get());

            // 使用lua脚本删除redis中匹配value的key,能够避免因为方法执行时间过长而redis锁自动过时失效的时候误删其余线程的锁
            // spring自带的执行脚本方法中,集群模式直接抛出不支持执行脚本的异常,因此只能拿到原redis的connection来执行脚本

            Long result = redisTemplate.execute((RedisCallback<Long>) redisConnection -> {
                Object nativeConnection = redisConnection.getNativeConnection();
                // 集群模式和单机模式虽然执行脚本的方法同样,可是没有共同的接口,因此只能分开执行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                }

                // 单机模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                }
                return 0L;
            });

            return result != null && result > 0;
        } catch (Exception e) {
            System.out.println("release lock occured an exception" + e);
        } finally {
            // 清除掉ThreadLocal中的数据,避免内存溢出
            lockFlag.remove();
        }
        return false;
    }

    private boolean setRedis(String key, long expire) {
        try {
            String result = redisTemplate.execute((RedisCallback<String>) redisConnection -> {
                JedisCommands commands = (JedisCommands) redisConnection.getNativeConnection();
                String uuid = UUID.randomUUID().toString();
                lockFlag.set(uuid);
                return commands.set(key, uuid, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expire);
            });
            return !StringUtils.isEmpty(result);
        } catch (Exception e) {
            System.out.println("set redis occured an exception" + e);
        }
        return false;
    }

}复制代码

定义一个注解类: 

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface DistributeLock {

    /**
     * 锁的资源,key。
     * 支持spring El表达式
     */
    @AliasFor("name")
    String name() default "'default'";

    /**
     * 锁的资源,value。
     * 支持spring El表达式
     */
    @AliasFor("value")
    String value() default "'default'";

    /**
     * 持锁时间,单位毫秒
     */
    long keepMills() default 10000;

    /**
     * 当获取失败时候动做
     */
    LockFailAction action() default LockFailAction.CONTINUE;

    public enum LockFailAction {
        /**
         * 放弃
         */
        GIVEUP,
        /**
         * 继续
         */
        CONTINUE;
    }

    /**
     * 重试的间隔时间,设置GIVEUP忽略此项
     */
    long sleepMills() default 400;

    /**
     * 重试次数
     */
    int retryTimes() default 5;

}复制代码

定义切面: 

@Aspect
@Component
public class DistributedLockAspect {

    @Autowired
    private IDistributedLock distributedLock;

    private ExpressionParser parser = new SpelExpressionParser();

    private LocalVariableTableParameterNameDiscoverer discoverer = new LocalVariableTableParameterNameDiscoverer();

    /**
     * 定义切入点
     */
    @Pointcut("@annotation(com.telehot.distributedlock.annotations.DistributeLock)")
    private void lockPoint() {
    }

    /**
     * 环绕通知
     *
     * @param pjp pjp
     * @return 方法返回结果
     * @throws Throwable throwable
     */
    @Around("lockPoint()")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        Method method = ((MethodSignature) pjp.getSignature()).getMethod();
        DistributeLock lockAction = method.getAnnotation(DistributeLock.class);
        String logKey = getLogKey(lockAction, pjp, method);

        int retryTimes = lockAction.action().equals(DistributeLock.LockFailAction.CONTINUE) ? lockAction.retryTimes() : 0;
        boolean lock = distributedLock.lock(logKey, lockAction.keepMills(), retryTimes, lockAction.sleepMills());
        if (!lock) {
            System.out.println("get lock failed : " + logKey);
            return null;
        }

        //获得锁,执行方法,释放锁
        System.out.println("get lock success : " + logKey);
        try {
            return pjp.proceed();
        } catch (Exception e) {
            System.out.println("execute locked method occured an exception" + e);
        } finally {
            boolean releaseResult = distributedLock.releaseLock(logKey);
            System.out.println("release lock : " + logKey + (releaseResult ? " success" : " failed"));
        }
        return null;
    }

    /**
     * 得到分布式缓存的key
     *
     * @param lockAction 注解对象
     * @param pjp        pjp
     * @param method     method
     * @return String
     */
    private String getLogKey(DistributeLock lockAction, ProceedingJoinPoint pjp, Method method) {
        String name = lockAction.name();
        String value = lockAction.value();
        Object[] args = pjp.getArgs();
        return parse(name, method, args) + "_" + parse(value, method, args);
    }

    /**
     * 解析spring EL表达式
     *
     * @param key    key
     * @param method method
     * @param args   args
     * @return parse result
     */
    private String parse(String key, Method method, Object[] args) {
        String[] params = discoverer.getParameterNames(method);
        if (null == params || params.length == 0 || !key.contains("#")) {
            return key;
        }
        EvaluationContext context = new StandardEvaluationContext();
        for (int i = 0; i < params.length; i++) {
            context.setVariable(params[i], args[i]);
        }
        return parser.parseExpression(key).getValue(context, String.class);
    }

}复制代码

配置文件: 

 application.properties: 

redis.pool.min-idle=0
redis.pool.max-idle=8
redis.hostName=127.0.0.1
redis.port=6379
复制代码

applicationContext.xml: 

<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
    <property name="maxIdle" value="${redis.pool.max-idle}"/> <!-- 最大可以保持idel状态的对象数  -->
    <property name="minIdle" value="${redis.pool.min-idle}"/> <!-- 最小可以保持idel状态的对象数  -->
</bean>

<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    <!-- redis 配置 -->
    <constructor-arg index="0" ref="jedisPoolConfig"/>
    <property name="hostName" value="${redis.hostName}"/>
    <property name="port" value="${redis.port}"/>
</bean>

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="jedisConnectionFactory" />
    <property name="keySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
    </property>
    <property name="valueSerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
    </property>
    <property name="hashKeySerializer">
       <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
    </property>
    <property name="hashValueSerializer">
       <bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
    </property>
</bean>

<aop:aspectj-autoproxy proxy-target-class="true" />复制代码

注意下,最后一行别忘了加上,否则 AOP 不会起做用。若是报错,须要加上 xml 的命名空间,能够自行百度。 

分布式锁的使用 

修改以前的代码: 

@DistributeLock(name = "#key")
public void addNum(String key) {
    try {
        // 查询数据
        TE te = teManager.findOne(1L);
        System.out.println("start:" + te.getNum());
        // 模拟耗时操做
        Thread.sleep(2000);
        // 值加1
        te.setNum(te.getNum() + 1);
        // 保存数据
        teManager.save(te);
        System.out.println("end:" + te.getNum());
    } catch (Exception e) {

    }
}复制代码

只须要加一个注解就行,能够用固定的 key 值,也能够用业务 ID 来做为锁的 key。这边用了业务 ID 来当 key,注解中也能自定义重试次数、超时时间等。 

运行结果: 

16:31:11,093 INFO  [stdout] (default task-60) get lock success : key1_'default'
16:31:11,102 INFO  [stdout] (default task-60) start:0
16:31:12,413 INFO  [stdout] (default task-61) lock failed, retrying...4
16:31:12,813 INFO  [stdout] (default task-61) lock failed, retrying...3
16:31:13,103 INFO  [stdout] (default task-60) end:1
16:31:13,104 INFO  [stdout] (default task-60) release lock : key1_'default' success
16:31:13,214 INFO  [stdout] (default task-61) get lock success : key1_'default'
16:31:13,222 INFO  [stdout] (default task-61) start:1
16:31:15,223 INFO  [stdout] (default task-61) end:2
16:31:15,225 INFO  [stdout] (default task-61) release lock : key1_'default' success复制代码

能够看到 task-60 一开始就获取到了锁,而 task-61 一开始获取锁失败,进行了重试,直到 task-60 运行完释放锁后,task-61 才拿到锁,继续执行代码。 

总结

  • 利用 redis 的 SETNX 命令,能够实现分布式锁,而且具备超时自动解锁的功能,防止死锁。
  • 利用 spring 的注解及 AOP 的特性,能够很方便地使用分布式锁。
  • 使用 redis 来实现分布式锁,比其余两种方式(数据库乐观锁、zookeeper)更为简单。
相关文章
相关标签/搜索