老铁们好,这里是java研究所。
今天咱们来探讨一下分布式锁的4种实现:
一、经过MySQL实现分布式锁
二、经过redis实现分布式锁
三、经过zookeeper实现分布式锁
四、经过etcd实现分布式锁java
如何确保共享资源在同一时刻只能被一个线程访问?
你们可能以为这个很简单吧,在一个jvm中,经过synchronized或者ReentrantLock是很容易实现的。
确实,单个jvm中确实没有问题。
可是,一般咱们的系统会采用集群的方式部署,此时集群中的每一个节点都是一个jvm环境,那么经过synchronized或者ReentrantLock是没法解决共享资源访问的问题了。
此时就要用到分布式锁了:分布式锁就是解决分布式环境中共享资源顺序访问的问题,同一时刻,集群中全部节点中,只容许有一个线程能够访问共享资源。mysql
分布式锁使用者位于不一样的机器中,锁获取成功以后,才能够对共享资源进行操做
同一时刻全部机器中只有一个使用者能够获取到分布式锁
锁具备重入的功能:即一个使用者能够屡次获取某个分布式锁
获取锁的过程容许指定超时功能:在指定的时间内尝试获取锁,过了超时时间,若还未获取到锁,则获取失败
防止死锁:如:A机器获取锁以后,在释放锁以前,A机器挂了,致使锁未释放,结果锁一直被A机器占有着,遇到这种状况时,分布式锁要可以自动解决;解决方式:持有锁的时候能够加个持有超时时间,超过了这个时间锁将自动释放,此时其余机器将有机会获取锁
下面咱们来看一下分布式锁的4种实现。面试
锁的获取过程
假如:一个集群环境中有n个系统,每一个系统中有一个jvm,每一个jvm中有m个线程去获取分布式锁,那么同时可能就有n*m个线程去获取分布式锁,此时分布式锁的压力是比较大的,每一个jvm中多个线程同时去获取锁实际上是没有意义的,能够在每一个jvm中先加一把本地的锁,获取分布式锁以前须要先获取jvm本地的锁,本地锁获取成功以后,才能够尝试获取分布式锁,此时n个系统中最多有n个线程尝试获取分布式锁,获取锁的步骤主要2步:redis
一、先尝试获取jvm本地锁 二、jvm本地锁获取成功以后尝试获取分布式锁
获取锁的时候能够传递获取锁最大等待时间,在指定的时间内屡次尝试获取锁,获取失败以后,休眠一会,再继续尝试获取,直到时间耗尽。sql
获取锁的时候须要指定有效期,有效期就是获取锁以后,使用者但愿使用多长时间,为何须要有效期?
若是没有有效期,当使用者获取成功以后,系统忽然down机了,那么这个锁就没法释放,其余线程就再也没法获取到这个锁了。
因此须要有有效期,超过了有效期,锁将失效,其余线程将能够尝试获取锁。数据库
什么是锁续命?
好比:使用者获取锁的时候,指定有效期是5分钟,可是5分钟以后,使用者事情还未干完,还想继续使用一会,那么可使用续命功能,延迟锁的有效期。
能够启动一个子线程,自动完成续命的操做,好比:本来有效期是5分钟,当使用4分钟的时候,续命2分钟,那么有效期是7分钟,这个比较简单,你们能够随意发挥。服务器
create table t_lock( lock_key varchar(32) PRIMARY KEY NOT NULL COMMENT '锁惟一标志', request_id varchar(64) NOT NULL DEFAULT '' COMMENT '用来标识请求对象的', lock_count INT NOT NULL DEFAULT 0 COMMENT '当前上锁次数', timeout BIGINT NOT NULL DEFAULT 0 COMMENT '锁超时时间', version INT NOT NULL DEFAULT 0 COMMENT '版本号,每次更新+1' )COMMENT '锁信息表';
注意:表中有个版本号字段,版本号主要用于乐观锁的方式更新数据,确保并发状况下更新数据的正确性。markdown
代码比较简单,你们主要看获取锁的lock方法和释放锁的unlock方法,注释比较详细,你们看看就懂了。
代码中的重点是更新数据的时候,经过比对版本号,采用cas的方式,确保并发状况下更新数据的正确性。
本代码实现了获取锁和释放锁的操做,续命操做未实现,你们能够尝试实现一下。并发
package lock; import lombok.Builder; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.sql.*; import java.util.Map; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @Slf4j public class DbLockUtil { //将requestid保存在该变量中 static ThreadLocal<String> requestIdTL = new ThreadLocal<>(); //jvm锁:当多个线程并发获取分布式锁时,须要先获取jvm锁,jvm锁获取成功,则尝试获取分布式锁 static Map<String, ReentrantLock> jvmLockMap = new ConcurrentHashMap<>(); /** * 获取当前线程requestid * * @return */ public static String getRequestId() { String requestId = requestIdTL.get(); if (requestId == null || "".equals(requestId)) { requestId = UUID.randomUUID().toString(); requestIdTL.set(requestId); } log.info("requestId:{}", requestId); return requestId; } /** * 获取锁 * * @param lockKey 锁key * @param lockTimeOut(毫秒) 持有锁的有效时间,防止死锁 * @param getTimeOut(毫秒) 获取锁的超时时间,这个时间内获取不到将重试 * @return */ public static boolean lock(String lockKey, long lockTimeOut, int getTimeOut) throws Exception { log.info("start"); boolean lockResult = false; /** * 单个jvm中可能有多个线程并发获取一个锁 * 此时咱们只容许一个线程去获取分布式锁 * 因此若是同一个jvm中有多个线程尝试获取分布式锁,须要先获取jvm中的锁 */ ReentrantLock jvmLock = new ReentrantLock(); ReentrantLock oldJvmLock = jvmLockMap.putIfAbsent(lockKey, jvmLock); oldJvmLock = oldJvmLock != null ? oldJvmLock : jvmLock; boolean jvmLockSuccess = oldJvmLock.tryLock(getTimeOut, TimeUnit.MILLISECONDS); //jvm锁获取失败,则直接失败 if (!jvmLockSuccess) { return lockResult; } else { //jvm锁获取成功,则继续尝试获取分布式锁 try { String request_id = getRequestId(); long startTime = System.currentTimeMillis(); //循环尝试获取锁 while (true) { //经过lockKey获取db中的记录 LockModel lockModel = DbLockUtil.get(lockKey); if (Objects.isNull(lockModel)) { //记录不存在,则先插入一条 DbLockUtil.insert(LockModel.builder().lock_key(lockKey).request_id("").lock_count(0).timeout(0L).version(0).build()); } else { //获取请求id,稍后请求id会放入ThreadLocal中 String requestId = lockModel.getRequest_id(); //若是requestId为空字符,表示锁未被占用 if ("".equals(requestId)) { lockModel.setRequest_id(request_id); lockModel.setLock_count(1); lockModel.setTimeout(System.currentTimeMillis() + lockTimeOut); //并发状况下,采用cas方式更新记录 if (DbLockUtil.update(lockModel) == 1) { lockResult = true; break; } } else if (request_id.equals(requestId)) { //若是requestId和表中request_id同样表示锁被当前线程持有者,此时须要加剧入锁 lockModel.setTimeout(System.currentTimeMillis() + lockTimeOut); lockModel.setLock_count(lockModel.getLock_count() + 1); if (DbLockUtil.update(lockModel) == 1) { lockResult = true; break; } } else { //锁不是本身的,而且已经超时了,则重置锁,继续重试 if (lockModel.getTimeout() < System.currentTimeMillis()) { DbLockUtil.resetLock(lockModel); } else { //若是未超时,休眠100毫秒,继续重试 if (startTime + getTimeOut > System.currentTimeMillis()) { TimeUnit.MILLISECONDS.sleep(100); } else { break; } } } } } } finally { //释放jvm锁,将其从map中异常 jvmLock.unlock(); jvmLockMap.remove(lockKey); } } log.info("end"); return lockResult; } /** * 释放锁 * * @param lock_key * @throws Exception */ private static void unlock(String lock_key) throws Exception { //获取当前线程requestId String requestId = getRequestId(); LockModel lockModel = DbLockUtil.get(lock_key); //当前线程requestId和库中request_id一致 && lock_count>0,表示能够释放锁 if (Objects.nonNull(lockModel) && requestId.equals(lockModel.getRequest_id()) && lockModel.getLock_count() > 0) { if (lockModel.getLock_count() == 1) { //重置锁 resetLock(lockModel); } else { lockModel.setLock_count(lockModel.getLock_count() - 1); DbLockUtil.update(lockModel); } } } /** * 重置锁 * * @param lockModel * @return * @throws Exception */ private static int resetLock(LockModel lockModel) throws Exception { lockModel.setRequest_id(""); lockModel.setLock_count(0); lockModel.setTimeout(0L); return DbLockUtil.update(lockModel); } /** * 更新lockModel信息,内部采用乐观锁来更新 * * @param lockModel * @return * @throws Exception */ private static int update(LockModel lockModel) throws Exception { return exec(conn -> { String sql = "UPDATE t_lock SET request_id = ?,lock_count = ?,timeout = ?,version = version + 1 WHERE lock_key = ? AND version = ?"; PreparedStatement ps = conn.prepareStatement(sql); int colIndex = 1; ps.setString(colIndex++, lockModel.getRequest_id()); ps.setInt(colIndex++, lockModel.getLock_count()); ps.setLong(colIndex++, lockModel.getTimeout()); ps.setString(colIndex++, lockModel.getLock_key()); ps.setInt(colIndex++, lockModel.getVersion()); return ps.executeUpdate(); }); } private static LockModel get(String lock_key) throws Exception { return exec(conn -> { String sql = "select * from t_lock t WHERE t.lock_key=?"; PreparedStatement ps = conn.prepareStatement(sql); int colIndex = 1; ps.setString(colIndex++, lock_key); ResultSet rs = ps.executeQuery(); if (rs.next()) { return LockModel.builder(). lock_key(lock_key). request_id(rs.getString("request_id")). lock_count(rs.getInt("lock_count")). timeout(rs.getLong("timeout")). version(rs.getInt("version")).build(); } return null; }); } private static int insert(LockModel lockModel) throws Exception { return exec(conn -> { String sql = "insert into t_lock (lock_key, request_id, lock_count, timeout, version) VALUES (?,?,?,?,?)"; PreparedStatement ps = conn.prepareStatement(sql); int colIndex = 1; ps.setString(colIndex++, lockModel.getLock_key()); ps.setString(colIndex++, lockModel.getRequest_id()); ps.setInt(colIndex++, lockModel.getLock_count()); ps.setLong(colIndex++, lockModel.getTimeout()); ps.setInt(colIndex++, lockModel.getVersion()); return ps.executeUpdate(); }); } private static <T> T exec(SqlExec<T> sqlExec) throws Exception { Connection conn = getConn(); try { return sqlExec.exec(conn); } finally { closeConn(conn); } } @FunctionalInterface public interface SqlExec<T> { T exec(Connection conn) throws Exception; } @Getter @Setter @Builder public static class LockModel { private String lock_key; private String request_id; private Integer lock_count; private Long timeout; private Integer version; } private static final String url = "jdbc:mysql://localhost:3306/dlock?useSSL=false"; //数据库地址 private static final String username = ""; //数据库用户名 private static final String password = ""; //数据库密码 private static final String driver = "com.mysql.jdbc.Driver"; //mysql驱动 /** * 链接数据库 * * @return */ private static Connection getConn() { Connection conn = null; try { Class.forName(driver); //加载数据库驱动 try { conn = DriverManager.getConnection(url, username, password); //链接数据库 } catch (SQLException e) { e.printStackTrace(); } } catch (ClassNotFoundException e) { e.printStackTrace(); } return conn; } /** * 关闭数据库连接 * * @return */ private static void closeConn(Connection conn) { if (conn != null) { try { conn.close(); //关闭数据库连接 } catch (SQLException e) { e.printStackTrace(); } } } }
setnx
命令格式:SETNX key value;是『SET if Not eXists』(若是不存在,则 SET)的简写,只在键 key 不存在的状况下,将键 key 的值设置为 value 。若键 key 已经存在, 则 SETNX 命令不作任何动做。命令在设置成功时返回 1 ,设置失败时返回 0 。dom
getset
命令格式:GETSET key value,将键 key 的值设为 value ,并返回键 key 在被设置以前的旧的value。返回值:若是键 key 没有旧值, 也便是说, 键 key 在被设置以前并不存在, 那么命令返回 nil 。当键 key 存在但不是字符串类型时,命令返回一个错误。
一、A尝试去获取锁lockkey,经过setnx(lockkey,currenttime+timeout)命令,对lockkey进行setnx,将value值设置为当前时间+锁超时时间;
二、若是返回值为1,说明redis服务器中尚未lockkey,也就是没有其余用户拥有这个锁,A就能获取锁成功;
三、在进行相关业务执行以前,先执行expire(lockkey),对lockkey设置有效期,防止死锁;由于若是不设置有效期的话,lockkey将一直存在于redis中,其余用户尝试获取锁时,执行到setnx(lockkey,currenttime+timeout)时,将不能成功获取到该锁;
四、执行相关业务
五、释放锁,A完成相关业务以后,要释放拥有的锁,也就是删除redis中该锁的内容,del(lockkey),接下来的用户才能进行从新设置锁新值
六、当A经过setnx(lockkey,currenttime+timeout)命令不能成功设置lockkey时,这是不能直接判定获取锁失败;由于咱们在设置锁时,设置了锁的超时时间timeout,当当前时间大于redis中存储键值为lockkey的value值时,能够认为上一任的拥有者对锁的使用权已经失效了,A就能够强行拥有该锁;具体断定过程以下;
七、A经过get(lockkey),获取redis中的存储键值为lockkey的value值,即获取锁的相对时间lockvalueA
八、lockvalueA!=null && currenttime>lockvalue,A经过当前的时间与锁设置的时间作比较,若是当前时间已经大于锁设置的时间临界,便可以进一步判断是否能够获取锁,不然说明该锁还在被占用,A就还不能获取该锁,结束,获取锁失败;
九、步骤4返回结果为true后,经过getSet设置新的超时时间,并返回旧值lockvalueB,以做判断,由于在分布式环境,在进入这里时可能另外的进程获取到锁并对值进行了修改,只有旧值与返回的值一致才能说明中间未被其余进程获取到这个锁
十、lockvalueB == null || lockvalueA==lockvalueB,判断:若果lockvalueB为null,说明该锁已经被释放了,此时该进程能够获取锁;旧值与返回的lockvalueB一致说明中间未被其余进程获取该锁,能够获取锁;不然不能获取锁,结束,获取锁失败。
留给给你们,按照上面的过程实现下。
zookeeper是什么?是一个开源的中间件,能够作高可用配置中心使用,简单点理解:能够用来保存用户的一些数据。
zookeeper有3个特色比较重要,这2个特色是实现分布式锁的关
键。
zookeeper中存储数据是树结构,树下面能够建立不少节点,节点中能够存储用户的数据。
在每个节点下面建立子节点时,只要选择的建立类型是有序类型,那么,此节点将自动在客户端指定的节点名后面添加一个单调递增序号,重点是,并发建立子节点的状况下,也能够确保多个子节点的有序性。
好比并发在/lock/lock1下面建立4个有序子节点,以下:
客户端能够判断建立的节点序号是否是最小的,若是编号是子节点中最小的,则获取锁成功。
客户端操做zookeeper,须要和zookeeper之间创建链接,若是客户端请求在zookeeper上建立的节点类型是临时节点,那么当客户端和zookeeper之间链接断开的时候,建立的临时节点自动会被zookeeper删除。
这个能够防止死锁多功能,好比客户端获取锁以后挂了,那么节点会自动被删除,此时锁的其余获取者才有机会获取锁。
客户端能够对某个节点添加监听器,当节点信息发生变化的时候,zookeeper会通知客户端,好比节点数据被修改、节点被删除等等,都会通知客户端;
这个特性特别牛逼:这个特别爽,后面的节点只须要监听他前面的一个节点,当前面的一个节点被删除时,zookeeper会通知监听者,监听者能够判断本身建立的节点编号是否是最小的,若是是最小的,即获取锁成功,这个是否是比上面数据库和redis的方式好一些,db和redis的方式须要自旋(获取失败了,休眠稍许,继续循环尝试),而zookeeper不须要自旋,锁被释放的时候,zookeeper会通知等待者。
重点理解原理,代码你们能够在网上找找,比较多,这里就不贴出来了。
etcd 和 zookeeper功能差很少,也能够做为高可用配置中心,不过etcd基于Go语言实现,也能够用来实现分布式锁,实现原理上和zookeeper差很少,这里就不细说了。
本文主要介绍了4种方式实现分布式锁,你们重点要理解每种方式的原理。
db和redis的方式原理差很少,内部在获取失败的状况下,都须要采用自旋的方式从新尝试获取锁,而zookeeper采用监听的方式。
redis和zookeeper这2种方式用的比较多,性能上面redis更好一些,并发量比较大的能够采用redis的方式。
设计中还有一点:获取锁的时候分2步走,先获取jvm中的锁,而后在尝试获取分布式锁。
·END·扫描二维码 | 关注咱们