分布式锁是在分布式环境下(多个JVM进程)控制多个客户端对某一资源的同步访问的一种实现,与之相对应的是线程锁,线程锁控制的是同一个JVM进程内多个线程之间的同步。分布式锁的通常实现方法是在应用服务器以外经过一个共享的存储服务器存储锁资源,同一时刻只有一个客户端能占有锁资源来完成。一般有基于Zookeeper,Redis,或数据库三种实现形式。本文介绍基于Redis的实现方案。html
基于Redis实现分布式锁须要知足以下几点要求:java
基于Redis的分布式锁加锁操做通常使用 SETNX
命令,其含义是“将 key 的值设为 value ,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不作任何动做”。 在 Spring Boot 中,可使用 StringRedisTemplate 来实现,以下,一行代码便可实现加锁过程。(下列代码给出两种调用形式——当即返回加锁结果与给定超时时间获取加锁结果)redis
/**
* 尝试获取锁(当即返回)
* @param key 锁的redis key
* @param value 锁的value
* @param expire 过时时间/秒
* @return 是否获取成功
*/
public boolean lock(String key, String value, long expire) {
return stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS);
}
/**
* 尝试获取锁,并至多等待timeout时长
*
* @param key 锁的redis key
* @param value 锁的value
* @param expire 过时时间/秒
* @param timeout 超时时长
* @param unit 时间单位
* @return 是否获取成功
*/
public boolean lock(String key, String value, long expire, long timeout, TimeUnit unit) {
long waitMillis = unit.toMillis(timeout);
long waitAlready = 0;
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, value, expire, TimeUnit.SECONDS) && waitAlready < waitMillis) {
try {
Thread.sleep(waitMillisPer);
} catch (InterruptedException e) {
log.error("Interrupted when trying to get a lock. key: {}", key, e);
}
waitAlready += waitMillisPer;
}
if (waitAlready < waitMillis) {
return true;
}
log.warn("<====== lock {} failed after waiting for {} ms", key, waitAlready);
return false;
}复制代码
上述实现如何知足前面提到的几点要求:数据库
SETNX
命令,操做具有原子性。错误示例:服务器
网上有以下实现,分布式
public boolean lock(String key, String value, long expire) {
boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, value);
if(result) {
stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS);
}
return result;
}复制代码
该实现的问题是若是在result为true,但还没成功设置expire时,程序异常退出了,将致使该锁一直被占用而致使死锁,不知足第二点要求。ide
解锁也须要知足前面所述的四个要求,实现代码以下:ui
private static final String RELEASE_LOCK_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
private static final Long RELEASE_LOCK_SUCCESS_RESULT = 1L;
/**
* 释放锁
* @param key 锁的redis key
* @param value 锁的value
*/
public boolean unLock(String key, String value) {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT, Long.class);
long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), value);
return Objects.equals(result, RELEASE_LOCK_SUCCESS_RESULT);
}复制代码
这段实现使用一个Lua脚原本实现解锁操做,保证操做的原子性。传入的value值需与该线程加锁时的value一致,可使用requestId(具体实现下面给出)。this
错误示例:url
public boolean unLock(String key, String value) {
String oldValue = stringRedisTemplate.opsForValue().get(key);
if(value.equals(oldValue)) {
stringRedisTemplate.delete(key);
}
}复制代码
该实现先获取锁的当前值,判断两值相等则删除。考虑一种极端状况,若是在判断为true时,恰好该锁过时时间到,另外一个客户端加锁成功,则接下来的delete将无论三七二十一将别人加的锁直接删掉了,不知足第三点要求。该示例主要是由于没有保证解锁操做的原子性致使。
为了方便使用,添加一个注解,能够放于方法上控制方法在分布式环境中的同步执行。
/**
* 标注在方法上的分布式锁注解
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLockable {
String key();
String prefix() default "disLock:";
long expire() default 10L; // 默认10s过时
}复制代码
添加一个切面来解析注解的处理,
/**
* 分布式锁注解处理切面
*/
@Aspect
@Slf4j
public class DistributedLockAspect {
private DistributedLock lock;
public DistributedLockAspect(DistributedLock lock) {
this.lock = lock;
}
/**
* 在方法上执行同步锁
*/
@Around(value = "@annotation(lockable)")
public Object distLock(ProceedingJoinPoint point, DistributedLockable lockable) throws Throwable {
boolean locked = false;
String key = lockable.prefix() + lockable.key();
try {
locked = lock.lock(key, WebUtil.getRequestId(), lockable.expire());
if(locked) {
return point.proceed();
} else {
log.info("Did not get a lock for key {}", key);
return null;
}
} catch (Exception e) {
throw e;
} finally {
if(locked) {
if(!lock.unLock(key, WebUtil.getRequestId())){
log.warn("Unlock {} failed, maybe locked by another client already. ", lockable.key());
}
}
}
}
}复制代码
RequestId 的实现以下,经过注册一个Filter,在请求开始时生成一个uuid存于ThreadLocal中,在请求返回时清除。
public class WebUtil {
public static final String REQ_ID_HEADER = "Req-Id";
private static final ThreadLocal<String> reqIdThreadLocal = new ThreadLocal<>();
public static void setRequestId(String requestId) {
reqIdThreadLocal.set(requestId);
}
public static String getRequestId(){
String requestId = reqIdThreadLocal.get();
if(requestId == null) {
requestId = ObjectId.next();
reqIdThreadLocal.set(requestId);
}
return requestId;
}
public static void removeRequestId() {
reqIdThreadLocal.remove();
}
}
public class RequestIdFilter implements Filter {
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
String reqId = httpServletRequest.getHeader(WebUtil.REQ_ID_HEADER);
//没有则生成一个
if (StringUtils.isEmpty(reqId)) {
reqId = ObjectId.next();
}
WebUtil.setRequestId(reqId);
try {
filterChain.doFilter(servletRequest, servletResponse);
} finally {
WebUtil.removeRequestId();
}
}
}
//在配置类中注册Filter
/**
* 添加RequestId
* @return
*/
@Bean
public FilterRegistrationBean requestIdFilter() {
RequestIdFilter reqestIdFilter = new RequestIdFilter();
FilterRegistrationBean registrationBean = new FilterRegistrationBean();
registrationBean.setFilter(reqestIdFilter);
List<String> urlPatterns = Collections.singletonList("/*");
registrationBean.setUrlPatterns(urlPatterns);
registrationBean.setOrder(Ordered.HIGHEST_PRECEDENCE + 1);
return registrationBean;
}复制代码
@DistributedLockable(key = "test", expire = 10)
public void test(){
System.out.println("线程-"+Thread.currentThread().getName()+"开始执行..." + LocalDateTime.now());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程-"+Thread.currentThread().getName()+"结束执行..." + LocalDateTime.now());
}复制代码
本文给出了基于Redis的分布式锁的实现方案与常见的错误示例。要保障分布式锁的正确运行,需知足本文所提的四个要求,尤为注意保证加锁解锁操做的原子性,设置过时时间,及对同一个锁的加锁解锁线程一致。原文地址: blog.jboost.cn/distributed…
[转载请注明出处] 做者:雨歌 欢迎关注做者公众号:半路雨歌,查看更多技术干货文章