微信公众号:内核小王子
关注可了解更多关于数据库,JVM内核相关的知识;
若是你有任何疑问也能够加我pigpdong[^1]java
在多线程状况下访问资源,咱们须要加锁来保证业务的正常进行,JDK中提供了不少并发控制相关的工具包,来保证多线程下能够高效工做,一样在分布式环境下,有些互斥操做咱们能够借助分布式锁来实现两个操做不能同时运行,必须等到另一个任务结束了把锁释放了才能获取锁而后执行,由于跨JVM咱们须要一个第三方系统来协助实现分布式锁,通常咱们能够用
数据库,redis,zookeeper,etcd等来实现.node
要实现一把分布式锁,咱们须要先分析下这把锁有哪些特性mysql
1.在分布式集群中,也就是不一样的JVM中,相互有冲突的方法,能够是不一样JVM相同实例内的同一个方法,也能够是不一样方法,也就是不一样业务间的隔离和同一个业务操做不能并行运行,而分布式锁须要保证这两个方法在同一时间只能有一个运行.redis
2.这把锁最好是可重入的,由于不可重入的锁很容易出现死锁sql
3.获取锁和释放锁的性能要很高数据库
4.支持获取锁的时候能够阻塞等待,以及等待时间微信
5.获取锁后支持设置一个期限,超过这个期限能够自动释放,防止程序没有本身释放的状况多线程
6.这是一把轻量锁,对业务侵入小并发
7.易用dom
因为数据库的锁无能是在性能高可用上都不及其余方式,这里咱们简单介绍下可能的方案
SET resource_name my_random_value NX PX 30000
咱们能够看看下面获取分布式锁的使用场景,假设咱们释放锁,直接del这个key
if (!redisComponent.acquireLock(lockKey) { LOGGER.warn(">>分布式并发锁获取失败"); return ; } try { // do business ... } catch (BusinessException e) { // exception handler ... } finally { redisComponent.releaseLock(lockKey); }
为了解决以上问题,咱们能够在释放锁的时候,判断下锁是否存在,这样进程A在释放锁的时候就不会将进程B加的锁释放了,
或者经过如下方式,将过时时间作为value存储在对应的key中,释放锁的时候,判断当前时间是否小于过时时间,只有小于当前时间才处理,咱们也能够在进行del操做的时候判断下对应的value是否相等,这个时候就须要在del操做的时候传人
my_random_value
下面咱们看下redis实现分布式锁java代码实现,咱们采用在del的时候判断下当前时间是否小于过时时间
public boolean acquireLock(String lockKey, long expired) { ShardedJedis jedis = null; try { jedis = pool.getResource(); String value = String.valueOf(System.currentTimeMillis() + expired + 1); int tryTimes = 0; while (tryTimes++ < 3) { /* * 1. 尝试锁 * setnx : set if not exist */ if (jedis.setnx(lockKey, value).equals(1L)) { return true; } /* * 2. 已经被别的线程锁住,判断是否失效 */ String oldValue = jedis.get(lockKey); if (StringUtils.isBlank(oldValue)) { /* * 2.1 value存的是超时时间,若是为空有2种状况 * 1. 异常数据,没有value 或者 value为空字符 * 2. 锁刚好被别的线程释放了 * 此时须要尝试从新尝试,为了不出现状况1时致使死循环,只重试3次 */ continue; } Long oldValueL = Long.valueOf(oldValue); if (oldValueL < System.currentTimeMillis()) { /* * 已超时,从新尝试锁 * * Redis:getSet 操做步骤: * 1.获取 Key 对应的 Value 做为返回值,不存在时返回null * 2.设置 Key 对应的 Value 为传入的值 * 这里若是返回的 getValue != oldValue 表示已经被其它线程从新修改了 */ String getValue = jedis.getSet(lockKey, value); return oldValue.equals(getValue); } else { // 未超时,则直接返回失败 return false; } } return false; } catch (Throwable e) { logger.error("acquireLock error", e); return false; } finally { returnResource(jedis); } } /** * 释放锁 * * @param lockKey * key */ public void releaseLock(String lockKey) { ShardedJedis jedis = null; try { jedis = pool.getResource(); long current = System.currentTimeMillis(); // 避免删除非本身获取到的锁 String value = jedis.get(lockKey); if (StringUtils.isNotBlank(value) && current < Long.valueOf(value)) { jedis.del(lockKey); } } catch (Throwable e) { logger.error("releaseLock error", e); } finally { returnResource(jedis); } }
这种方式没有用到刚刚说的my_random_value,咱们看下若是咱们按如下代码获取锁会有什么问题
if (!redisComponent.acquireLock(lockKey) { LOGGER.warn(">>分布式并发锁获取失败"); return ; } try { boolean locked = redisComponent.acquireLock(lockKey); if(locked) // do business ... } catch (BusinessException e) { // exception handler ... } finally { redisComponent.releaseLock(lockKey); }
一样这种方式当进程A没有获取到锁,以后进程B获取到锁,进程A会释放进程B的锁,这个时候咱们能够借助my_random_value来实现
/** * 释放锁 * * @param lockKey ,value */ public void releaseLock(String lockKey, long oldvalue) { ShardedJedis jedis = null; try { jedis = pool.getResource(); String value = jedis.get(lockKey); if (StringUtils.isNotBlank(value) && oldvalue == Long.valueOf(value)) { jedis.del(lockKey); } } catch (Throwable e) { logger.error("releaseLock error", e); } finally { returnResource(jedis); } }
这种方式须要保存以前获取锁时候的value值,并在释放锁的带上value值,不过这种实现方式,value的值为过时时间也不惟一
因为咱们用了redis得超时机制来释放锁,那么当进程在锁租约到期后尚未执行结束,那么其余进程获取到锁后则会产生并发写的状况,这种若是业务上须要精确控制,只能用乐观锁来控制了,每次写入数据都带一个锁的版本,若是下次获取锁的时候版本加1,这样上面那种状况,锁到期释放了新的进程获取到锁后会使用新的版本号,以前的进程锁已经释放了若是继续使用该锁则会发现版本已经不对了
能够借助zookeeper的顺序节点,在一个父节点下,全部须要争抢锁的资源都去这个目录下建立一个顺序节点,而后判断这个临时顺序节点是不是兄弟节点中顺序最小的,若是是最小的则获取到锁,若是不是则监听这个顺序最小的节点的删除事件,而后在继续根据这个流程获取最小节点
public void lock() { try { // 建立临时子节点 String myNode = zk.create(root + "/" + lockName , data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(j.join(Thread.currentThread().getName() + myNode, "created")); // 取出全部子节点 List<String> subNodes = zk.getChildren(root, false); TreeSet<String> sortedNodes = new TreeSet<>(); for(String node :subNodes) { sortedNodes.add(root +"/" +node); } String smallNode = sortedNodes.first(); String preNode = sortedNodes.lower(myNode); if (myNode.equals( smallNode)) { // 若是是最小的节点,则表示取得锁 System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock")); this.nodeId.set(myNode); return; } CountDownLatch latch = new CountDownLatch(1); Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同时注册监听。 // 判断比本身小一个数的节点是否存在,若是不存在则无需等待锁,同时注册监听 if (stat != null) { System.out.println(j.join(Thread.currentThread().getName(), myNode, " waiting for " + root + "/" + preNode + " released lock")); latch.await();// 等待,这里应该一直等待其余线程释放锁 nodeId.set(myNode); latch = null; } } catch (Exception e) { throw new RuntimeException(e); } } public void unlock() { try { System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock ")); if (null != nodeId) { zk.delete(nodeId.get(), -1); } nodeId.remove(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } }
固然若是咱们开发环境使用的是etcs也能够用etcd来实现分布式锁,原理和zookeeper相似