此次咱们来简单说说分布式锁,我记得过去我也过一篇JMM的内存一致性算法,就是说拿到锁的能够继续操做,没拿到的自旋等待。html
思路与场景node
咱们在Zookeeper中提到过度布式锁,这里咱们先用redis实现一个简单的分布式锁,这里是咱们一个简单的售卖减库存的小实例,剩余库存假设存在数据库内。web
@GetMapping(value = "/getLock") public String getLock() { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售卖成功,剩余" + realStock + ""); return "success"; }else{ System.out.println("剩余库存不足"); return "fail"; } }
这样简单的实现了一个售卖的过程,如今看来确实没什么问题的,可是若是是一个并发下的场景就可能会出现超卖的状况了,咱们来改造一下代码。redis
@GetMapping(value = "/getLock") public String getLock() { synchronized (this) { int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售卖成功,剩余" + realStock + ""); return "success"; } else { System.out.println("剩余库存不足"); return "fail"; } } }
貌似这回就能够了,能够抗住高并发了,可是新的问题又来了,咱们若是是分布式的场景下,synchronized关键字是不起做用的啊。也就是说仍是会出现超卖的状况的啊,咱们再来改造一下算法
@GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "xiaocai");//至关于咱们的setnx命令 if(!bool){ return "error"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售卖成功,剩余" + realStock + ""); stringRedisTemplate.delete(lockKey); return "success"; } else { System.out.println("剩余库存不足"); stringRedisTemplate.delete(lockKey); return "fail"; } }
此次咱们看来基本能够了,使用咱们的setnx命令来作一次惟一的限制,万一报错了呢?解锁怎么办?再来改造一下。spring
@GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "xiaocai", 10, TimeUnit.SECONDS);//至关于咱们的setnx命令 try { if (!bool) { return "error"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售卖成功,剩余" + realStock + ""); return "success"; } else { System.out.println("剩余库存不足"); return "fail"; } } finally { if (bool) { stringRedisTemplate.delete(lockKey); } } }
此次貌似真的能够了,能够加锁,最后在finally解锁,若是解锁仍是不成功,咱们还设置了咱们的超时时间,貌似完美了,咱们再来提出一个场景。数据库
就是什么意思呢?咱们的线程来争抢锁,拿到锁的线程开始执行,可是咱们并不知道什么时候执行完成,咱们只是设定了10秒自动释放掉锁,若是说咱们的线程10秒尚未结束,其它线程会拿到锁资源,开始执行代码,可是过了一段时间(蓝色线程还未执行完成),这时咱们的绿色线程执行完毕了,开始释放锁资源,他释放的其实已经不是他本身的锁了,他本身的锁超时了,自动释放了,实则绿色线程释放的蓝色的资源,这也就形成了释放其它的锁,其它的线程又会重复的拿到锁,重复执行该操做。明显有点乱了,这不合理,咱们来改善一下。json
@GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; String lockValue = UUID.randomUUID().toString(); Boolean bool = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);//至关于咱们的setnx命令 try { if (!bool) { return "error"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售卖成功,剩余" + realStock + ""); return "success"; } else { System.out.println("剩余库存不足"); return "fail"; } } finally { if (lockValue.equals(stringRedisTemplate.opsForValue().get(lockKey))) { stringRedisTemplate.delete(lockKey); } } }
此次再来看一下流程,咱们设置一个UUID,设置为锁的值,也就是说,每次上锁的UUID都是不一致的,咱们的线程A的锁此次只能由咱们的线程A来释放掉,不会形成释放其它锁的问题了,仍是上次的图,咱们回过头来看一下,10秒?真的合理吗?万一10秒尚未执行完成呢?有的人还会问,那设置100秒?万一执行到delete操做的时候,服务宕机了呢?是否是还要等待100秒才能够释放锁。别说那只是万一,咱们的代码但愿达到咱们能力范围以内的最严谨。此次来讲一下咱们本节的其中一个重点,Lua脚本,后面会去说,咱们来先用咱们此次博文的Redisson吧api
Redissonspringboot
刚才咱们提到了咱们锁的时间设置,多长才是合理的,100秒?可能宕机,形成等待100秒自动释放,1秒?线程可能执行不完,咱们可不能够这样来作呢?咱们设置一个30秒,或者说设置10秒,而后咱们给予一个固定时间来检查咱们的主线程是否执行完成,执行完成再释放咱们的锁,思路有了,可是代码实现起来并不简单,别着急,咱们已经有了现成的包供咱们使用的,就是咱们的Redisson,首先咱们来引入咱们的依赖,修改一下pom文件。
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.11.4</version> </dependency>
而后经过@Bean的方式注入容器,三种方式我都写在上面了。
@Bean public Redisson redisson(){ Config config = new Config(); //主从(单机) config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0); //哨兵 // config.useSentinelServers().setMasterName("mymaster"); // config.useSentinelServers().addSentinelAddress("redis://192.168.1.1:26379"); // config.useSentinelServers().addSentinelAddress("redis://192.168.1.2:26379"); // config.useSentinelServers().addSentinelAddress("redis://192.168.1..3:26379"); // config.useSentinelServers().setDatabase(0); // //集群 // config.useClusterServers() // .addNodeAddress("redis://192.168.0.1:8001") // .addNodeAddress("redis://192.168.0.2:8002") // .addNodeAddress("redis://192.168.0.3:8003") // .addNodeAddress("redis://192.168.0.4:8004") // .addNodeAddress("redis://192.168.0.5:8005") // .addNodeAddress("redis://192.168.0.6:8006"); // config.useSentinelServers().setPassword("xiaocai");//密码设置 return (Redisson) Redisson.create(config); }
若是咱们的是springboot也能够经过配置来实现的。
application.properties
## 由于springboot-data-redis 是用到了jedis,所已这里得配置 spring.redis.database=10 spring.redis.pool.max-idle=8 spring.redis.pool.min-idle=0 spring.redis.pool.max-active=8 spring.redis.pool.max-wait=-1 ## jedis 哨兵配置 spring.redis.sentinel.master=mymaster spring.redis.sentinel.nodes=192.168.1.241:26379,192.168.1.241:36379,192.168.1.241:46379 spring.redis.password=admin ## 关键地方 redisson spring.redis.redisson.config=classpath:redisson.json
redisson.json
## redisson.json 文件 { "sentinelServersConfig":{ "sentinelAddresses": ["redis://192.168.1.241:26379","redis://192.168.1.241:36379","redis://192.168.1.241:46379"], "masterName": "mymaster", "database": 0, "password":"admin" } }
这样咱们就创建了咱们的Redisson的链接了,咱们来看一下如何使用吧。
package com.redisclient.cluster; import org.redisson.Redisson; import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RedisCluster { @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private Redisson redisson; @GetMapping(value = "/getLock") public String getLock() { String lockKey = "lock"; RLock redissonLock = redisson.getLock(lockKey); try { redissonLock.lock(); int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("售卖成功,剩余" + realStock + ""); return "success"; } else { System.out.println("剩余库存不足"); return "fail"; } } finally { redissonLock.unlock(); } } }
使用也是超级简单的,Redisson还有重入锁功能等等,有兴趣的能够去Redisson查看,地址:https://redisson.org/ 国外的地址打开可能会慢一些。Redis的分布式锁使用就差很少说到这里了,咱们来回到咱们刚才说到的Lua脚本这里。
Lua脚本和管道
Lua脚本
lua脚本就是一个事务控制的过程,咱们能够在lua脚本中写一些列的命令,一次性的塞入到咱们的redis客户端,保证了原子性,要么都成功,要么都失败。好处在于减小与reidis的屡次链接,能够替代redis的事务操做以及保证咱们的原子性。
String luaString = "";//Lua脚本 jedis.eval(luaString, Arrays.asList("keysList"),Arrays.asList("valueList"));
脚本我就不写了(我也不熟悉),我来解释一下eval的三个参数,第一个是咱们的写好的脚本,而后咱们的脚本可能传参数的,也就是咱们KEYS[1]或者是ARGV[4],意思就是咱们的KEYS[1]就是咱们的ArrayList("keysList")中的第一项,ARGV[4]就是咱们的ArrayList("valueList")的第四项。
管道
管道和咱们的和咱们的Lua脚本差很少,不同就是管道不会保证咱们的事务,也就是说咱们如今塞给管道10条命令 ,咱们执行到第三条时报错了,后面的依然会执行,前面执行过的两条仍是生效的。虽然能够减小咱们的网络开销,也别一次塞太多命令进去,毕竟redis的是单线程的,不建议使用管道来操做redis,想深刻了解的能够参照https://www.runoob.com/redis/redis-pipelining.html
redis的分布式锁差很少就说这么多了,关键是实现思路,使用Redisson却是很简单的,还有咱们的Lua脚本和管道,Lua脚本能够保证事务,管道一次性能够执行多条命令,减小网络开销,但不建议使用,下次咱们来讲下,大厂用redis的一些使用注意事项和优化吧。
最进弄了一个公众号,小菜技术,欢迎你们的加入