【高并发】高并发分布式锁架构解密,不是全部的锁都是分布式锁!!

写在前面

最近,不少小伙伴留言说,在学习高并发编程时,不太明白分布式锁是用来解决什么问题的,还有很多小伙伴甚至连分布式锁是什么都不太明白。明明在生产环境上使用了本身开发的分布式锁,为何还会出现问题呢?一样的程序,加上分布式锁后,性能差了几个数量级!这又是为何呢?今天,咱们就来讲说如何在高并发环境下实现分布式锁,不是全部的锁都是高并发的。html

万字长文,带你深刻解密高并发环境下的分布式锁架构,不是全部的锁都是分布式锁!!!前端

究竟什么样的锁才能更好的支持高并发场景呢?今天,咱们就一块儿解密高并发环境下典型的分布式锁架构,结合【高并发】专题下的其余文章,学以至用。java

锁用来解决什么问题呢?

在咱们编写的应用程序或者高并发程序中,不知道你们有没有想过一个问题,就是咱们为何须要引入锁?锁为咱们解决了什么问题呢?redis

在不少业务场景下,咱们编写的应用程序中会存在不少的 资源竞争 的问题。而咱们在高并发程序中,引入锁,就是为了解决这些资源竞争的问题。算法

电商超卖问题

这里,咱们能够列举一个简单的业务场景。好比,在电子商务(商城)的业务场景中,提交订单购买商品时,首先须要查询相应商品的库存是否足够,只有在商品库存数量足够的前提下,才能让用户成功的下单。下单时,咱们须要在库存数量中减去用户下单的商品数量,并将库存操做的结果数据更新到数据库中。整个流程咱们能够简化成下图所示。spring

不少小伙伴也留言说,让我给出代码,这样可以更好的学习和掌握相关的知识。好吧,这里,我也给出相应的代码片断吧。咱们可使用下面的代码片断来表示用户的下单操做,我这里将商品的库存信息保存在了Redis中。数据库

@RequestMapping("/submitOrder")
public String submitOrder(){
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        stock -= 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        logger.debug("库存扣减成功,当前库存为:{}", stock);
    }else{
        logger.debug("库存不足,扣减库存失败");
        throw new OrderException("库存不足,扣减库存失败");
    }
    return "success";
}

注意:上述代码片断比较简单,只是为了方便你们理解,真正项目中的代码就不能这么写了。编程

上述的代码看似是没啥问题的,可是咱们不能只从代码表面上来观察代码的执行顺序。这是由于在JVM中代码的执行顺序未必是按照咱们书写代码的顺序执行的。即便在JVM中代码是按照咱们书写的顺序执行,那咱们对外提供的接口一旦暴露出去,就会有成千上万的客户端来访问咱们的接口。因此说,咱们暴露出去的接口是会被并发访问的。数组

试问,上面的代码在高并发环境下是线程安全的吗?答案确定不是线程安全的,由于上述扣减库存的操做会出现并行执行的状况。缓存

咱们可使用Apache JMeter来对上述接口进行测试,这里,我使用Apache JMeter对上述接口进行测试。

在Jmeter中,我将线程的并发度设置为3,接下来的配置以下所示。

以HTTP GET请求的方式来并发访问提交订单的接口。此时,运行JMeter来访问接口,命令行会打印出下面的日志信息。

库存扣减成功,当前库存为:49
库存扣减成功,当前库存为:49
库存扣减成功,当前库存为:49

这里,咱们明明请求了3次,也就是说,提交了3笔订单,为何扣减后的库存都是同样的呢?这种现象在电商领域有一个专业的名词叫作 “超卖”

若是一个大型的高并发电商系统,好比淘宝、天猫、京东等,出现了超卖现象,那损失就没法估量了!架构设计和开发电商系统的人员估计就要统统下岗了。因此,做为技术人员,咱们必定要严谨的对待技术,严格作好系统的每个技术环节。

JVM中提供的锁

JVM中提供的synchronized和Lock锁,相信你们并不陌生了,不少小伙伴都会使用这些锁,也能使用这些锁来实现一些简单的线程互斥功能。那么,做为立志要成为架构师的你,是否了解过JVM锁的底层原理呢?

JVM锁原理

说到JVM锁的原理,咱们就不得不限说说Java中的对象头了。

Java中的对象头

每一个Java对象都有对象头。若是是⾮数组类型,则⽤2个字宽来存储对象头,若是是数组,则会⽤3个字宽来存储对象头。在32位处理器中,⼀个字宽是32位;在64位虚拟机中,⼀个字宽是64位。

对象头的内容以下表 。

长度 内容 说明
32/64bit Mark Word 存储对象的hashCode或锁信息等
32/64bit Class Metadata Access 存储到对象类型数据的指针
32/64bit Array length 数组的长度(若是是数组)

Mark Work的格式以下所示。

锁状态 29bit或61bit 1bit是不是偏向锁? 2bit锁标志位
无锁 0 01
偏向锁 线程ID 1 01
轻量级锁 指向栈中锁记录的指针 此时这一位不用于标识偏向锁 00
重量级锁 指向互斥量(重量级锁)的指针 此时这一位不用于标识偏向锁 10
GC标记 此时这一位不用于标识偏向锁 11

能够看到,当对象状态为偏向锁时, Mark Word 存储的是偏向的线程ID;当状态为轻量级锁时, Mark Word 存储的是指向线程栈中 Lock Record 的指针;当状态为重量级锁时, Mark Word 为指向堆中的monitor对象的指针 。

有关Java对象头的知识,参考《深刻浅出Java多线程》。

JVM锁原理

简单点来讲,JVM中锁的原理以下。

在Java对象的对象头上,有一个锁的标记,好比,第一个线程执行程序时,检查Java对象头中的锁标记,发现Java对象头中的锁标记为未加锁状态,因而为Java对象进行了加锁操做,将对象头中的锁标记设置为锁定状态。第二个线程执行一样的程序时,也会检查Java对象头中的锁标记,此时会发现Java对象头中的锁标记的状态为锁定状态。因而,第二个线程会进入相应的阻塞队列中进行等待。

这里有一个关键点就是Java对象头中的锁标记如何实现。

JVM锁的短板

JVM中提供的synchronized和Lock锁都是JVM级别的,你们都知道,当运行一个Java程序时,会启动一个JVM进程来运行咱们的应用程序。synchronized和Lock在JVM级别有效,也就是说,synchronized和Lock在同一Java进程内有效。若是咱们开发的应用程序是分布式的,那么只是使用synchronized和Lock来解决分布式场景下的高并发问题,就会显得有点力不从心了。

synchronized和Lock支持JVM同一进程内部的线程互斥

synchronized和Lock在JVM级别可以保证高并发程序的互斥,咱们可使用下图来表示。

可是,当咱们将应用程序部署成分布式架构,或者将应用程序在不一样的JVM进程中运行时,synchronized和Lock就不能保证分布式架构和多JVM进程下应用程序的互斥性了。

synchronized和Lock不能实现多JVM进程之间的线程互斥

分布式架构和多JVM进程的本质都是将应用程序部署在不一样的JVM实例中,也就是说,其本质仍是多JVM进程。

分布式锁

咱们在实现分布式锁时,能够参照JVM锁实现的思想,JVM锁在为对象加锁时,经过改变Java对象的对象头中的锁的标志位来实现,也就是说,全部的线程都会访问这个Java对象的对象头中的锁标志位。

咱们一样以这种思想来实现分布式锁,当咱们将应用程序进行拆分并部署成分布式架构时,全部应用程序中的线程访问共享变量时,都到同一个地方去检查当前程序的临界区是否进行了加锁操做,而是否进行了加锁操做,咱们在统一的地方使用相应的状态来进行标记。

能够看到,在分布式锁的实现思想上,与JVM锁相差不大。而在实现分布式锁中,保存加锁状态的服务可使用MySQL、Redis和Zookeeper实现。

可是,在互联网高并发环境中, 使用Redis实现分布式锁的方案是使用的最多的。 接下来,咱们就使用Redis来深刻解密分布式锁的架构设计。

Redis如何实现分布式锁

Redis命令

在Redis中,有一个不常使用的命令以下所示。

SETNX key value

这条命令的含义就是“SET if Not Exists”,即不存在的时候才会设置值。

只有在key不存在的状况下,将键key的值设置为value。若是key已经存在,则SETNX命令不作任何操做。

这个命令的返回值以下。

  • 命令在设置成功时返回1。
  • 命令在设置失败时返回0。

因此,咱们在分布式高并发环境下,可使用Redis的SETNX命令来实现分布式锁。假设此时有线程A和线程B同时访问临界区代码,假设线程A首先执行了SETNX命令,并返回结果1,继续向下执行。而此时线程B再次执行SETNX命令时,返回的结果为0,则线程B不能继续向下执行。只有当线程A执行DELETE命令将设置的锁状态删除时,线程B才会成功执行SETNX命令设置加锁状态后继续向下执行。

引入分布式锁

了解了如何使用Redis中的命令实现分布式锁后,咱们就能够对下单接口进行改造了,加入分布式锁,以下所示。

/**
* 为了演示方便,我这里就简单定义了一个常量做为商品的id
* 实际工做中,这个商品id是前端进行下单操做传递过来的参数
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //经过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
    //实际上,value能够为任意的字符换
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //没有拿到锁,返回下单失败
    if(!isLock){
        return "failure";
    }
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        stock -= 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        logger.debug("库存扣减成功,当前库存为:{}", stock);
    }else{
        logger.debug("库存不足,扣减库存失败");
        throw new OrderException("库存不足,扣减库存失败");
    }
    //业务执行完成,删除PRODUCT_ID key
    stringRedisTemplate.delete(PRODUCT_ID);
    return "success";
}

那么,在上述代码中,咱们加入了分布式锁的操做,那上述代码是否可以在高并发场景下保证业务的原子性呢?答案是能够保证业务的原子性。可是,在实际场景中,上面实现分布式锁的代码是不可用的!!

假设当线程A首先执行stringRedisTemplate.opsForValue()的setIfAbsent()方法返回true,继续向下执行,正在执行业务代码时,抛出了异常,线程A直接退出了JVM。此时,stringRedisTemplate.delete(PRODUCT_ID);代码还没来得及执行,以后全部的线程进入提交订单的方法时,调用stringRedisTemplate.opsForValue()的setIfAbsent()方法都会返回false。致使后续的全部下单操做都会失败。这就是分布式场景下的死锁问题。

因此,上述代码中实现分布式锁的方式在实际场景下是不可取的!!

引入try-finally代码块

说到这,相信小伙伴们都可以想到,使用try-finall代码块啊,接下来,咱们为下单接口的方法加上try-finally代码块。

/**
* 为了演示方便,我这里就简单定义了一个常量做为商品的id
* 实际工做中,这个商品id是前端进行下单操做传递过来的参数
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //经过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
    //实际上,value能够为任意的字符换
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //没有拿到锁,返回下单失败
    if(!isLock){
        return "failure";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("库存扣减成功,当前库存为:{}", stock);
        }else{
            logger.debug("库存不足,扣减库存失败");
            throw new OrderException("库存不足,扣减库存失败");
        }
    }finally{
         //业务执行完成,删除PRODUCT_ID key
    	stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

那么,上述代码是否真正解决了死锁的问题呢?咱们在写代码时,不能只盯着代码自己,以为上述代码没啥问题了。实际上,生产环境是很是复杂的。若是线程在成功加锁以后,执行业务代码时,还没来得及执行删除锁标志的代码,此时,服务器宕机了,程序并无优雅的退出JVM。也会使得后续的线程进入提交订单的方法时,因没法成功的设置锁标志位而下单失败。因此说,上述的代码仍然存在问题。

引入Redis超时机制

在Redis中能够设置缓存的自动过时时间,咱们能够将其引入到分布式锁的实现中,以下代码所示。

/**
* 为了演示方便,我这里就简单定义了一个常量做为商品的id
* 实际工做中,这个商品id是前端进行下单操做传递过来的参数
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //经过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
    //实际上,value能够为任意的字符换
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //没有拿到锁,返回下单失败
    if(!isLock){
        return "failure";
    }
    try{
        stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("库存扣减成功,当前库存为:{}", stock);
        }else{
            logger.debug("库存不足,扣减库存失败");
            throw new OrderException("库存不足,扣减库存失败");
        }
    }finally{
         //业务执行完成,删除PRODUCT_ID key
    	stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

在上述代码中,咱们加入了以下一行代码来为Redis中的锁标志设置过时时间。

stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);

此时,咱们设置的过时时间为30秒。

那么问题来了,这样是否就真正的解决了问题呢?上述程序就真的没有坑了吗?答案是仍是有坑的!!

“坑位”分析

咱们在下单操做的方法中为分布式锁引入了超时机制,此时的代码仍是没法真正避免死锁的问题,那“坑位”到底在哪里呢?试想,当程序执行完stringRedisTemplate.opsForValue().setIfAbsent()方法后,正要执行stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS)代码时,服务器宕机了,你还别说,生产坏境的状况很是复杂,就是这么巧,服务器就宕机了。此时,后续请求进入提交订单的方法时,都会由于没法成功设置锁标志而致使后续下单流程没法正常执行。

既然咱们找到了上述代码的“坑位”,那咱们如何将这个”坑“填上?如何解决这个问题呢?别急,Redis已经提供了这样的功能。咱们能够在向Redis中保存数据的时候,能够同时指定数据的超时时间。因此,咱们能够将代码改形成以下所示。

/**
* 为了演示方便,我这里就简单定义了一个常量做为商品的id
* 实际工做中,这个商品id是前端进行下单操做传递过来的参数
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //经过stringRedisTemplate来调用Redis的SETNX命令,key为商品的id,value为字符串“binghe”
    //实际上,value能够为任意的字符换
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe", 30, TimeUnit.SECONDS);
   //没有拿到锁,返回下单失败
    if(!isLock){
        return "failure";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("库存扣减成功,当前库存为:{}", stock);
        }else{
            logger.debug("库存不足,扣减库存失败");
            throw new OrderException("库存不足,扣减库存失败");
        }
    }finally{
         //业务执行完成,删除PRODUCT_ID key
    	stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}

在上述代码中,咱们在向Redis中设置锁标志位的时候就设置了超时时间。此时,只要向Redis中成功设置了数据,则即便咱们的业务系统宕机,Redis中的数据过时后,也会自动删除。后续的线程进入提交订单的方法后,就会成功的设置锁标志位,并向下执行正常的下单流程。

到此,上述的代码基本上在功能角度解决了程序的死锁问题,那么,上述程序真的就完美了吗?哈哈,不少小伙伴确定会说不完美!确实,上面的代码还不是完美的,那你们知道哪里不完美吗?接下来,咱们继续分析。

在开发集成角度分析代码

在咱们开发公共的系统组件时,好比咱们这里说的分布式锁,咱们确定会抽取一些公共的类来完成相应的功能来供系统使用。

这里,假设咱们定义了一个RedisLock接口,以下所示。

public interface RedisLock{
    //加锁操做
    boolean tryLock(String key, long timeout, TimeUnit unit);
    //解锁操做
    void releaseLock(String key);
}

接下来,使用RedisLockImpl类实现RedisLock接口,提供具体的加锁和解锁实现,以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        return stringRedisTemplate.opsForValue().setIfAbsent(key, "binghe", timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        stringRedisTemplate.delete(key);
    }
}

在开发集成的角度来讲,当一个线程从上到下执行时,首先对程序进行加锁操做,而后执行业务代码,执行完成后,再进行释放锁的操做。理论上,加锁和释放锁时,操做的Redis Key都是同样的。可是,若是其余开发人员在编写代码时,并无调用tryLock()方法,而是直接调用了releaseLock()方法,而且他调用releaseLock()方法传递的key与你调用tryLock()方法传递的key是同样的。那此时就会出现问题了,他在编写代码时,硬生生的将你加的锁释放了!!!

因此,上述代码是不安全的,别人可以随随便便的将你加的锁删除,这就是锁的误删操做,这是很是危险的,因此,上述的程序存在很严重的问题!!

那如何实现只有加锁的线程才能进行相应的解锁操做呢? 继续向下看。

如何实现加锁和解锁的归一化?

什么是加锁和解锁的归一化呢?简单点来讲,就是一个线程执行了加锁操做后,后续必须由这个线程执行解锁操做,加锁和解锁操做由同一个线程来完成。

为了解决只有加锁的线程才能进行相应的解锁操做的问题,那么,咱们就须要将加锁和解锁操做绑定到同一个线程中,那么,如何将加锁操做和解锁操做绑定到同一个线程呢?其实很简单,相信不少小伙伴都想到了—— 使用ThreadLocal实现 。没错,使用ThreadLocal类确实可以解决这个问题。

此时,咱们将RedisLockImpl类的代码修改为以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        String uuid = UUID.randomUUID().toString();
        threadLocal.set(uuid);
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操做
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
         	stringRedisTemplate.delete(key);   
        }
    }
}

上述代码的主要逻辑为:在对程序执行尝试加锁操做时,首先生成一个uuid,将生成的uuid绑定到当前线程,并将传递的key参数操做Redis中的key,生成的uuid做为Redis中的Value,保存到Redis中,同时设置超时时间。当执行解锁操做时,首先,判断当前线程中绑定的uuid是否和Redis中存储的uuid相等,只有两者相等时,才会执行删除锁标志位的操做。这就避免了一个线程对程序进行了加锁操做后,其余线程对这个锁进行了解锁操做的问题。

继续分析

咱们将加锁和解锁的方法改为以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    private String lockUUID;
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        String uuid = UUID.randomUUID().toString();
        threadLocal.set(uuid);
        lockUUID = uuid;
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操做
        if(lockUUID.equals(stringRedisTemplate.opsForValue().get(key))){
         	stringRedisTemplate.delete(key);   
        }
    }
}

相信不少小伙伴都会看出上述代码存在什么问题了!! 没错,那就是 线程安全的问题。

因此,这里,咱们须要使用ThreadLocal来解决线程安全问题。

可重入性分析

在上面的代码中,当一个线程成功设置了锁标志位后,其余的线程再设置锁标志位时,就会返回失败。还有一种场景就是在提交订单的接口方法中,调用了服务A,服务A调用了服务B,而服务B的方法中存在对同一个商品的加锁和解锁操做。

因此,服务B成功设置锁标志位后,提交订单的接口方法继续执行时,也不能成功设置锁标志位了。也就是说,目前实现的分布式锁没有可重入性。

这里,就存在可重入性的问题了。咱们但愿设计的分布式锁 具备可重入性 ,那什么是可重入性呢?简单点来讲,就是同一个线程,可以屡次获取同一把锁,而且可以按照顺序进行解决操做。

其实,在JDK 1.5以后提供的锁不少都支持可重入性,好比synchronized和Lock。

如何实现可重入性呢?

映射到咱们加锁和解锁方法时,咱们如何支持同一个线程可以屡次获取到锁(设置锁标志位)呢?能够这样简单的设计:若是当前线程没有绑定uuid,则生成uuid绑定到当前线程,而且在Redis中设置锁标志位。若是当前线程已经绑定了uuid,则直接返回true,证实当前线程以前已经设置了锁标志位,也就是说已经获取到了锁,直接返回true。

结合以上分析,咱们将提交订单的接口方法代码改形成以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }else{
            isLocked = true;   
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操做
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
         	stringRedisTemplate.delete(key);   
        }
    }
}

这样写看似没有啥问题,可是你们细想一下,这样写就真的OK了吗?

可重入性的问题分析

既然上面分布式锁的可重入性是存在问题的,那咱们就来分析下问题的根源在哪里!

假设咱们提交订单的方法中,首先使用RedisLock接口对代码块添加了分布式锁,在加锁后的代码中调用了服务A,而服务A中也存在调用RedisLock接口的加锁和解锁操做。而屡次调用RedisLock接口的加锁操做时,只要以前的锁没有失效,则会直接返回true,表示成功获取锁。也就是说,不管调用加锁操做多少次,最终只会成功加锁一次。而执行完服务A中的逻辑后,在服务A中调用RedisLock接口的解锁方法,此时,会将当前线程全部的加锁操做得到的锁所有释放掉。

咱们可使用下图来简单的表示这个过程。

那么问题来了,如何解决可重入性的问题呢?

解决可重入性问题

相信不少小伙伴都可以想出使用计数器的方式来解决上面可重入性的问题,没错,就是使用计数器来解决。 总体流程以下所示。

那么,体如今程序代码上是什么样子呢?咱们来修改RedisLockImpl类的代码,以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }else{
            isLocked = true;   
        }
        //加锁成功后将计数器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操做
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //计数器减为0时释放锁
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key);      
            }
        }
    }
}

至此,咱们基本上解决了分布式锁的可重入性问题。

说到这里,我还要问你们一句,上面的解决问题的方案真的没问题了吗?

阻塞与非阻塞锁

在提交订单的方法中,当获取Redis分布式锁失败时,咱们直接返回了failure来表示当前请求下单的操做失败了。试想,在高并发环境下,一旦某个请求得到了分布式锁,那么,在这个请求释放锁以前,其余的请求调用下单方法时,都会返回下单失败的信息。在真实场景中,这是很是不友好的。咱们能够将后续的请求进行阻塞,直到当前请求释放锁后,再唤醒阻塞的请求得到分布式锁来执行方法。

因此,咱们设计的分布式锁须要支持 阻塞和非阻塞 的特性。

那么,如何实现阻塞呢?咱们可使用自旋来实现,继续修改RedisLockImpl的代码以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //若是获取锁失败,则自旋获取锁,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
        }else{
            isLocked = true;   
        }
        //加锁成功后将计数器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操做
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //计数器减为0时释放锁
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key);      
            }
        }
    }
}

在分布式锁的设计中,阻塞锁和非阻塞锁 是很是重要的概念,你们必定要记住这个知识点。

锁失效问题

尽管咱们实现了分布式锁的阻塞特性,可是还有一个问题是咱们不得不考虑的。那就是 锁失效 的问题。

当程序执行业务的时间超过了锁的过时时间会发生什么呢? 想必不少小伙伴都可以想到,那就是前面的请求没执行完,锁过时失效了,后面的请求获取到分布式锁,继续向下执行了,程序没法作到真正的互斥,没法保证业务的原子性了。

那如何解决这个问题呢?答案就是:咱们必须保证在业务代码执行完毕后,才能释放分布式锁。 方案是有了,那如何实现呢?

说白了,咱们须要在业务代码中,时不时的执行下面的代码来保证在业务代码没执行完时,分布式锁不会因超时而被释放。

springRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);

这里,咱们须要定义一个定时策略来执行上面的代码,须要注意的是:咱们不能等到30秒后再执行上述代码,由于30秒时,锁已经失效了。例如,咱们能够每10秒执行一次上面的代码。

有些小伙伴说,直接在RedisLockImpl类中添加一个while(true)循环来解决这个问题,那咱们就这样修改下RedisLockImpl类的代码,看看有没有啥问题。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //若是获取锁失败,则自旋获取锁,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
            //定义更新锁的过时时间
            while(true){
                Integer count = threadLocalInteger.get();
                //当前锁已经被释放,则退出循环
                if(count == 0 || count <= 0){
                    break;
                }
                springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
                try{
                    //每隔10秒执行一次
                    Thread.sleep(10000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }else{
            isLocked = true;   
        }
        //加锁成功后将计数器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操做
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //计数器减为0时释放锁
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key);      
            }
        }
    }
}

相信小伙伴们看了代码就会发现哪里有问题了:更新锁过时时间的代码确定不能这么去写。由于这么写会 致使当前线程在更新锁超时时间的while(true)循环中一直阻塞而没法返回结果。 因此,咱们不能将当前线程阻塞,须要异步执行定时任务来更新锁的过时时间。

此时,咱们继续修改RedisLockImpl类的代码,将定时更新锁超时的代码放到一个单独的线程中执行,以下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    
    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();
    
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
        	threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //若是获取锁失败,则自旋获取锁,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
            //启动新线程来执行定时任务,更新锁过时时间
           new Thread(new UpdateLockTimeoutTask(uuid, stringRedisTemplate)).start();
        }else{
            isLocked = true;   
        }
        //加锁成功后将计数器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //当前线程中绑定的uuid与Redis中的uuid相同时,再执行删除锁的操做
        String uuid = stringRedisTemplate.opsForValue().get(key);
        if(threadLocal.get().equals(uuid)){
            Integer count = threadLocalInteger.get();
            //计数器减为0时释放锁
            if(count == null || --count <= 0){
             	stringRedisTemplate.delete(key); 
                //获取更新锁超时时间的线程并中断
                long threadId = stringRedisTemplate.opsForValue().get(uuid);
                Thread updateLockTimeoutThread = ThreadUtils.getThreadByThreadId(threadId);
                if(updateLockTimeoutThread != null){
                     //中断更新锁超时时间的线程
                    updateLockTimeoutThread.interrupt();
                    stringRedisTemplate.delete(uuid);   
                }
            }
        }
    }
}

建立UpdateLockTimeoutTask类来执行更新锁超时的时间。

public class UpdateLockTimeoutTask implements Runnable{
    //uuid
    private long uuid;
    private StringRedisTemplate stringRedisTemplate;
    public UpdateLockTimeoutTask(long uuid, StringRedisTemplate stringRedisTemplate){
        this.uuid = uuid;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    @Override
    public void run(){
        //以uuid为key,当前线程id为value保存到Redis中
        stringRedisTemplate.opsForValue().set(uuid, Thread.currentThread().getId());
         //定义更新锁的过时时间
        while(true){
            springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
            try{
                //每隔10秒执行一次
                Thread.sleep(10000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}

接下来,咱们定义一个ThreadUtils工具类,这个工具类中有一个根据线程id获取线程的方法getThreadByThreadId(long threadId)。

public class ThreadUtils{
    //根据线程id获取线程句柄
    public static Thread getThreadByThreadId(long threadId){
        ThreadGroup group = Thread.currentThread().getThreadGroup();
        while(group != null){
            Thread[] threads = new Thread[(int)(group.activeCount() * 1.2)];
            int count = group.enumerate(threads, true);
            for(int i = 0; i < count; i++){
                if(threadId == threads[i].getId()){
                    return threads[i];
                }
            }
        }
    }
}

上述解决分布式锁失效的问题在分布式锁领域有一个专业的术语叫作 “异步续命” 。须要注意的是:当业务代码执行完毕后,咱们须要中止更新锁超时时间的线程。因此,这里,我对程序的改动是比较大的,首先,将更新锁超时的时间任务从新定义为一个UpdateLockTimeoutTask类,并将uuid和StringRedisTemplate注入到任务类中,在执行定时更新锁超时时间时,首先将当前线程保存到Redis中,其中Key为传递进来的uuid。

在首先获取分布式锁后,从新启动线程,并将uuid和StringRedisTemplate传递到任务类中执行任务。当业务代码执行完毕后,调用releaseLock()方法释放锁时,咱们会经过uuid从Redis中获取更新锁超时时间的线程id,并经过线程id获取到更新锁超时时间的线程,调用线程的interrupt()方法来中断线程。

此时,当分布式锁释放后,更新锁超时的线程就会因为线程中断而退出了。

实现分布式锁的基本要求

结合上述的案例,咱们能够得出实现分布式锁的基本要求:

  • 支持互斥性
  • 支持锁超时
  • 支持阻塞和非阻塞特性
  • 支持可重入性
  • 支持高可用

通用分布式解决方案

在互联网行业,分布式锁是一个绕不开的话题,同时,也有不少通用的分布式锁解决方案,其中,用的比较多的一种方案就是使用开源的Redisson框架来解决分布式锁问题。

有关Redisson分布式锁的使用方案你们能够参考《【高并发】你知道吗?你们都在使用Redisson实现分布式锁了!!

既然Redisson框架已经很牛逼了,咱们直接使用Redisson框架是否可以100%的保证分布式锁不出问题呢?答案是没法100%的保证。由于在分布式领域没有哪一家公司或者架构师可以保证100%的不出问题,就连阿里这样的大公司、阿里的首席架构师这样的技术大牛也不敢保证100%的不出问题。

在分布式领域,没法作到100%无端障,咱们追求的是几个9的目标,例如99.999%无端障。

CAP理论

在分布式领域,有一个很是重要的理论叫作CAP理论。

  • C:Consistency(一致性)
  • A:Availability(可用性)
  • P:Partition tolerance(分区容错性)

在分布式领域中,是必需要保证分区容错性的,也就是必需要保证“P”,因此,咱们只能保证CP或者AP。

这里,咱们可使用Redis和Zookeeper来进行简单的对比,咱们可使用Redis实现AP架构的分布式锁,使用Zookeeper实现CP架构的分布式锁。

  • 基于Redis的AP架构的分布式锁模型

在基于Redis实现的AP架构的分布式锁模型中,向Redis节点1写入数据后,会当即返回结果,以后在Redis中会以异步的方式来同步数据。

  • 基于Zookeeper的CP架构的分布式锁模型

在基于Zookeeper实现的CP架构的分布式模型中,向节点1写入数据后,会等待数据的同步结果,当数据在大多数Zookeeper节点间同步成功后,才会返回结果数据。

当咱们使用基于Redis的AP架构实现分布式锁时,须要注意一个问题,这个问题可使用下图来表示。

也就是Redis主从节点之间的数据同步失败,假设线程向Master节点写入了数据,而Redis中Master节点向Slave节点同步数据失败了。此时,另外一个线程读取的Slave节点中的数据,发现没有添加分布式锁,此时就会出现问题了!!!

因此,在设计分布式锁方案时,也须要注意Redis节点之间的数据同步问题。

红锁的实现

在Redisson框架中,实现了红锁的机制,Redisson的RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也能够用来将多个RLock对象关联为一个红锁,每一个RLock对象实例能够来自于不一样的Redisson实例。当红锁中超过半数的RLock加锁成功后,才会认为加锁是成功的,这就提升了分布式锁的高可用。

咱们可使用Redisson框架来实现红锁。

public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
	RLock lock1 = redisson1.getLock("lock1");
	RLock lock2 = redisson2.getLock("lock2");
	RLock lock3 = redisson3.getLock("lock3");
	RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
	try {
		// 同时加锁:lock1 lock2 lock3, 红锁在大部分节点上加锁成功就算成功。
		lock.lock();
		// 尝试加锁,最多等待100秒,上锁之后10秒自动解锁
		boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

其实,在实际场景中,红锁是不多使用的。这是由于使用了红锁后会影响高并发环境下的性能,使得程序的体验更差。因此,在实际场景中,咱们通常都是要保证Redis集群的可靠性。同时,使用红锁后,当加锁成功的RLock个数不超过总数的一半时,会返回加锁失败,即便在业务层面任务加锁成功了,可是红锁也会返回加锁失败的结果。另外,使用红锁时,须要提供多套Redis的主从部署架构,同时,这多套Redis主从架构中的Master节点必须都是独立的,相互之间没有任何数据交互。

高并发“黑科技”与致胜奇招

假设,咱们就是使用Redis来实现分布式锁,假设Redis的读写并发量在5万左右。咱们的商城业务须要支持的并发量在100万左右。若是这100万的并发所有打入Redis中,Redis极可能就会挂掉,那么,咱们如何解决这个问题呢?接下来,咱们就一块儿来探讨这个问题。

在高并发的商城系统中,若是采用Redis缓存数据,则Redis缓存的并发处理能力是关键,由于不少的前缀操做都须要访问Redis。而异步削峰只是基本的操做,关键仍是要保证Redis的并发处理能力。

解决这个问题的关键思想就是:分而治之,将商品库存分开放。

暗度陈仓

咱们在Redis中存储商品的库存数量时,能够将商品的库存进行“分割”存储来提高Redis的读写并发量。

例如,原来的商品的id为10001,库存为1000件,在Redis中的存储为(10001, 1000),咱们将原有的库存分割为5份,则每份的库存为200件,此时,咱们在Redia中存储的信息为(10001_0, 200),(10001_1, 200),(10001_2, 200),(10001_3, 200),(10001_4, 200)。

此时,咱们将库存进行分割后,每一个分割后的库存使用商品id加上一个数字标识来存储,这样,在对存储商品库存的每一个Key进行Hash运算时,得出的Hash结果是不一样的,这就说明,存储商品库存的Key有很大几率不在Redis的同一个槽位中,这就可以提高Redis处理请求的性能和并发量。

分割库存后,咱们还须要在Redis中存储一份商品id和分割库存后的Key的映射关系,此时映射关系的Key为商品的id,也就是10001,Value为分割库存后存储库存信息的Key,也就是10001_0,10001_1,10001_2,10001_3,10001_4。在Redis中咱们可使用List来存储这些值。

在真正处理库存信息时,咱们能够先从Redis中查询出商品对应的分割库存后的全部Key,同时使用AtomicLong来记录当前的请求数量,使用请求数量对从Redia中查询出的商品对应的分割库存后的全部Key的长度进行求模运算,得出的结果为0,1,2,3,4。再在前面拼接上商品id就能够得出真正的库存缓存的Key。此时,就能够根据这个Key直接到Redis中获取相应的库存信息。

同时,咱们能够将分隔的不一样的库存数据分别存储到不一样的Redis服务器中,进一步提高Redis的并发量。

移花接木

在高并发业务场景中,咱们能够直接使用Lua脚本库(OpenResty)从负载均衡层直接访问缓存。

这里,咱们思考一个场景:若是在高并发业务场景中,商品被瞬间抢购一空。此时,用户再发起请求时,若是系统由负载均衡层请求应用层的各个服务,再由应用层的各个服务访问缓存和数据库,其实,本质上已经没有任何意义了,由于商品已经卖完了,再经过系统的应用层进行层层校验已经没有太多意义了!!而应用层的并发访问量是以百为单位的,这又在必定程度上会下降系统的并发度。

为了解决这个问题,此时,咱们能够在系统的负载均衡层取出用户发送请求时携带的用户id,商品id和活动id等信息,直接经过Lua脚本等技术来访问缓存中的库存信息。若是商品的库存小于或者等于0,则直接返回用户商品已售完的提示信息,而不用再通过应用层的层层校验了。

写在最后

若是以为文章对你有点帮助,请微信搜索并关注「 冰河技术 」微信公众号,跟冰河学习高并发编程技术。

最后,附上并发编程须要掌握的核心技能知识图,祝你们在学习并发编程时,少走弯路。

相关文章
相关标签/搜索