好久以前有讲过并发编程中的锁并发编程的锁机制:synchronized和lock。在单进程的系统中,当存在多个线程能够同时改变某个变量时,就须要对变量或代码块作同步,使其在修改这种变量时可以线性执行消除并发修改变量。而同步的本质是经过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么须要在某个地方作个标记,这个标记必须每一个线程都能看到,当标记不存在时能够设置该标记,其他后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记。html
分布式环境下,数据一致性问题一直是一个比较重要的话题,而又不一样于单进程的状况。分布式与单机状况下最大的不一样在于其不是多线程而是多进程。多线程因为能够共享堆内存,所以能够简单的采起内存做为标记存储位置。而进程之间甚至可能都不在同一台物理机上,所以须要将标记存储在一个全部进程都能看到的地方。java
常见的是秒杀场景,订单服务部署了多个实例。如秒杀商品有4个,第一个用户购买3个,第二个用户购买2个,理想状态下第一个用户能购买成功,第二个用户提示购买失败,反之亦可。而实际可能出现的状况是,两个用户都获得库存为4,第一个用户买到了3个,更新库存以前,第二个用户下了2个商品的订单,更新库存为2,致使出错。redis
在上面的场景中,商品的库存是共享变量,面对高并发情形,须要保证对资源的访问互斥。在单机环境中,Java中其实提供了不少并发处理相关的API,可是这些API在分布式场景中就无能为力了。也就是说单纯的Java Api并不能提供分布式锁的能力。分布式系统中,因为分布式系统的分布性,即多线程和多进程而且分布在不一样机器中,synchronized和lock这两种锁将失去原有锁的效果,须要咱们本身实现分布式锁。sql
常见的锁方案以下:数据库
下面咱们简单介绍下这几种锁的实现。apache
基于数据库的锁实现也有两种方式,一是基于数据库表,另外一种是基于数据库排他锁。编程
基于数据库表增删是最简单的方式,首先建立一张锁的表主要包含下列字段:方法名,时间戳等字段。缓存
具体使用的方法,当须要锁住某个方法时,往该表中插入一条相关的记录。这边须要注意,方法名是有惟一性约束的,若是有多个请求同时提交到数据库的话,数据库会保证只有一个操做能够成功,那么咱们就能够认为操做成功的那个线程得到了该方法的锁,能够执行方法体内容。bash
执行完毕,须要delete该记录。服务器
固然,笔者这边只是简单介绍一下。对于上述方案能够进行优化,如应用主从数据库,数据之间双向同步。一旦挂掉快速切换到备库上;作一个定时任务,每隔必定时间把数据库中的超时数据清理一遍;使用while循环,直到insert成功再返回成功,虽然并不推荐这样作;还能够记录当前得到锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,若是当前机器的主机信息和线程信息在数据库能够查到的话,直接把锁分配给他就能够了,实现可重入锁。
咱们还能够经过数据库的排他锁来实现分布式锁。基于MySql的InnoDB引擎,可使用如下方法来实现加锁操做:
public void lock(){
connection.setAutoCommit(false)
int count = 0;
while(count < 4){
try{
select * from lock where lock_name=xxx for update;
if(结果不为空){
//表明获取到锁
return;
}
}catch(Exception e){
}
//为空或者抛异常的话都表示没有获取到锁
sleep(1000);
count++;
}
throw new LockException();
}
复制代码
在查询语句后面增长for update,数据库会在查询过程当中给数据库表增长排他锁。当某条记录被加上排他锁以后,其余线程没法再在该行记录上增长排他锁。其余没有获取到锁的就会阻塞在上述select语句上,可能的结果有2种,在超时以前获取到了锁,在超时以前仍未获取到锁。
得到排它锁的线程便可得到分布式锁,当获取到锁以后,能够执行方法的业务逻辑,执行完方法以后,释放锁connection.commit()
。
存在的问题主要是性能不高和sql超时的异常。
上面两种方式都是依赖数据库的一张表,一种是经过表中的记录的存在状况肯定当前是否有锁存在,另一种是经过数据库的排他锁来实现分布式锁。
基于zookeeper临时有序节点能够实现的分布式锁。每一个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个惟一的瞬时有序节点。 判断是否获取锁的方式很简单,只须要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除便可。同时,其能够避免服务宕机致使的锁没法释放,而产生的死锁问题。
提供的第三方库有curator,具体使用读者能够自行去看一下。Curator提供的InterProcessMutex是分布式锁的实现。acquire方法获取锁,release方法释放锁。另外,锁释放、阻塞锁、可重入锁等问题均可以有有效解决。讲下阻塞锁的实现,客户端能够经过在ZK中建立顺序节点,而且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端能够检查本身建立的节点是否是当前全部节点中序号最小的,若是是就获取到锁,即可以执行业务逻辑。
最后,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并无缓存服务那么高。由于每次在建立锁和释放锁的过程当中,都要动态建立、销毁瞬时节点来实现锁功能。ZK中建立和删除节点只能经过Leader服务器来执行,而后将数据同不到全部的Follower机器上。并发问题,可能存在网络抖动,客户端和ZK集群的session链接断了,zk集群觉得客户端挂了,就会删除临时节点,这时候其余客户端就能够获取到分布式锁了。
相对于基于数据库实现分布式锁的方案来讲,基于缓存来实如今性能方面会表现的更好一点,存取速度快不少。并且不少缓存是能够集群部署的,能够解决单点问题。基于缓存的锁有好几种,如memcached、redis、本文下面主要讲解基于redis的分布式实现。
使用redis的SETNX实现分布式锁,多个进程执行如下Redis命令:
SETNX lock.id <current Unix time + lock timeout + 1>
复制代码
SETNX是将 key 的值设为 value,当且仅当 key 不存在。若给定的 key 已经存在,则 SETNX 不作任何动做。
SETNX实现分布式锁,可能会存在死锁的状况。与单机模式下的锁相比,分布式环境下不只须要保证进程可见,还须要考虑进程与锁之间的网络问题。某个线程获取了锁以后,断开了与Redis 的链接,锁没有及时释放,竞争该锁的其余线程都会hung,产生死锁的状况。
在使用 SETNX 得到锁时,咱们将键 lock.id 的值设置为锁的有效时间,线程得到锁后,其余线程还会不断的检测锁是否已超时,若是超时,等待的线程也将有机会得到锁。然而,锁超时,咱们不能简单地使用 DEL 命令删除键 lock.id 以释放锁。
考虑如下状况:
- A已经首先得到了锁 lock.id,而后线A断线。B,C都在等待竞争该锁;
- B,C读取lock.id的值,比较当前时间和键 lock.id 的值来判断是否超时,发现超时;
- B执行 DEL lock.id命令,并执行 SETNX lock.id 命令,并返回1,B得到锁;
- C因为各刚刚检测到锁已超时,执行 DEL lock.id命令,将B刚刚设置的键 lock.id 删除,执行 SETNX lock.id命令,并返回1,即C得到锁。
上面的步骤很明显出现了问题,致使B,C同时获取了锁。在检测到锁超时后,线程不能直接简单地执行 DEL 删除键的操做以得到锁。
对于上面的步骤进行改进,问题是出在删除键的操做上面,那么获取锁以后应该怎么改进呢? 首先看一下redis的GETSET这个操做,GETSET key value
,将给定 key 的值设为 value ,并返回 key 的旧值(old value)。利用这个操做指令,咱们改进一下上述的步骤。
- A已经首先得到了锁 lock.id,而后线A断线。B,C都在等待竞争该锁;
- B,C读取lock.id的值,比较当前时间和键 lock.id 的值来判断是否超时,发现超时;
- B检测到锁已超时,即当前的时间大于键 lock.id 的值,B会执行
GETSET lock.id <current Unix timestamp + lock timeout + 1>
设置时间戳,经过比较键 lock.id 的旧值是否小于当前时间,判断进程是否已得到锁;- B发现GETSET返回的值小于当前时间,则执行 DEL lock.id命令,并执行 SETNX lock.id 命令,并返回1,B得到锁;
- C执行GETSET获得的时间大于当前时间,则继续等待。
在线程释放锁,即执行 DEL lock.id 操做前,须要先判断锁是否已超时。若是锁已超时,那么锁可能已由其余线程得到,这时直接执行 DEL lock.id 操做会致使把其余线程已得到的锁释放掉。
public boolean lock(long acquireTimeout, TimeUnit timeUnit) throws InterruptedException {
acquireTimeout = timeUnit.toMillis(acquireTimeout);
long acquireTime = acquireTimeout + System.currentTimeMillis();
//使用J.U.C的ReentrantLock
threadLock.tryLock(acquireTimeout, timeUnit);
try {
//循环尝试
while (true) {
//调用tryLock
boolean hasLock = tryLock();
if (hasLock) {
//获取锁成功
return true;
} else if (acquireTime < System.currentTimeMillis()) {
break;
}
Thread.sleep(sleepTime);
}
} finally {
if (threadLock.isHeldByCurrentThread()) {
threadLock.unlock();
}
}
return false;
}
public boolean tryLock() {
long currentTime = System.currentTimeMillis();
String expires = String.valueOf(timeout + currentTime);
//设置互斥量
if (redisHelper.setNx(mutex, expires) > 0) {
//获取锁,设置超时时间
setLockStatus(expires);
return true;
} else {
String currentLockTime = redisUtil.get(mutex);
//检查锁是否超时
if (Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) < currentTime) {
//获取旧的锁时间并设置互斥量
String oldLockTime = redisHelper.getSet(mutex, expires);
//旧值与当前时间比较
if (Objects.nonNull(oldLockTime) && Objects.equals(oldLockTime, currentLockTime)) {
//获取锁,设置超时时间
setLockStatus(expires);
return true;
}
}
return false;
}
}
复制代码
lock调用tryLock方法,参数为获取的超时时间与单位,线程在超时时间内,获取锁操做将自旋在那里,直到该自旋锁的保持者释放了锁。
tryLock方法中,主要逻辑以下:
public boolean unlock() {
//只有锁的持有线程才能解锁
if (lockHolder == Thread.currentThread()) {
//判断锁是否超时,没有超时才将互斥量删除
if (lockExpiresTime > System.currentTimeMillis()) {
redisHelper.del(mutex);
logger.info("删除互斥量[{}]", mutex);
}
lockHolder = null;
logger.info("释放[{}]锁成功", mutex);
return true;
} else {
throw new IllegalMonitorStateException("没有获取到锁的线程没法执行解锁操做");
}
}
复制代码
在上面获取锁的实现下,其实此处的释放锁函数能够不须要了,有兴趣的读者能够结合上面的代码看下为何?有想法能够留言哦!
本文主要讲解了基于redis分布式锁的实现,在分布式环境下,数据一致性问题一直是一个比较重要的话题,而synchronized和lock锁在分布式环境已经失去了做用。常见的锁的方案有基于数据库实现分布式锁、基于缓存实现分布式锁、基于Zookeeper实现分布式锁,简单介绍了每种锁的实现特色;而后,文中探索了一下redis锁的实现方案;最后,本文给出了基于Java实现的redis分布式锁,读者能够自行验证一下。