由于疫情影响,面试成为你们关注的一个重点,今天讲解分布式锁的相关知识,因此找到几个对于分布式锁面试的时候该怎么回答java
为此,将分布式锁相关面试知识点进行总结和讲解,
从面试出发,带你学习分布式锁node
福利以后,话归正题,咱们来看一下分布式锁面试
你们都知道,若是咱们一台机器上多个不一样线程抢占同一个资源,而且若是屡次执行会有异常,咱们称之为非线程安全。通常,咱们为了解决这种问题,一般使用锁来解决,像java语言,咱们可使用synchronized。若是是同一台机器里面不一样的java实例,咱们可使用系统的文件读写锁来解决,若是再扩展到不一样的机器呢?咱们一般用分布式锁来解决。redis
分布式锁的特色以下:apache
分布式锁的实现有不少种,常见的有redis,zookeeper,谷歌的chubby等安全
简单介绍一下。相信你们这里已经想到了解决方案,那就是每次执行任务的时候,先查询redis里面是否已经有锁的key,若是没有就写入,而后就开始执行任务。架构
这个看起来很对,不过存在什么问题呢,例如进程A跟进程B同时查询Redis,他们都发现Redis中没有对应的值,而后都开始写入,因为不是带版本读写,两我的都写成功了,都得到了锁。还好,Redis给咱们提供原子写入的操做,setnx(SET if Not eXists, 一个命令咱们最好把全称也了解一下,有助于咱们记住这个命令)。分布式
若是你觉得只要这样就完成一个分布式锁,那就太天真了,咱们不妨考虑一些极端状况,例如某个线程取到了锁,可是很不幸,这个机器死机了,那么这个锁没有被释放,这个任务永远就不会有人执行了。因此一种比较好的解决方案是,申请锁的时候,预估一个程序的执行时间,而后给锁设置一个超时时间,若是超过这个时间其余人也能取到这个锁。但这又引起另一个问题,有时候负载很高,任务执行得很慢,结果过了超时时间任务还没执行完,这个时候又起了另一个任务来执行。ide
架构设计的魅力正是如此,当你解决一个问题的时候,总会引起一些新的问题,须要逐步攻破逐个解决。这种方法,咱们通常能够在抢占到锁以后,就开一个守护线程,定时去redis哪里询问,是否是仍是由我抢占着当前的锁,还有多久就要过时,若是发现要过时了,就赶忙续期。函数
好了,看到这里,相信你已经学会了如何用Redis实现一个分布式锁服务了
Zookeeper 实现分布式锁的示意图以下:
上图中左边是Zookeeper集群, lock是数据节点,node_1到node_n表示一系列的顺序临时节点,右侧client_1到client_n表示要获取锁的客户端。Service是互斥访问的服务。
下面的源码是根据Zookeeper的开源客户端Curator实现分布式锁。采用zk的原生API实现会比较复杂,因此这里就直接用Curator这个轮子,采用Curator的acquire和release两个方法就能实现分布式锁。
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorDistributeLock {
public static void main(String\[\] args) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("111.231.83.101:2181",retryPolicy); client.start(); CuratorFramework client2 = CuratorFrameworkFactory.newClient("111.231.83.101:2181",retryPolicy); client2.start(); //建立分布式锁, 锁空间的根节点路径为/curator/lock InterProcessMutex mutex = new InterProcessMutex(client,"/curator/lock"); final InterProcessMutex mutex2 = new InterProcessMutex(client2,"/curator/lock"); try { mutex.acquire(); } catch (Exception e) { e.printStackTrace(); } //得到了锁, 进行业务流程 System.out.println("clent Enter mutex"); Thread client2Th = new Thread(new Runnable() { @Override public void run() { try { mutex2.acquire(); System.out.println("client2 Enter mutex"); mutex2.release(); System.out.println("client2 release lock"); }catch (Exception e){ e.printStackTrace(); } } }); client2Th.start(); //完成业务流程, 释放锁 try { Thread.sleep(5000); mutex.release(); System.out.println("client release lock"); client2Th.join(); } catch (Exception e) { e.printStackTrace(); } //关闭客户端 client.close(); }
}
上述代码的执行结果以下:
能够看到client客户端首先拿到锁再执行业务,而后再轮到client2尝试获取锁并执行业务。
一直追踪acquire()的加锁方法,能够追踪到加锁的核心函数为attemptLock。
String attemptLock(long time, TimeUnit unit, byte\[\] lockNodeBytes) throws Exception { ..... while ( !isDone ) { isDone = true; try { //建立临时有序节点 ourPath = driver.createsTheLock(client, path, localLockNodeBytes); //判断本身是否最小序号的节点,若是不是添加监听前面节点被删的通知 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } } //若是获取锁返回节点路径 if ( hasTheLock ) { return ourPath; } .... }
深刻internalLockLoop函数源码:
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { ....... while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { //获取子节点列表按照序号从小到大排序 List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash //判断本身是不是当前最小序号节点 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { //成功获取锁 haveTheLock = true; } else { //拿到前一个节点 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); //若是没有拿到锁,调用wait,等待前一个节点删除时,经过回调notifyAll唤醒当前线程 synchronized(this) { try { //设置监听器,getData会判读前一个节点是否存在,不存在就会抛出异常从而不会设置监听器 client.getData().usingWatcher(watcher).forPath(previousSequencePath); //若是设置了millisToWait,等一段时间,到了时间删除本身跳出循环 if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } //等待一段时间 wait(millisToWait); } else { //一直等待下去 wait(); } } catch ( KeeperException.NoNodeException e ) { //getData发现前一个子节点被删除,抛出异常 } } } } } ..... }
采用zk实现分布式锁在实际应用中不是很常见,须要一套zk集群,并且频繁监听对zk集群来讲也是有压力,因此不推荐你们用。不能去面试的时候,能具体说一下使用zk实现分布式锁,我想应该也是一个加分项 。
好了,今天就从redis和zookeeper两个方面实现分布式锁,以为有收获的小伙伴,欢迎关注+点赞呀须要更多资料的,关注公众号Java技术联盟,每日更新哦