缓存和分布式锁

在这里插入图片描述

@[toc]html

1、缓存

一、缓存使用

为了系统性能的提高,咱们通常都会将部分数据放入缓存中,加速访问。而db承担数据落盘工做。java

哪些教据适合放入緩存?node

  • 即时性、数据一致性要求不高的git

  • 访问量大且更新频率不高的数据(读多,写少)github

举例:电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据更新频率来定),后台若是发布一个商品,买家须要5分钟才能看到新的商品通常仍是能够接受的。redis

在这里插入图片描述

伪代码逻辑:算法

data = cache.load(id);//从缓存加载数据
if(data == null){
	data = db.loadid);//从数据库加载数据
	cache.put(id,data);//保存到cache中
}
retum data;
复制代码

注意:在开发中,凡是放入缓存中的数据都应该指定过时时间,使其能够在系统即便没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃致使的数据永久不一致题。spring

本地缓存: 适合单体应用 shell

在这里插入图片描述

分布式缓存-本地模式在分布式下的问题: 缓存一致性问题、拓展性问题、高可用问题 数据库

在这里插入图片描述

分布式缓存: 能够解决前面两个的不足,目前最常使用的是redis

在这里插入图片描述

二、整合 redis 做为缓存

须要建立一个 Spring Boot 项目来整合 Redis。若是尚未安装 Redis,那么 Redis 的安装能够参考:在CentOS中安装和使用Docker 这篇内容。或者使用 Windows 版本的 Redis 也是能够的。

一、配置pom 文件

SpringBoot 项目的 pom 文件中引入 redis 依赖,能够不用写版本号,使用SpringBoot的默认配置项:

<!-- 引入redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
复制代码

引入依赖以后,项目中就会有 RedisAutoConfiguration.java 自动配置类,能够进行 redis 的自动配置。 RedisAutoConfiguration.java 将 redis 的全部配置属性都放在 RedisProperties.java 类中。

二、配置application.yml文件:

spring: 
 redis:
 host: 192.168.56.10 # redis地址
 port: 6379 # 端口号,默认为6379.相同的话也能够不配
复制代码

三、测试 redis

RedisAutoConfiguration.java 类中,已经为咱们提供了RedisTemplate<Object, Object>StringRedisTemplate 两个类,来封装 redis 的操做,下面来使用 StringRedisTemplate 测试一下。

在测试类中添加下面的代码:

@Autowired
StringRedisTemplate stringRedisTemplate;

@Test
public void testStringRedisTemplate(){
    // 操做字符串
    ValueOperations<String, String> ops = stringRedisTemplate.opsForValue();

    // 保存
    ops.set("hello", "world_"+ UUID.randomUUID().toString());

    // 查询
    String hello = ops.get("hello");
    System.out.println("获取以前保存的数据:"+hello);
}
复制代码

测试输出结果:获取以前保存的数据:world_90bf25e1-2e84-4f50-b6e2-5eaab32b4175

还能够经过安装 redis 可视化工具 RedisDesktopManager 来查看以前保存的数据:

在这里插入图片描述

2、缓存失效问题

一、高并发下缓存失效问题-缓存穿透

在这里插入图片描述
缓存穿透: 指查询一个 必定不存在的数据,因为缓存是不命中,将去查询数据库,可是数据库也无此记录,咱们没有将此次查询的null写入缓存,这将致使这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义

风险: 利用不存在的数据进行攻击,数据库瞬时压力增大,最终致使崩溃

解决null结果缓存,并加入短暂过时时间

二、高并发下缓存失效问题-缓存雪崩

在这里插入图片描述

缓存雪崩: 缓存雪崩是指在咱们设置缓存时key采用了相同的过时时间,致使缓存在某一时刻同时失效,请求所有转发到DB,DB瞬时 压力太重雪崩。

解决原有的失效时间基础上增长一个随机值,好比1-5分钟随机,这样每个缓存的过时时间的重复率就会下降,就很难引起集体失效的事件。

三、高并发下缓存失效问题-缓存击穿

在这里插入图片描述

缓存击穿: • 对于一些设置了过时时间的key,若是这些key可能会在某些时间点被超高并发地访问,是一种很是“热点”的数据。 • 若是这个key在大量请求同时进来前正好失效,那么全部对这个key的数据查询都落到db,咱们称为缓存击穿。

解决加锁,大量并发只让一个去查,其余人等待,查到之后释放锁,其余人获取到锁,先查缓存,就会有数据,不用去db

3、缓存数据一致性

4、分布式锁

一、分布式下如何加锁?

先来看一个本地锁的例子:咱们有一个商品服务,每个服务都部署在一个独立的tomcat中,每个服务中都使用一个锁。假设目前有8个服务,则须要加8把锁,且这8把锁相互独立。

在这里插入图片描述
本地锁,只能锁住当前进程,因此咱们须要分布式锁

二、锁-时序问题

在加锁的时候,须要将设置查数据库和设置缓存这一步同时放入加锁的方法中,斗则会出现屡次查询数据库的状况,这是因为第一次查询数据库的时候,数据尚未放入缓存,而设置缓存也是须要时间的,在设置缓存的这段时间内,缓存中尚未数据,就有可能因为并发较高,致使屡次查询数据库,没有命中缓存,因此就须要就将设置缓存放到加锁查数据库的逻辑里。

在这里插入图片描述

三、分布式锁演进-基本原理

因为本地锁只能锁住当前进程,若是咱们在进行秒杀活动或者说抢优惠券活动的时候,若是只剩了1件商品或者1张优惠券,若是使用的是本地锁,同时多个服务一块请求获取数据,就有可能产生“超卖”的现象,为了不这种状况的发生,咱们就须要使用分布式锁。

咱们能够同时去一个地方“占坑(加锁)”,若是占到,就执行逻辑。不然就必须等待,直到释放锁。 “占坑(加锁)”能够去 redis,也能够去数据库,能够去任何服务都能访问的地。 若是没有获取到锁,则能够能够以自旋的方式进行等待。

在这里插入图片描述

一、分布式锁演进-V1,setnx("lock","1")

在这里插入图片描述

/** * 从数据库获取数据,使用redis的分布式锁 V1 * 问题: * 一、setnx占好了位,业务代码异常或者程序在页面过程 * 中宕机。没有执行删除锁逻辑,这就形成了死锁 * 解决: * 设置锁的自动过时,即便没有删除,会自动删除 * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV2() * * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV1() {
    //一、占分布式锁,去redis占锁,对用redis命令 set lock 1 NX
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
    if (lock){
        //加锁成功,执行业务
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //删除锁
        stringRedisTemplate.delete("lock");
        return dataFromDb;
    }else {
        //加锁失败...重试
        //休眠100ms重试
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}
复制代码

存在问题:

setnx占好了位,业务代码异常或者程序在执行过程当中宕机。没有执行删除锁逻辑,这就形成了死锁

如何解决:

设置锁的自动过时,即便没有删除,会自动删除

二、分布式锁演进-V2,setnx("lock","1")+设置锁过时时间

在这里插入图片描述

/** * 从数据库获取数据,使用redis的分布式锁 V2 * 问题: * 一、setnx设置好,正要去设置过时时间,宕机。又死锁了。 * 解决: * 设置过时时间和占位必须是原子的。redis支持使用 setnx ex 命令 (set lock 1 EX 30 NX 加锁和设置过时时间在一个语句中完成,设置30秒过时) * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV3() * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV2() {
    //一、占分布式锁,去redis占锁,对用redis命令 set lock 1 NX
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
    if (lock){
        //加锁成功,执行业务
        //二、设置过时时间
        stringRedisTemplate.expire("lock",30, TimeUnit.SECONDS);
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //删除锁
        stringRedisTemplate.delete("lock");
        return dataFromDb;
    }else {
        //加锁失败...重试
        //休眠100ms重试
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}
复制代码

问题:

setnx设置好,正要去设置过时时间,结果忽然断电,服务宕机。又死锁了。

解决:

设置过时时间和占位必须是原子的。redis支持使用 setnx ex 命令

三、分布式锁演进-V3,setnx ex 原子操做

在这里插入图片描述

/** * 从数据库获取数据,使用redis的分布式锁 V3 * 问题: * 一、删除锁直接删除的问题? * 若是因为业务时间很长,锁本身过时了,咱们直接删除,有可能把别人正在持有的锁删除了。 * 解决: * 占锁的时候,值指定为uuid,每一个人匹配是本身的锁才删除。 * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV4() * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV3() {
    //一、占分布式锁,去redis占锁,对用redis命令
    //二、设置过时时间,必须和加锁是同步的,原子的 set lock 1 EX 30 NX
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1", 30, TimeUnit.SECONDS);
    if (lock){
        //加锁成功,执行业务
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //删除锁
        stringRedisTemplate.delete("lock");
        return dataFromDb;
    }else {
        //加锁失败...重试
        //休眠100ms重试
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}
复制代码

问题:

删除锁直接删除的问题? 若是因为业务时间很长,锁本身过时了,咱们直接删除,有可能把别人正在持有的锁删除了。

解决:

占锁的时候,值指定为 uuid每一个人匹配是本身的锁才删除。

四、分布式锁演进-V4,setnx ex 原子操做+惟一锁值

在这里插入图片描述

/** * 从数据库获取数据,使用redis的分布式锁 V4 * 问题: * 一、若是正好判断是当前值,正要删除锁的时候,锁已通过期,别人已经设置到了新的值。那么咱们删除的是别人的锁 * 解决: * 删除锁必须保证原子性。使用redis+Lua脚本完成 * @see CategoryServiceImpl#getCatalogJsonFromDbWithRedisLockV5() * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV4() {
    //一、占分布式锁,去redis占锁,对用redis命令
    //二、设置过时时间,必须和加锁是同步的,原子的 set lock 1 EX 30 NX
    //使用uuid做为锁的值
    String uuid = UUID.randomUUID().toString();
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
    if (lock){
        //加锁成功,执行业务
        Map<String, List<Catelog2Vo>> dataFromDb = getDataFromDb();
        //删除锁前先进行获取,判断是否是本身的锁编号uuid,是的话再删除
        String lockValue = stringRedisTemplate.opsForValue().get("lock");
        if (uuid.equals(lockValue)){
            //删除本身的锁
            stringRedisTemplate.delete("lock");
        }
        return dataFromDb;
    }else {
        //加锁失败...重试
        //休眠100ms重试
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}
复制代码

问题:

若是正好判断是当前值,正要删除锁的时候,锁已通过期,别人已经设置到了新的值。那么咱们删除的是别人的锁。

也就是说,业务执行时间大于锁的过时时间,这个时候,删除的锁就不是以前业务的锁,而是后来业务的锁。

解决:

删除锁必须保证原子性。使用 redis+Lua脚本完成

五、分布式锁演进-V5,setnx ex 原子操做+惟一锁值+Lua脚本删除锁保证原子性

在这里插入图片描述

/** * 从数据库获取数据,使用redis的分布式锁 V5 * 保证加锁【占位+过时时间】和删除锁【判断+删除】的原子性。使用redis+Lua脚本完成 * 更难的事情,是锁的自动续期 * @return */
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDbWithRedisLockV5() {
    //一、占分布式锁,去redis占锁,对用redis命令
    //二、设置过时时间,必须和加锁是同步的,原子的 set lock 1 EX 30 NX
    //使用uuid做为锁的值
    String uuid = UUID.randomUUID().toString();
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
    if (lock){
        log.info("获取分布式锁成功....");
        Map<String, List<Catelog2Vo>> dataFromDb;
        try {
            //加锁成功,执行业务
            dataFromDb = getDataFromDb();
        } finally {
            //删除锁前先进行获取,判断是否是本身的锁编号uuid,是的话再删除
            //获取对比值+对比成删除==原子操做 使用lua脚本解锁
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            //删除锁,删除成功返回 1,删除失败返回 0
            Long lock1 = stringRedisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class),
                    Arrays.asList("lock"), uuid);
        }
        return dataFromDb;
    }else {
        log.info("获取分布式锁失败,等待重试....");
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //加锁失败...重试
        //休眠100ms重试
        //自旋的方式
        return getCatalogJsonFromDbWithRedisLockV1();
    }
}
复制代码

存在问题:

一、更难的事情,是锁的自动续期

二、使用前面 v1 - v5 的操做太麻烦,加锁解锁都须要本身完成,若是有不少锁则须要写不少重复的代码

如何解决:

使用封装好的 reids 分布式锁工具类,下一节来介绍

四、分布式锁-Redisson简介&整合

先来援引一段文档:

命令 SET resource-name anystring NX EX max-lock-time 是一种用 Redis 来实现锁机制的简单方法。

若是上述命令返回OK,那么客户端就能够得到锁(若是上述命令返回Nil,那么客户端能够在一段时间以后从新尝试),而且能够经过DEL命令来释放锁。

客户端加锁以后,若是没有主动释放,会在过时时间以后自动释放。

能够经过以下优化使得上面的锁系统变得更加鲁棒:

  • 不要设置固定的字符串,而是设置为随机的大字符串,能够称为token。
  • 经过脚步删除指定锁的key,而不是DEL命令。

上述优化方法会避免下述场景:a客户端得到的锁(键key)已经因为过时时间到了被redis服务器删除,可是这个时候a客户端还去执行DEL命令。而b客户端已经在a设置的过时时间以后从新获取了这个一样key的锁,那么a执行DEL就会释放了b客户端加好的锁。

解锁脚本的一个例子将相似于如下:

if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end
复制代码

注意: 上这种设计模式并不推荐用来实现redis分布式锁。应该参考the Redlock algorithm的实现,由于这个方法只是复杂一点,可是却能保证更好的使用效果。

一、回顾:单Redis实例实现分布式锁的正确方法

在尝试克服上述单实例设置的限制以前,让咱们先讨论一下在这种简单状况下实现分布式锁的正确作法,实际上这是一种可行的方案,尽管存在竞态,结果仍然是可接受的,另外,这里讨论的单实例加锁方法也是分布式加锁算法的基础。

获取锁使用命令:

SET resource_name my_random_value NX PX 30000
复制代码

这个命令仅在不存在key的时候才能被执行成功(NX选项),而且这个key有一个30秒的自动失效时间(PX属性)。这个key的值是“my_random_value”(一个随机值),这个值在全部的客户端必须是惟一的,全部同一key的获取者(竞争者)这个值都不能同样。

value的值必须是随机数主要是为了更安全的释放锁,释放锁的时候使用脚本告诉Redis:只有key存在而且存储的值和我指定的值同样才能告诉我删除成功。能够经过如下Lua脚本实现:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
复制代码

使用这种方式释放锁能够避免删除别的客户端获取成功的锁。举个例子:客户端A取得资源锁,可是紧接着被一个其余操做阻塞了,当客户端A运行完毕其余操做后要释放锁时,原来的锁早已超时而且被Redis自动释放,而且在这期间资源锁又被客户端B再次获取到。若是仅使用DEL命令将key删除,那么这种状况就会把客户端B的锁给删除掉。使用Lua脚本就不会存在这种状况,由于脚本仅会删除value等于客户端A的value的key(value至关于客户端的一个签名)。

这个随机字符串应该怎么设置?我认为它应该是从/dev/urandom产生的一个20字节随机数,可是我想你能够找到比这种方法代价更小的方法,只要这个数在你的任务中是惟一的就行。例如一种安全可行的方法是使用/dev/urandom做为RC4的种子和源产生一个伪随机流;一种更简单的方法是把以毫秒为单位的unix时间和客户端ID拼接起来,理论上不是彻底安全,可是在多数状况下能够知足需求.

key的失效时间,被称做“锁定有效期”。它不只是key自动失效时间,并且仍是一个客户端持有锁多长时间后能够被另一个客户端从新得到。

截至到目前,咱们已经有较好的方法获取锁和释放锁。基于Redis单实例,假设这个单实例老是可用,这种方法已经足够安全。如今让咱们扩展一下,假设Redis没有老是可用的保障。

二、Redlock算法

在Redis的分布式环境中,咱们假设有N个Redis master。这些节点彻底互相独立,不存在主从复制或者其余集群协调机制。以前咱们已经描述了在Redis单实例下怎么安全地获取和释放锁。咱们确保将在每(N)个实例上使用此方法获取和释放锁。在这个样例中,咱们假设有5个Redis master节点,这是一个比较合理的设置,因此咱们须要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。

为了取到锁,客户端应该执行如下操做:

  1. 获取当前Unix时间,以毫秒为单位。
  2. 依次尝试从N个实例,使用相同的key和随机值获取锁。在步骤2,当向Redis设置锁时,客户端应该设置一个网络链接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样能够避免服务器端Redis已经挂掉的状况下,客户端还在死死地等待响应结果。若是服务器端没有在规定时间内响应,客户端应该尽快尝试另一个Redis实例。
  3. 客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就获得获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,而且使用的时间小于锁失效时间时,锁才算获取成功。
  4. 若是取到了锁,key的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
  5. 若是由于某些缘由,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取锁时间已经超过了有效时间),客户端应该在全部的Redis实例上进行解锁(即使某些Redis实例根本就没有加锁成功)。

一、这个算法是异步的么?

算法基于这样一个假设:虽然多个进程之间没有时钟同步,但每一个进程都以相同的时钟频率前进,时间差相对于失效时间来讲几乎能够忽略不计。这种假设和咱们的真实世界很是接近:每一个计算机都有一个本地时钟,咱们能够容忍多个计算机之间有较小的时钟漂移。

从这点来讲,咱们必须再次强调咱们的互相排斥规则:只有在锁的有效时间(在步骤3计算的结果)范围内客户端可以作完它的工做,锁的安全性才能获得保证(锁的实际有效时间一般要比设置的短,由于计算机之间有时钟漂移的现象)。.

想要了解更多关于须要时钟漂移间隙的类似系统, 这里有一个很是有趣的参考: Leases: an efficient fault-tolerant mechanism for distributed file cache consistency.

二、失败时重试

当客户端没法取到锁时,应该在一个随机延迟后重试,防止多个客户端在同时抢夺同一资源的锁(这样会致使脑裂,没有人会取到锁)。一样,客户端取得大部分Redis实例锁所花费的时间越短,脑裂出现的几率就会越低(必要的重试),因此,理想状况一下,客户端应该同时(并发地)向全部Redis发送SET命令。

须要强调,当客户端从大多数Redis实例获取锁失败时,应该尽快地释放(部分)已经成功取到的锁,这样其余的客户端就没必要非得等到锁过完“有效时间”才能取到(然而,若是已经存在网络分裂,客户端已经没法和Redis实例通讯,此时就只能等待key的自动释放了,等于被惩罚了)。

三、释放锁

释放锁比较简单,向全部的Redis实例发送释放锁命令便可,不用关心以前有没有从Redis实例成功获取到锁.

三、Redisson简介&整合

1. 概述

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不只提供了一系列的分布式的Java经常使用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者可以将精力更集中地放在处理业务逻辑上。

关于Redisson项目的详细介绍能够在官方网站找到。

每一个Redis服务实例都能管理多达1TB的内存。

可以完美的在云计算环境里使用,而且支持AWS ElastiCache主备版AWS ElastiCache集群版Azure Redis Cache阿里云(Aliyun)的云数据库Redis版

如下是Redisson的结构:

若是你如今正在使用其余的Redis的Java客户端,那么Redis命令和Redisson对象匹配列表 可以帮助你轻松的将现有代码迁徙到Redisson框架里来。

Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。

二、整合

一、方式一:使用 Redisson

一、在 pom 文件中引入依赖:

<!-- 引入 redisson 依赖 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.5</version>
</dependency>
复制代码

二、配置 redisson

使用配置类的方式配置

@Configuration
public class MyRedissonConfig {

    /** * 对全部的 Redisson 的使用都是经过 RedissonClient 对象 * @return * @throws IOException */
    @Bean(destroyMethod = "shutdown")
    RedissonClient redisson() throws IOException {
// // 默认链接地址 127.0.0.1:6379
// RedissonClient redisson = Redisson.create();

        // 一、建立配置
        Config config = new Config();
        // Redis url should start with redis:// or rediss:// (for SSL connection)
        config.useSingleServer().setAddress("redis://192.168.56.10:6379");

        // 二、根据 Config 建立出 RedissonClient 实例
        RedissonClient redisson = Redisson.create(config);

        return redisson;
    }
}
复制代码

三、测试

@Autowired
RedissonClient redissonClient;

@Test
public void testRedissonClient(){
    System.out.println(redissonClient);
}
复制代码

在这里插入图片描述

参考文档:

二、方式二:使用Redisson/Spring Boot Starter

一、在项目中添加依赖项:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.1</version>
</dependency>
复制代码

二、添加配置到application.settings配置文件

# common spring boot settings

spring.redis.database=redis
spring.redis.host=192.168.56.10
spring.redis.port=6379
spring.redis.password=
spring.redis.ssl=
spring.redis.timeout=
spring.redis.cluster.nodes=
spring.redis.sentinel.master=
spring.redis.sentinel.nodes=

# Redisson settings

#path to config - redisson.yaml
spring.redis.redisson.config=classpath:redisson.yaml
复制代码

三、经过带有RedissonClient接口或RedisTemplate/ ReactiveRedisTemplate对象的spring bean使用Redisson

@Autowired
RedissonClient redisson;
@ResponseBody
@GetMapping("/hello")
public String hello() {
    //一、获取一把锁,只要锁的名字同样,就是同一把锁
    RLock lock = redisson.getLock("my-lock");
    //二、加锁
    //lock.lock(); //阻塞式等待。默认加的锁都是30s时间
    
    // 加锁之后10秒钟自动解锁
    // 无需调用unlock方法手动解锁;在锁时间到了之后,不会自动续期,自动解锁时间必定要大于业务执行时间
    lock.lock(10, TimeUnit.SECONDS);
    try {
        System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("释放锁..." + Thread.currentThread().getId());
        lock.unlock();
    }
    return "hello";
}
复制代码

参考文档:

四、分布式锁-Redisson-lock锁测试

基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)反射式(Reactive)RxJava2标准的接口。

RLock lock = redisson.getLock("anyLock");
// 最多见的使用方法
lock.lock();
复制代码

你们都知道,若是负责储存这个分布式锁的Redisson节点宕机之后,并且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了不这种状况的发生,Redisson内部提供了一个监控锁的看门狗,它的做用是在Redisson实例被关闭前,不断的延长锁的有效期。默认状况下,看门狗的检查锁的超时时间是30秒钟,也能够经过修改Config.lockWatchdogTimeout来另行指定。

接下来我本身来测试一下,首先实现一个简单的测试接口:

@ResponseBody
@GetMapping("/hello")
public String hello() {
    //一、获取一把锁,只要锁的名字同样,就是同一把锁
    RLock lock = redisson.getLock("my-lock");
    //二、加锁
    lock.lock(); //阻塞式等待。默认加的锁都是30s时间
    try {
        //redisson解决了两个问题:
        //1)、锁的自动续期,若是业务执行时间超长,运行期间自动给锁续上新的30s,不用担忧业务时间长,锁自动过时被删掉
        //2)、加锁的业务只要运行完成,就不会给当前锁续期,即便不手动解锁,锁默认在30s后自动删除
        System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        //三、解锁 假设当前服务执行时宕机,解锁代码没有运行,redisson会不会出现死锁?
        System.out.println("释放锁..." + Thread.currentThread().getId());
        lock.unlock();
    }
    return "hello";
}
复制代码

同时启动两个不一样端口的相同服务,记做服务A、B。在请求A、B以后,手动关闭服务A,模拟遭遇宕机解锁代码没有执行的状况,看最后是否解锁,服务B是否能够得到锁:

在这里插入图片描述

在这里插入图片描述

从上面的执行结果中,能够看到,服务宕机,redisson依然解锁成功。

redisson解决了两个问题: 1)、锁的自动续期,若是业务执行时间超长,运行期间自动给锁续上新的30s,不用担忧业务时间长,锁自动过时被删掉 2)、加锁的业务只要运行完成,就不会给当前锁续期,即便不手动解锁,锁默认在30s后自动删除

这些都是基于看门狗实现的,写一节来了解一下看门狗的实现原理。

参考文档:

五、分布式锁-Redisson-lock看门狗原理-redisson如何解决死锁

Redisson还经过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。

// 加锁之后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
复制代码

下面咱们来测试一下,咱们设置10s自动解锁,设置业务执行时间是30s:

@ResponseBody
@GetMapping("/hello")
public String hello() {
    //一、获取一把锁,只要锁的名字同样,就是同一把锁
    RLock lock = redisson.getLock("my-lock");
    //二、加锁
    // 加锁之后10秒钟自动解锁
    lock.lock(10, TimeUnit.SECONDS);
    try {
        System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
        Thread.sleep(30000); //业务执行时间30s
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("释放锁..." + Thread.currentThread().getId());
        lock.unlock();
    }
    return "hello";
}
复制代码

执行后的效果:

在这里插入图片描述
在这里插入图片描述

RLock对象彻底符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其余进程解锁则会抛出IllegalMonitorStateException错误

问题:lock.lock(10, TimeUnit.SECONDS);在锁时间到了之后,不会自动续期

  • 一、若是咱们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是咱们指定的时间
  • 二、若是咱们未指定锁的超时时间,就使用 lockWatchdogTimeout = 30000L;【lockWatchdogTimeout看门狗的默认时间】。只要占锁成功,就会启动一个定时任务【从新给锁设定过时时间,新的过时时间就是看门狗的默认时间】,每隔10s都会自动续期,续成30s,续期时间的间隔是【internalLockLeaseTime(看门狗时间) / 3L】 10s 续期一次

下面来看一下源码:

先看不设置过时时间的加锁方法:lock()

public void lock() {
    try {
        //leaseTime:-1,在后边的判断会用到;TimeUnit:null;是否可中断:false
        this.lock(-1L, (TimeUnit)null, false);
    } catch (InterruptedException var2) {
        throw new IllegalStateException();
    }
}

//看一下再点击来看一下 lock(long leaseTime, TimeUnit unit, boolean interruptibly) 方法的实现
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    //获取当前线程的id
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁,这个方法是重点,下面进入这个方法中
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);
    ... //略
}

// 查看 tryAcquire 方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    // 进入 尝试获取异步 tryAcquireAsync 这个方法
    return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}

//查看 尝试获取异步 tryAcquireAsync 方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    //若是leaseTime不是-1,则进入这个逻辑,根据前面的代码知道lock()默认leaseTime=-1,因此lock()方法不进这个逻辑,因此设置自动过时时间的方法 lock.lock(10, TimeUnit.SECONDS) 是会进入这个逻辑的
    if (leaseTime != -1L) {
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        //获取一个 RFuture,和java中的Future是相似的, 设置锁的默认过时时间this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 这个是设置默认锁过时时间,也就是下面Config类中的lockWatchdogTimeout
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        //占锁成功,进行监听
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            //没有抛出异常说明,占锁成功
            if (e == null) {
                if (ttlRemaining == null) {
                    //启动一个定时任务【从新给锁设定过时时间,新的过时时间就是看门狗的默认时间】,每隔10s都会自动续期,续成30s,下面来看这个方法
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

// 对应上面 this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 中的时间
public Config() {
    ...
    this.lockWatchdogTimeout = 30000L;
	...
}

// 时间表到期续订方法
private void scheduleExpirationRenewal(long threadId) {
    RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
    RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        //进入续期方法
        this.renewExpiration();
    }
}

//续期方法
private void renewExpiration() {
    RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
    if (ee != null) {
        Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            public void run(Timeout timeout) throws Exception {
                RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                if (ent != null) {
                    Long threadId = ent.getFirstThreadId();
                    if (threadId != null) {
                        RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                        future.onComplete((res, e) -> {
                            if (e != null) {
                                RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                            } else {
                                if (res) {
                                    RedissonLock.this.renewExpiration();
                                }

                            }
                        });
                    }
                }
            }
            // this.internalLockLeaseTime / 3L 续期时间,在RedissonLock(CommandAsyncExecutor commandExecutor, String name)方法中能够看到internalLockLeaseTime就是 lockWatchdogTimeout看门狗的默认时间30s,因此是每隔10s续期一次,续成30s
        }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
}

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    ...
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    ...
}
复制代码

再来设置过时时间的加锁方法:lock.lock(10, TimeUnit.SECONDS)

public void lock(long leaseTime, TimeUnit unit) {
    try {
        this.lock(leaseTime, unit, false);
    } catch (InterruptedException var5) {
        throw new IllegalStateException();
    }
}

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    //一样是进入tryAcquire尝试获取锁这个方法,和lock()方法同样
    Long ttl = this.tryAcquire(leaseTime, unit, threadId);
	...
}

//尝试获取锁
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
     // 进入 尝试获取异步 tryAcquireAsync 这个方法,和lock()方法同样
    return (Long)this.get(this.tryAcquireAsync(leaseTime, unit, threadId));
}

// 尝试获取异步
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1L) {
        // lock.lock(10, TimeUnit.SECONDS),进入这个逻辑
        return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e == null) {
                if (ttlRemaining == null) {
                    this.scheduleExpirationRenewal(threadId);
                }

            }
        });
        return ttlRemainingFuture;
    }
}

// 尝试获取异步,获得lua脚本
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    this.internalLockLeaseTime = unit.toMillis(leaseTime);
    return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
复制代码

参考文档:

六、分布式锁-Redisson-读写锁测试

读写锁测试,加读写锁能够保证必定能读到最新数据修改期间,写锁是一个排它锁(互斥锁),读锁是一个共享锁,写锁没释放,读写就必须等待。

  • 读+读:至关于无锁,并发读,只会在redis中记录好,全部当前的读锁。它们都会同时加锁成功
  • 写+读:等待写锁释放
  • 写+写:阻塞方式
  • 读+写:有读锁,写也须要等待
  • 只要有写的存在,都必须等待

实现一个write和read接口,分别用来测试写锁和读锁:

@GetMapping("/write")
@ResponseBody
public String writeValue(){
    RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");

    String s = "";
    RLock rLock = lock.writeLock();
    try {
        //一、修改数据加写锁,读数据加读锁
        rLock.lock();
        s = UUID.randomUUID().toString();
        stringRedisTemplate.opsForValue().set("writeValue", s);
        Thread.sleep(10000);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
    }

    return s;
}

@GetMapping("/read")
@ResponseBody
public String readValue(){
    RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");

    String s = "";
    //获取读锁
    RLock rLock = lock.readLock();
    try {
        rLock.lock();
        s = stringRedisTemplate.opsForValue().get("writeValue");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
    }

    return s;
}
复制代码

测试效果:

在这里插入图片描述

参考文档:

七、分布式锁-Redisson-闭锁测试

redisson 的闭锁和 java 中的 java.util.concurrent.CountDownLatch 是相似的。

测试闭锁:

  • 一、模拟一个放假锁门的场景
  • 二、学校一共5个班,只有等5个班都没人了才能够锁学校大门
/** * 测试闭锁:锁门方法 */
@GetMapping("/lockdoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
    RCountDownLatch door = redisson.getCountDownLatch("door");
    door.trySetCount(5);
    door.await();//等待闭锁所有完成

    return "放假了";

}
/** * 模拟班级学生全都离开班级的方法 */
@GetMapping("/go/{id}")
@ResponseBody
public String go(@PathVariable("id") Long id) {
    RCountDownLatch door = redisson.getCountDownLatch("door");
    door.countDown();//每离开一个班就计数减一

    return id + "班的人都走了...";
}
复制代码

测试效果:

在这里插入图片描述

参考文档:

八、分布式锁-Redisson-信号量测试

测试信号量:相似于 java 中的java.util.concurrent.Semaphore

模拟车库停车:3个车位,同时只能有3辆车停,只有有车位了才能停车

/** * 车库停车 * @return * @throws InterruptedException */
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
    RSemaphore park = redisson.getSemaphore("park");
    park.acquire();//阻塞式的

    return "ok";
}
/** * 车位上的车离开 */
@GetMapping("/leave")
@ResponseBody
public String leave(){
    RSemaphore park = redisson.getSemaphore("park");
    park.release();//释放一个车位,释放一个信号量
    return "ok";
}
复制代码

应用:

好比信号量也能够用做分布式限流的场景,好比同时在线人数只容许100000人等。

参考文档:

5、Spring Cache

一、简介

Spring 从3.1开始定义了org.springframework.cache.Cacheorg.springframework.cache.CacheManager 接口来统一不一样的缓存技术;并支持使用 JCache (ISR-107) 注解简化咱们开发;

Cache 接口为缓存的组件规范定义,包含缓存的各类操做集合;Cache 接口下 Spring 提供了各类 xxCache 的实现;如 RedisCacheEhCacheCacheConcurrentMapCache等;

每次调用须要缓存功能的方法时, Spring会检查检查指定参数的指定的目标方法是否已经被调用过;若是有就直接从缓存中获取方法调用后的结果,若是没有就调用方法并缓存结果后返回给用户。下次调用直接从缓存中获取。

使用 Spring 缓存抽象时咱们须要关注如下两点:

  • 一、肯定方法须要被缓存以及他们的缓存策略

  • 二、从缓存中读取以前缓存存储的数据

二、基础概念

CacheManager 管理众多 Cache。缓存管理器是定义规则的,真正实际上处理缓存的是不一样的缓存组件。

代码结构图:

在这里插入图片描述

代码模块图:

在这里插入图片描述

三、SpringCache-整合

一、整合

一、引入依赖

spring-boot-starter-cache、spring-boot-starter-data-redis(使用redis做为缓存就要引入redis的依赖)

<!-- 引入 spring-boot-starter-cache 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!-- 引入redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
复制代码

二、写配置

(1)、自动配置了那些?

​ org.springframework.boot.autoconfigure.cache.CacheAutoConfiguration 缓存自动配置类 ​ org.springframework.boot.autoconfigure.cache.CacheProperties 全部在xml文件中配置的属性都封装在这里 ​ org.springframework.boot.autoconfigure.cache.CacheConfigurations 获取每一种类型的缓存 ​ org.springframework.boot.autoconfigure.cache.CacheConfigurations#getConfigurationClass 获得对应缓存的映射 ​ org.springframework.boot.autoconfigure.cache.CacheType 一个枚举类,封装了各类类型的缓存 ​ org.springframework.boot.autoconfigure.cache.RedisCacheConfiguration 使用redis做为缓存时的各类配置

`CacheAutoConfiguration 会导入 RedisCacheConfiguration`
`RedisCacheConfiguration 会自动装配好了redis缓存管理器 RedisCacheManager`
复制代码
(2)、咱们本身须要配置的内容?

配置使用redis做为缓存。在application.properties、或application.ymlbootstrap.properties配置中心中配置 spring.cache.type=redis

三、测试使用缓存

@Cacheable: Triggers cache population. 触发缓存保存 @CacheEvict: Triggers cache eviction. 触发删除缓存 @CachePut: Updates the cache without interfering with the method execution. 更新缓存,而不影响方法的执行 @Caching: Regroups multiple cache operations to be applied on a method. 从新组合要在一个方法上应用的多个缓存操做 @CacheConfig: Shares some common cache-related settings at class-level. 在类级别共享一些与缓存相关的常见设置

(1)、开启缓存功能:

在启动类 XxxApplication 上使用 @EnableCaching 注解,开启缓存功能

(2)、在方法上使用 @Cacheable 注解

只须要在须要缓存数据的方法上使用注解就能完成缓存操做

// @Cacheable(value = {"category"},key = "'level1Categorys'")
@Cacheable(value = {"category"},key = "#root.method.name") // #root.methodName spel表达式,使用调用的方法名做为缓存的key
@Override
public List<CategoryEntity> getLevel1Categorys() {

    long l = System.currentTimeMillis();
    List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("cat_level", 1));
    System.out.println("DB耗时:"+(System.currentTimeMillis()-l));
    return categoryEntities;
}
复制代码

二、注解:

对于缓存声明,Spring的缓存抽象提供了一组Java注释:

  • @Cacheable:触发缓存保存。
  • @CacheEvict:触发删除缓存。
  • @CachePut:更新缓存,而不影响方法的执行。
  • @Caching:从新组合要在一个方法上应用的多个缓存操做。
  • @CacheConfig:在类级别共享一些与缓存相关的常见设置。

在业务中每个须要缓存的数据都要指定放到对应的那个名字的缓存中。至关于缓存的分区,通常建议按照业务类型来划分。

(1)、@Cacheable

表明当前方法的结果须要缓存,若是缓存中有,方法不用调用。若是缓存中没有,会调用方法,最后将方法的结果放入缓存。

@Cacheable 的默认行为:

  • 若是缓存中有,方法不用调用
  • key默认自动生成,缓存名字::SimpleKey [](自动生成的key)
  • 缓存的value值,默认使用的是jdk的序列化机制,将序列化后的值存在redis中
  • 默认时间ttl=-1

若是咱们须要自定义属性,该怎么作呢?

  • 指定生成的缓存使用的key:key属性指定,使用spel表达式 SPEL表达式:docs.spring.io/spring/docs…
  • 指定缓存的数据的存活时间:配置文件中修改ttl,spring.cache.redis.time-to-live=3600000
  • 将数据保存为json格式
// @Cacheable(value = {"category"},key = "'level1Categorys'")
@Cacheable(value = {"category"},key = "#root.method.name") // #root.methodName spel表达式,使用调用的方法名做为缓存的key
@Override
public List<CategoryEntity> getLevel1Categorys() {

    long l = System.currentTimeMillis();
    List<CategoryEntity> categoryEntities = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("cat_level", 1));
    System.out.println("DB耗时:"+(System.currentTimeMillis()-l));
    return categoryEntities;
}
复制代码

(2)、@CacheEvict

@CacheEvict注解是支持缓存一致性——失效模式的注解。@CachePut是支持缓存一致性——双写模式的注解。

要让一个缓存在更新数据的时候失效,就须要使用@CacheEvict注解:

清空单个缓存: @CacheEvict(value = {"category"},key = "'getLevel1Categorys'")

/** * 级联更全部关联数据 * @CacheEvict:缓存一致性——失效模式 * @CachePut:缓存一致性——双写模式 * @param category */
@CacheEvict(value = {"category"},key = "'getLevel1Categorys'") //清空单个缓存
@Transactional // 开启事务
@Override
public void updateCascade(CategoryEntity category) {
    // 一、先更新当前表的内容
    this.updateById(category);
    //二、更新级联表的冗余内容
    categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
复制代码

清空多个缓存:

(1)、同时操做多个缓存:@Caching

@Caching(evict = { //清空多个缓存 @CacheEvict(value = {"category"},key = "'getLevel1Categorys'"), @CacheEvict(value = {"category"},key = "'getCatalogJson'") })

@Caching(evict = { //清空多个缓存
    @CacheEvict(value = {"category"},key = "'getLevel1Categorys'"),
    @CacheEvict(value = {"category"},key = "'getCatalogJson'")
})
@Transactional // 开启事务
@Override
public void updateCascade(CategoryEntity category) {
    // 一、先更新当前表的内容
    this.updateById(category);
    //二、更新级联表的冗余内容
    categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
复制代码

(2) 指定删除某个分区下的全部数据:@CacheEvict(value = {"category"},allEntries = true)

@CacheEvict(value = {"category"},allEntries = true) //清空整个分区的缓存

@CacheEvict(value = {"category"},allEntries = true) //清空整个分区的缓存
@Transactional // 开启事务
@Override
public void updateCascade(CategoryEntity category) {
// 一、先更新当前表的内容
this.updateById(category);
//二、更新级联表的冗余内容
categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());
}
复制代码

因此之后使用缓存能够定义以下规则:

  • 一、存储同一类型的数据,均可以指定成同一个分区分区名默认就是缓存的前缀,因此不须要须要设置缓存前缀 spring.cache.redis.key-prefix=CACHE_,这样缓存中的键值就是分区名::方法名
  • 二、指定删除某个分区下的全部数据: @CacheEvict(value = {"category"}, allEntries = true)

测试效果:

在这里插入图片描述

四、自定义缓存配置

自定义缓存配置,须要定义一个缓存配置类:

@EnableConfigurationProperties(CacheProperties.class) // 让 CacheProperties 的绑定生效
@Configuration
@EnableCaching // 开启缓存(配置在 XxxApplication 主类上也能够)
public class MyCacheConfig {

// @Autowired
// CacheProperties cacheProperties;

    /** * 使用缓存配置类后,缓存配置文件中设置的属性将会失效,好比: * spring.cache.redis.time-to-live=3600000 # 设置缓存存活时间,单位是ms * 因此须要另外在这里配置从新绑定 * * 一、原来和配置文件绑定的配置类是这样子的 * @ConfigurationProperties( * prefix = "spring.cache" * ) * public class CacheProperties { * * 二、要让它生效: * 1)、@EnableConfigurationProperties(CacheProperties.class) 让 CacheProperties 的绑定生效 * 2)、注入 CacheProperties 或者在配置方法中加上 CacheProperties 参数 * * @return */
    @Bean
    RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();

        //设置key的序列化
        config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
        //设置value的序列化,使用fastjson
        config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));

        CacheProperties.Redis redisProperties = cacheProperties.getRedis();
        // 将配置文件中的全部配置都生效
        if (redisProperties.getTimeToLive() != null) {
            config = config.entryTtl(redisProperties.getTimeToLive());
        }

        if (redisProperties.getKeyPrefix() != null) {
            config = config.prefixKeysWith(redisProperties.getKeyPrefix());
        }

        if (!redisProperties.isCacheNullValues()) {
            config = config.disableCachingNullValues();
        }

        if (!redisProperties.isUseKeyPrefix()) {
            config = config.disableKeyPrefix();
        }

        return config;
    }
}
复制代码

其余的缓存属性配置:

# 设置缓存存活时间,单位是ms
spring.cache.redis.time-to-live=3600000
# 设置缓存的前缀,若是指定了前缀就使用咱们指定的前缀,不然就默认使用缓存的名字做为前缀
spring.cache.redis.key-prefix=CACHE_
# 设置是否启用缓存前缀,默认是true
spring.cache.redis.use-key-prefix=true
# 是否缓存空值,设置为true能够防止缓存穿透
spring.cache.redis.cache-null-values=true
复制代码

自定义测试效果:

在这里插入图片描述

五、缓存穿透问题解决

在配置文件中配置容许缓存空值,解决缓存穿透问题

# 是否缓存空值,设置为true能够防止缓存穿透
spring.cache.redis.cache-null-values=true
复制代码

六、Spring-Cache的不足

原理:

CacheManagerRedisCacheManager) --建立--> CacheRedisCache)--> Cache负责缓存的读写操做

不足:

(1)、读模式

  • 缓存穿透:查询一个null数据。解决:缓存空数据,添加配置 spring.cache.redis.cache-null-values=true
  • 缓存击穿:大量并发进来同时查询一个正好过时的数据。解决:加锁。可是Spring-Cache默认put时是不加锁的,因此没有办法解决这个问题。可是能够设置 sync = true @Cacheable(value = xxx, key = xxx, sync = true),在查缓存的时候调用使用了同步的get方法org.springframework.data.redis.cache.RedisCache#get(java.lang.Object, java.util.concurrent.Callable) 获取到获取到空数据时在put中放一份空的数据。
  • 缓存雪崩:大量的key同时过时。解决:加随机时间。加上过时时间:spring.cache.redis.time-to-live=3600000

(2)、写模式(缓存与数据库一致)

  • 1)读写加锁:使用读多写少场景
  • 2)`引入Canal:感知到MySQL的更新就去更新缓存
  • 3)读多写多:直接去数据库查询

总结:

  • 常规数据(读多写少、即时性、一致性要求不高的数据):彻底可使用Spring-Cache;写模式:只要缓存设置了过时时间就足够了

  • 特殊数据:特殊设计


参考:

  1. www.redis.cn/topics/dist…
  2. github.com/redisson/re…
  3. docs.spring.io/spring/docs…
相关文章
相关标签/搜索