<!-- more -->html
转载请务必注明原创地址为:http://www.54tianzhisheng.cn/2018/04/24/Distributed_lock/java
分布式的 CAP 理论告诉咱们:node
任何一个分布式系统都没法同时知足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时知足两项。mysql
目前不少大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。基于 CAP理论,不少系统在设计之初就要对这三者作出取舍。在互联网领域的绝大多数的场景中,都须要牺牲强一致性来换取系统的高可用性,系统每每只须要保证最终一致性。linux
此处主要指集群模式下,多个相同服务同时开启.git
在许多的场景中,咱们为了保证数据的最终一致性,须要不少的技术方案来支持,好比分布式事务、分布式锁等。不少时候咱们须要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,经过 Java 提供的并发 API 咱们能够解决,可是在分布式环境下,就没有那么简单啦。github
基于乐观锁redis
利用主键惟一的特性,若是有多个请求同时提交到数据库的话,数据库会保证只有一个操做能够成功,那么咱们就能够认为操做成功的那个线程得到了该方法的锁,当方法执行完毕以后,想要释放锁的话,删除这条数据库记录便可。算法
上面这种简单的实现有如下几个问题:spring
固然,咱们也能够有其余方式解决上面的问题。
这个策略源于 mysql 的 mvcc 机制,使用这个策略其实自己没有什么问题,惟一的问题就是对数据表侵入较大,咱们要为每一个表设计一个版本号字段,而后写一条判断 sql 每次进行判断,增长了数据库操做的次数,在高并发的要求下,对数据库链接的开销也是没法忍受的。
基于悲观锁
在查询语句后面增长for update
,数据库会在查询过程当中给数据库表增长排他锁 (注意: InnoDB 引擎在加锁的时候,只有经过索引进行检索的时候才会使用行级锁,不然会使用表级锁。这里咱们但愿使用行级锁,就要给要执行的方法字段名添加索引,值得注意的是,这个索引必定要建立成惟一索引,不然会出现多个重载方法之间没法同时被访问的问题。重载方法的话建议把参数类型也加上。)。当某条记录被加上排他锁以后,其余线程没法再在该行记录上增长排他锁。
咱们能够认为得到排他锁的线程便可得到分布式锁,当获取到锁以后,能够执行方法的业务逻辑,执行完方法以后,经过connection.commit()
操做来释放锁。
这种方法能够有效的解决上面提到的没法释放锁和阻塞锁的问题。
for update
语句会在执行成功后当即返回,在执行失败时一直处于阻塞状态,直到成功。可是仍是没法直接解决数据库单点和可重入问题。
这里还可能存在另一个问题,虽然咱们对方法字段名使用了惟一索引,而且显示使用 for update 来使用行级锁。可是,MySQL 会对查询进行优化,即使在条件中使用了索引字段,可是否使用索引来检索数据是由 MySQL 经过判断不一样执行计划的代价来决定的,若是 MySQL 认为全表扫效率更高,好比对一些很小的表,它就不会使用索引,这种状况下 InnoDB 将使用表锁,而不是行锁。若是发生这种状况就悲剧了。。。
还有一个问题,就是咱们要使用排他锁来进行分布式锁的 lock,那么一个排他锁长时间不提交,就会占用数据库链接。一旦相似的链接变得多了,就可能把数据库链接池撑爆。
优势:简单,易于理解
缺点:会有各类各样的问题(操做数据库须要必定的开销,使用数据库的行级锁并不必定靠谱,性能不靠谱)
setnx 的含义就是 SET if Not Exists,其主要有两个参数 setnx(key, value)。该方法是原子的,若是 key 不存在,则设置当前 key 成功,返回 1;若是当前 key 已经存在,则设置当前 key 失败,返回 0。
expire 设置过时时间,要注意的是 setnx 命令不能设置 key 的超时时间,只能经过 expire() 来对 key 设置。
一、setnx(lockkey, 1) 若是返回 0,则说明占位失败;若是返回 1,则说明占位成功
二、expire() 命令对 lockkey 设置超时时间,为的是避免死锁问题。
三、执行完业务代码后,能够经过 delete 命令删除 key。
这个方案实际上是能够解决平常工做中的需求的,但从技术方案的探讨上来讲,可能还有一些能够完善的地方。好比,若是在第一步 setnx 执行成功后,在 expire() 命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题,因此若是要对其进行完善的话,可使用 redis 的 setnx()、get() 和 getset() 方法来实现分布式锁。
这个方案的背景主要是在 setnx() 和 expire() 的方案上针对可能存在的死锁问题,作了一些优化。
这个命令主要有两个参数 getset(key,newValue)。该方法是原子的,对 key 设置 newValue 这个值,而且返回 key 原来的旧值。假设 key 原来是不存在的,那么屡次执行这个命令,会出现下边的效果:
import cn.com.tpig.cache.redis.RedisService; import cn.com.tpig.utils.SpringUtils; //redis分布式锁 public final class RedisLockUtil { private static final int defaultExpire = 60; private RedisLockUtil() { // } /** * 加锁 * @param key redis key * @param expire 过时时间,单位秒 * @return true:加锁成功,false,加锁失败 */ public static boolean lock(String key, int expire) { RedisService redisService = SpringUtils.getBean(RedisService.class); long status = redisService.setnx(key, "1"); if(status == 1) { redisService.expire(key, expire); return true; } return false; } public static boolean lock(String key) { return lock2(key, defaultExpire); } /** * 加锁 * @param key redis key * @param expire 过时时间,单位秒 * @return true:加锁成功,false,加锁失败 */ public static boolean lock2(String key, int expire) { RedisService redisService = SpringUtils.getBean(RedisService.class); long value = System.currentTimeMillis() + expire; long status = redisService.setnx(key, String.valueOf(value)); if(status == 1) { return true; } long oldExpireTime = Long.parseLong(redisService.get(key, "0")); if(oldExpireTime < System.currentTimeMillis()) { //超时 long newExpireTime = System.currentTimeMillis() + expire; long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime))); if(currentExpireTime == oldExpireTime) { return true; } } return false; } public static void unLock1(String key) { RedisService redisService = SpringUtils.getBean(RedisService.class); redisService.del(key); } public static void unLock2(String key) { RedisService redisService = SpringUtils.getBean(RedisService.class); long oldExpireTime = Long.parseLong(redisService.get(key, "0")); if(oldExpireTime > System.currentTimeMillis()) { redisService.del(key); } } }
public void drawRedPacket(long userId) { String key = "draw.redpacket.userid:" + userId; boolean lock = RedisLockUtil.lock2(key, 60); if(lock) { try { //领取操做 } finally { //释放锁 RedisLockUtil.unLock(key); } } else { new RuntimeException("重复领取奖励"); } }
Redlock 是 Redis 的做者 antirez 给出的集群模式的 Redis 分布式锁,它基于 N 个彻底独立的 Redis 节点(一般状况下 N 能够设置成 5)。
算法的步骤以下:
可是,有一位分布式的专家写了一篇文章《How to do distributed locking》,质疑 Redlock 的正确性。
https://mp.weixin.qq.com/s/1bPLk_VZhZ0QYNZS8LkviA
https://blog.csdn.net/jek123456/article/details/72954106
优势:
性能高
缺点:
失效时间设置多长时间为好?如何设置的失效时间过短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。若是设置的时间太长,其余获取锁的线程就可能要平白的多等一段时间。
redisson 是 redis 官方的分布式锁组件。GitHub 地址:https://github.com/redisson/redisson
上面的这个问题 ——> 失效时间设置多长时间为好?这个问题在 redisson 的作法是:每得到一个锁时,只设置一个很短的超时时间,同时起一个线程在每次快要到超时时间时去刷新锁的超时时间。在释放锁的同时结束这个线程。
import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; public class DistributedLock implements Lock, Watcher{ private ZooKeeper zk; private String root = "/locks";//根 private String lockName;//竞争资源的标志 private String waitNode;//等待前一个锁 private String myZnode;//当前锁 private CountDownLatch latch;//计数器 private int sessionTimeout = 30000; private List<Exception> exception = new ArrayList<Exception>(); /** * 建立分布式锁,使用前请确认config配置的zookeeper服务可用 * @param config 127.0.0.1:2181 * @param lockName 竞争资源标志,lockName中不能包含单词lock */ public DistributedLock(String config, String lockName){ this.lockName = lockName; // 建立一个与服务器的链接 try { zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // 建立根节点 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /** * zookeeper节点的监视器 */ public void process(WatchedEvent event) { if(this.latch != null) { this.latch.countDown(); } } public void lock() { if(exception.size() > 0){ throw new LockException(exception.get(0)); } try { if(this.tryLock()){ System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true"); return; } else{ waitForLock(waitNode, sessionTimeout);//等待锁 } } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } } public boolean tryLock() { try { String splitStr = "_lock_"; if(lockName.contains(splitStr)) throw new LockException("lockName can not contains \\u000B"); //建立临时子节点 myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(myZnode + " is created "); //取出全部子节点 List<String> subNodes = zk.getChildren(root, false); //取出全部lockName的锁 List<String> lockObjNodes = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if(_node.equals(lockName)){ lockObjNodes.add(node); } } Collections.sort(lockObjNodes); System.out.println(myZnode + "==" + lockObjNodes.get(0)); if(myZnode.equals(root+"/"+lockObjNodes.get(0))){ //若是是最小的节点,则表示取得锁 return true; } //若是不是最小的节点,找到比本身小1的节点 String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1); waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1); } catch (KeeperException e) { throw new LockException(e); } catch (InterruptedException e) { throw new LockException(e); } return false; } public boolean tryLock(long time, TimeUnit unit) { try { if(this.tryLock()){ return true; } return waitForLock(waitNode,time); } catch (Exception e) { e.printStackTrace(); } return false; } private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException { Stat stat = zk.exists(root + "/" + lower,true); //判断比本身小一个数的节点是否存在,若是不存在则无需等待锁,同时注册监听 if(stat != null){ System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower); this.latch = new CountDownLatch(1); this.latch.await(waitTime, TimeUnit.MILLISECONDS); this.latch = null; } return true; } public void unlock() { try { System.out.println("unlock " + myZnode); zk.delete(myZnode,-1); myZnode = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public void lockInterruptibly() throws InterruptedException { this.lock(); } public Condition newCondition() { return null; } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } } }
优势:
有效的解决单点问题,不可重入问题,非阻塞问题以及锁没法释放的问题。实现起来较为简单。
缺点:
性能上可能并无缓存服务那么高,由于每次在建立锁和释放锁的过程当中,都要动态建立、销毁临时节点来实现锁功能。ZK 中建立和删除节点只能经过 Leader 服务器来执行,而后将数据同步到全部的 Follower 机器上。还须要对 ZK的原理有所了解。
DD 写过相似文章,其实主要利用 Consul 的 Key / Value 存储 API 中的 acquire 和 release 操做来实现。
文章地址:http://blog.didispace.com/spring-cloud-consul-lock-and-semphore/
一、注意分布式锁的开销
二、注意加锁的粒度
三、加锁的方式
不管你身处一个什么样的公司,最开始的工做可能都须要从最简单的作起。不要提阿里和腾讯的业务场景 qps 如何大,由于在这样的大场景中你未必能亲自参与项目,亲自参与项目未必能是核心的设计者,是核心的设计者未必能独自设计。但愿你们能根据本身公司业务场景,选择适合本身项目的方案。
http://www.hollischuang.com/archives/1716
http://www.spring4all.com/question/158
https://www.cnblogs.com/PurpleDream/p/5559352.html