Spring Cloud 分布式环境下,同一个服务都是部署在不一样的机器上,这种状况没法像单体架构下数据一致性问题采用加锁就实现数据一致性问题,在高并发状况下,对于分布式架构显然是不合适的,针对这种状况咱们就须要用到分布式锁了。java
场景一:比较敏感的数据好比金额修改,同一时间只能有一我的操做,想象下2我的同时修改金额,一个加金额一个减金额,为了防止同时操做形成数据不一致,须要锁,若是是数据库须要的就是行锁或表锁,若是是在集群里,多个客户端同时修改一个共享的数据就须要分布式锁。git
场景二:好比多台机器均可以定时执行某个任务,若是限制任务每次只能被一台机器执行,不能重复执行,就能够用分布式锁来作标记。github
场景三:好比秒杀场景,要求并发量很高,那么同一件商品只能被一个用户抢到,那么就可使用分布式锁实现。redis
为何不使用数据库?算法
数据库是单点?搞两个数据库,数据以前双向同步。一旦挂掉快速切换到备库上。spring
没有失效时间?只要作一个定时任务,每隔必定时间把数据库中的超时数据清理一遍。sql
非阻塞的?搞一个while循环,直到insert成功再返回成功。数据库
非重入的?在数据库表中加个字段,记录当前得到锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,若是当前机器的主机信息和线程信息在数据库能够查到的话,直接把锁分配给他就能够了。浏览器
大量请求下数据库每每是系统的瓶颈,大量链接,而后sql查询,几乎全部时间都浪费到这些上面,因此每每状况下能内存操做就在内存操做,使用基于内存操做的Redis实现分布式锁,也能够根据需求选择ZooKeeper 来实现。缓存
经过 Redis 的 Redlock 和 ZooKeeper 来加锁,性能有了比较大的提高,通常状况咱们根据实际场景选择使用。
Redis实现分布式锁利用 SETNX
和 SETEX
基本命令主要有:
当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不作任何动做,并返回0。
分布式锁
分布式锁其实大白话,本质上要实现的目标(客户端)在redis中占一个位置,等到这个客户试用,别的人进来就必须得等着,等我试用完了,走了,你再来。 感受跟多线程锁同样,意思大体是同样的,多线程是针对单机的,在同一个Jvm中,可是分布式石锁,是跨机器的,多个进程不一样机器上发来得请求,去对同一个数据进行操做。
好比,分布式架构下的秒杀系统,几万人对10个商品进行抢购,10个商品存在redis中,就是表示10个位置,第一我的进来了,商品就剩9个了,第二我的进来就剩8个,在第一我的进来的时候,其余人必须等到10个商品数量成功减去1以后你才能进来。
这个过程当中第一我的进来的时候还没操做减1而后异常了,没有释放锁,而后后面人一直等待着,这就是死锁。真对这种状况能够设置超时时间,若是超过10s中仍是没出来,就让他超时失效。
redis中提供了 setnx(set if not exists)
指令
> setnx lock:codehole true -- 锁定 OK ... do something xxxx... 数量减1 > del lock:codehole -- 释放锁 (integer) 1 --成功
若是在减1期间发生异常 del 指令没有被调用 而后就一直等着,锁永远不会释放。
redis Redis 2.8 版本中提供了 setex(set if not exists) 指令
setnx 和 expire 两个指令构成一个原子操做
给锁加上一个过时时间
> setex lock:codehole true OK > expire lock:codehole 5 ... do something xxxx ... > del lock:codehole (integer) 1
SETEX 实现原理
经过 SETNX 设置 Key-Value 来得到锁,随即进入死循环,每次循环判断,若是存在 Key 则继续循环,若是不存在 Key,则跳出循环,当前任务执行完成后,删除 Key 以释放锁。
实现步骤
pom.xml 导入Redis依赖
<!-- redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.10</version> <scope>provided</scope> </dependency>
添加配置文件 application.yml:
server: port: 8080 spring: profiles: dev data: redis: # Redis数据库索引(默认为0) database: 0 # Redis服务器地址 host: 127.0.0.1 # Redis服务器链接端口 port: 6379 # Redis服务器链接密码(默认为空) password:
全局锁类
@Data public class Lock { /** * key名 */ private String name; /** * value值 */ private String value; public Lock(String name, String value) { this.name = name; this.value = value; } }
分布式锁类
@Slf4j @Component public class DistributedLockConfig { /** * 单个业务持有锁的时间30s,防止死锁 */ private final static long LOCK_EXPIRE = 30 * 1000L; /** * 默认30ms尝试一次 */ private final static long LOCK_TRY_INTERVAL = 30L; /** * 默认尝试20s */ private final static long LOCK_TRY_TIMEOUT = 20 * 1000L; private RedisTemplate template; public void setTemplate(RedisTemplate template) { this.template = template; } /** * 尝试获取全局锁 * * @param lock 锁的名称 * @return true 获取成功,false获取失败 */ public boolean tryLock(Lock lock) { return getLock(lock, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE); } /** * 尝试获取全局锁 * SETEX:能够设置超时时间 * * @param lock 锁的名称 * @param timeout 获取超时时间 单位ms * @return true 获取成功,false获取失败 */ public boolean tryLock(Lock lock, long timeout) { return getLock(lock, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE); } /** * 尝试获取全局锁 * * @param lock 锁的名称 * @param timeout 获取锁的超时时间 * @param tryInterval 多少毫秒尝试获取一次 * @return true 获取成功,false获取失败 */ public boolean tryLock(Lock lock, long timeout, long tryInterval) { return getLock(lock, timeout, tryInterval, LOCK_EXPIRE); } /** * 尝试获取全局锁 * * @param lock 锁的名称 * @param timeout 获取锁的超时时间 * @param tryInterval 多少毫秒尝试获取一次 * @param lockExpireTime 锁的过时 * @return true 获取成功,false获取失败 */ public boolean tryLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) { return getLock(lock, timeout, tryInterval, lockExpireTime); } /** * 操做redis获取全局锁 * * @param lock 锁的名称 * @param timeout 获取的超时时间 * @param tryInterval 多少ms尝试一次 * @param lockExpireTime 获取成功后锁的过时时间 * @return true 获取成功,false获取失败 */ public boolean getLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) { try { if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) { return false; } long startTime = System.currentTimeMillis(); do { if (!template.hasKey(lock.getName())) { ValueOperations<String, String> ops = template.opsForValue(); ops.set(lock.getName(), lock.getValue(), lockExpireTime, TimeUnit.MILLISECONDS); return true; } else { //存在锁 log.debug("lock is exist!!!"); } //尝试超过了设定值以后直接跳出循环 if (System.currentTimeMillis() - startTime > timeout) { return false; } //每隔多长时间尝试获取 Thread.sleep(tryInterval); } while (template.hasKey(lock.getName())); } catch (InterruptedException e) { log.error(e.getMessage()); return false; } return false; } /** * 获取锁 * SETNX(SET If Not Exists):当且仅当 Key 不存在时,则能够设置,不然不作任何动做。 */ public Boolean getLockNoTime(Lock lock) { if (!StringUtils.isEmpty(lock.getName())) { return false; } // setIfAbsent 底层封装命令 是 setNX() boolean falg = template.opsForValue().setIfAbsent(lock.getName(), lock.getValue()); return false; } /** * 释放锁 */ public void releaseLock(Lock lock) { if (!StringUtils.isEmpty(lock.getName())) { template.delete(lock.getName()); } } }
测试方法
@RequestMapping("test") public String index() { distributedLockConfig.setTemplate(redisTemplate); Lock lock = new Lock("test", "test"); if (distributedLockConfig.tryLock(lock)) { try { //为了演示锁的效果,这里睡眠5000毫秒 System.out.println("执行方法"); Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } distributedLockConfig.releaseLock(lock); } return "hello world!"; }
开启两个浏览器窗口,执行方法,咱们能够看到两个浏览器在等待执行,当一个返回 hello world! 以后,若是没超时执行另外一个也会返回hello world! 两个方法彼此前后返回,说明分布式锁执行成功。
可是存在一个问题:
这段方法是先去查询key是否存在redis中,若是存在走循环,而后根据间隔时间去等待尝试获取,若是不存在则进行获取锁,若是等待时间超过超时时间返回false。
若是在集群环境下也会存在问题
假如在哨兵模式中 主节点获取到锁以后,数据没有同步到从节点主节点挂掉了,这样数据完整性不能保证,另外一个客户端请求过来,就会一把锁被两个客户端持有,会致使数据一致性出问题。
对此Redis中还提供了另一种实现分布式锁的方法 Redlock
Redlock是redis官方提出的实现分布式锁管理器的算法。这个算法会比通常的普通方法更加安全可靠。
为何选择红锁?
在集群中须要半数以上的节点赞成才能得到锁,保证了数据的完整性,不会由于主节点数据存在,主节点挂了以后没有同步到从节点,致使数据丢失。
Redlock 算法
使用场景
对于Redis集群模式尽可能采用这种分布式锁,保证高可用,数据一致性,就使用Redlock 分布式锁。
pom.xml 增长依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.7.0</version> </dependency>
获取锁后须要处理的逻辑
/** * 获取锁后须要处理的逻辑 */ public interface AquiredLockWorker<T> { T invokeAfterLockAquire() throws Exception; }
获取锁管理类
/** * 获取锁管理类 */ public interface DistributedLocker { /** * 获取锁 * @param resourceName 锁的名称 * @param worker 获取锁后的处理类 * @param <T> * @return 处理完具体的业务逻辑要返回的数据 * @throws UnableToAquireLockException * @throws Exception */ <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws UnableToAquireLockException, Exception; <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception; }
异常类
/** * 异常类 */ public class UnableToAquireLockException extends RuntimeException { public UnableToAquireLockException() { } public UnableToAquireLockException(String message) { super(message); } public UnableToAquireLockException(String message, Throwable cause) { super(message, cause); } }
获取RedissonClient链接类
/** * 获取RedissonClient链接类 */ @Component public class RedissonConnector { RedissonClient redisson; @PostConstruct public void init(){ redisson = Redisson.create(); } public RedissonClient getClient(){ return redisson; } }
分布式锁实现
@Component public class RedisLocker implements DistributedLocker{ private final static String LOCKER_PREFIX = "lock:"; @Autowired RedissonConnector redissonConnector; @Override public <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws InterruptedException, UnableToAquireLockException, Exception { return lock(resourceName, worker, 100); } @Override public <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception { RedissonClient redisson= redissonConnector.getClient(); RLock lock = redisson.getLock(LOCKER_PREFIX + resourceName); // Wait for 100 seconds seconds and automatically unlock it after lockTime seconds boolean success = lock.tryLock(100, lockTime, TimeUnit.SECONDS); if (success) { try { return worker.invokeAfterLockAquire(); } finally { lock.unlock(); } } throw new UnableToAquireLockException(); } }
测试方法
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10); for (int i = 0; i < 50; i++) { scheduledExecutorService.execute(new Worker()); } scheduledExecutorService.shutdown(); //任务 class Worker implements Runnable { public Worker() { } @Override public void run() { try { redisLocker.lock("tizz1100", new AquiredLockWorker<Object>() { @Override public Object invokeAfterLockAquire() { doTask(); return null; } }); } catch (Exception e) { } } void doTask() { System.out.println(Thread.currentThread().getName() + " ---------- " + LocalDateTime.now()); System.out.println(Thread.currentThread().getName() + " start"); Random random = new Random(); int _int = random.nextInt(200); System.out.println(Thread.currentThread().getName() + " sleep " + _int + "millis"); try { Thread.sleep(_int); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " end"); } }
参考资料:
https://blog.csdn.net/yue_201...