目录html
疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 -26【 博客园 总入口 】node
你们好,我是做者尼恩。目前和几个小伙伴一块儿,组织了一个高并发的实战社群【疯狂创客圈】。正在开始高并发、亿级流程的 IM 聊天程序 学习和实战面试
前面,已经完成一个高性能的 Java 聊天程序的四件大事:redis
接下来,须要进入到分布式开发的环节了。 分布式的中间件,疯狂创客圈的小伙伴们,一致的选择了zookeeper,不只仅是因为其在大数据领域,太有名了。更重要的是,不少的著名框架,都使用了zk。算法
本篇介绍 ZK Curator 的分布式锁实现。安全
在咱们进行单机应用开发,涉及并发同步的时候,咱们每每采用synchronized或者Lock的方式来解决多线程间的代码同步问题。但当咱们的应用是分布式集群工做的状况下,那么就须要一种更加高级的锁机制,来处理种跨机器的进程之间的数据同步问题。服务器
这就是分布式锁。网络
分布式锁的概念和原理,比较抽象难懂。若是用一个简单的故事来类比,估计就简单多了。多线程
好久之前,在一个村子有一口井,水质很是的好,村民们都抢着取井里的水。井就那么一口,村里的人不少,村民为争抢取水打架斗殴,甚至头破血流。并发
问题老是要解决,因而村长绞尽脑汁,最终想出了一个凭号取水的方案。井边安排一个看井人,维护取水的秩序。
提及来,秩序很简单,取水以前,先取号。号排在前面的,就能够先取水。先到的排在前面,那些后到的,没有排在最前面的人,一个一个挨着,在井边排成一队。取水示意图以下 :
这种排队取水模型,就是一种锁的模型。排在最前面的号,拥有取水权,就是一种典型的独占锁。另外,先到先得,号排在前面的人先取到水,取水以后就轮到下一个号取水,至少,看起来挺公平的,说明它是一种公平锁。
在公平独占锁的基础上,再进一步,看看可重入锁的模型。
假定,取水时以家庭为单位,哪一个家庭任何人拿到号,就能够排号取水,并且若是一个家庭有一我的拿到号,其它家人这时候过来打水不用再取号。新的排号取水示意图以下 :
如上图的1号,老公有号,他的老婆来了,直接排第一个,妻凭夫贵。再看上图的2号,父亲正在打水,他的儿子和女儿也到井边了,直接排第二个,这个叫作子凭父贵。 等等,若是是同一个家庭,能够直接复用排号,不用从新取号从后面排起。
以上这个故事模型,就是能够重入锁的模型。只要知足条件,同一个排号,能够用来屡次取水。在锁的模型中,至关于一把锁,能够被屡次锁定,这就叫作可重入锁。
理解了锁的原理后,就会发现,Zookeeper 天生就是一副分布式锁的胚子。
首先,Zookeeper的每个节点,都是一个自然的顺序发号器。
在每个节点下面建立子节点时,只要选择的建立类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一
好比,建立一个用于发号的节点“/test/lock”,而后以他为父亲节点,能够在这个父节点下面建立相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在建立子节点时,同时指明是有序类型。若是是第一个建立的子节点,那么生成的子节点为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001,依次类推,等等。
其次,Zookeeper节点的递增性,能够规定节点编号最小的那个得到锁。
一个zookeeper分布式锁,首先须要建立一个父节点,尽可能是持久节点(PERSISTENT类型),而后每一个要得到锁的线程都会在这个节点下建立个临时顺序节点,因为序号的递增性,能够规定排号最小的那个得到锁。因此,每一个线程在尝试占用锁以前,首先判断本身是排号是否是当前最小,若是是,则获取锁。
第三,Zookeeper的节点监听机制,能够保障占有锁的方式有序并且高效。
每一个线程抢占锁以前,先抢号建立本身的ZNode。一样,释放锁的时候,就须要删除抢号的Znode。抢号成功后,若是不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不须要其余人,只须要等前一个Znode 的通知就能够了。当前一个Znode 删除的时候,就是轮到了本身占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。
Zookeeper的节点监听机制,能够说可以很是完美的,实现这种击鼓传花似的信息传递。具体的方法是,每个等通知的Znode节点,只须要监听linsten或者 watch 监视排号在本身前面那个,并且紧挨在本身前面的那个节点。 只要上一个节点被删除了,就进行再一次判断,看看本身是否是序号最小的那个节点,若是是,则得到锁。
为何说Zookeeper的节点监听机制,能够说是很是完美呢?
一条龙式的首尾相接,后面监视前面,就不怕中间截断吗?好比,在分布式环境下,因为网络的缘由,或者服务器挂了或则其余的缘由,若是前面的那个节点没能被程序删除成功,后面的节点不就永远等待么?
其实,Zookeeper的内部机制,能保证后面的节点可以正常的监听到删除和得到锁。在建立取号节点的时候,尽可能建立临时znode 节点而不是永久znode 节点,一旦这个 znode 的客户端与Zookeeper集群服务器失去联系,这个临时 znode 也将自动删除。排在它后面的那个节点,也能收到删除事件,从而得到锁。
说Zookeeper的节点监听机制,是很是完美的。还有一个缘由。
Zookeeper这种首尾相接,后面监听前面的方式,能够避免羊群效应。所谓羊群效应就是每一个节点挂掉,全部节点都去监听,而后作出反映,这样会给服务器带来巨大压力,因此有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才作出反映。
接下来就是基于zookeeper,实现一下分布式锁。
首先定义了一个锁的接口,很简单,一个加锁方法,一个解锁方法。
/** * create by 尼恩 @ 疯狂创客圈 **/ public interface Lock { boolean lock() throws Exception; boolean unlock(); }
使用zookeeper实现分布式锁的算法流程,大体以下:
(1)若是锁空间的根节点不存在,首先建立Znode根节点。这里假设为“/test/lock”。这个根节点,表明了一把分布式锁。
(2)客户端若是须要占用锁,则在“/test/lock”下建立临时的且有序的子节点。
这里,尽可能使一个有意义的子节点前缀,好比“/test/lock/seq-”。则第一个客户端对应的子节点为“/test/lock/seq-000000000”,第二个为 “/test/lock/seq-000000001”,以此类推。
若是前缀为“/test/lock/”,则第一个客户端对应的子节点为“/test/lock/000000000”,第二个为 “/test/lock/000000001” ,以此类推,也很是直观。
(3)客户端若是须要占用锁,还须要判断,判断本身建立的子节点是否为当前子节点列表中序号最小的子节点。若是是则认为得到锁,不然监听前一个Znode子节点变动消息,得到子节点变动通知后重复此步骤直至得到锁;
(4)获取锁后,开始处理业务流程。完成业务流程后,删除对应的子节点,完成释放锁的工做。以便后面的节点得到分布式锁。
lock方法的具体算法是,首先尝试着去加锁,若是加锁失败就去等待,而后再重复。
代码以下:
@Override public boolean lock() { try { boolean locked = false; locked = tryLock(); if (locked) { return true; } while (!locked) { await(); if (checkLocked()) { locked=true; } } return true; } catch (Exception e) { e.printStackTrace(); unlock(); } return false; }
尝试加锁的tryLock方法是关键。作了两件重要的事情:
(1)建立临时顺序节点,而且保存本身的节点路径
(2)判断是不是第一个,若是是第一个,则加锁成功。若是不是,就找到前一个Znode节点,而且保存其路径到prior_path。
tryLock方法代码节选以下:
private boolean tryLock() throws Exception { //建立临时Znode List<String> waiters = getWaiters(); locked_path = ZKclient.instance .createEphemeralSeqNode(LOCK_PREFIX); if (null == locked_path) { throw new Exception("zk error"); } locked_short_path = getShorPath(locked_path); //获取等待的子节点列表,判断本身是否第一个 if (checkLocked()) { return true; } // 判断本身排第几个 int index = Collections.binarySearch(waiters, locked_short_path); if (index < 0) { // 网络抖动,获取到的子节点列表里可能已经没有本身了 throw new Exception("节点没有找到: " + locked_short_path); } //若是本身没有得到锁,则要监听前一个节点 prior_path = ZK_PATH + "/" + waiters.get(index - 1); return false; }
建立临时顺序节点后,其完整路径存放在 locked_path 成员中。另外还截取了一个后缀路径,放在 locked_short_path 成员中。 这个后缀路径,是一个短路径,只有完整路径的最后一层。在和取到的远程子节点列表中的其余路径进行比较时,须要用到短路径。由于子节点列表的路径,都是短路径,只有最后一层。
而后,调用checkLocked方法,判断是不是锁定成功。若是是则返回。若是本身没有得到锁,则要监听前一个节点。找出前一个节点的路径,保存在 prior_path 成员中,供后面的await 等待方法,去监听使用。
在进入await等待方法的介绍前,先说下checkLocked 锁定判断方法。
在checkLocked方法中,判断是否能够持有锁。判断规则很简单:当前建立的节点,是否在上一步获取到的子节点列表的第一个位置:
若是是,说明能够持有锁,返回true,表示加锁成功;
若是不是,说明有其余线程早已先持有了锁,返回false。
checkLocked方法的代码以下:
private boolean checkLocked() { //获取等待的子节点列表 List<String> waiters = getWaiters(); //节点按照编号,升序排列 Collections.sort(waiters); // 若是是第一个,表明本身已经得到了锁 if (locked_short_path.equals(waiters.get(0))) { log.info("成功的获取分布式锁,节点为{}", locked_short_path); return true; } return false; }
checkLocked方法比较简单,就是获取到全部子节点列表,而且从小到大根据节点名称进行排序,主要依靠后10位数字,由于前缀都是同样的。
排序的结果,若是本身的locked_short_path位置在第一个,表明本身已经得到了锁。
如今正式进入等待方法await的介绍。
等待方法await,表示在争夺锁失败之后的等待逻辑。那么此处该线程应该作什么呢?
private void await() throws Exception { if (null == prior_path) { throw new Exception("prior_path error"); } final CountDownLatch latch = new CountDownLatch(1); //订阅比本身次小顺序节点的删除事件 Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("监听到的变化 watchedEvent = " + watchedEvent); log.info("[WatchedEvent]节点删除"); latch.countDown(); } }; client.getData().usingWatcher(w).forPath(prior_path); latch.await(WAIT_TIME, TimeUnit.SECONDS); }
首先添加一个watcher监听,而监听的地址正是上面一步返回的prior_path 成员。这里,仅仅会监听本身前一个节点的变更,而不是父节点下全部节点的变更。而后,调用latch.await,进入等待状态,等到latch.countDown()被唤醒。
一旦prior_path节点发生了变更,那么就将线程从等待状态唤醒,从新一轮的锁的争夺。
至此,关于加锁的算法基本完成。可是,上面尚未实现锁的可重入。
什么是可重入呢?
只须要保障同一个线程进入加锁的代码,能够重复加锁成功便可。
修改前面的lock方法,在前面加上可重入的判断逻辑。代码以下: public boolean lock() { synchronized (this) { if (lockCount.get() == 0) { thread = Thread.currentThread(); lockCount.incrementAndGet(); } else { if (!thread.equals(Thread.currentThread())) { return false; } lockCount.incrementAndGet(); return true; } } //... }
为了变成可重入,在代码中增长了一个加锁的计数器lockCount ,计算重复加锁的次数。若是是同一个线程加锁,只须要增长次数,直接返回,表示加锁成功。
释放锁主要有两个工做:
(1)减小重入锁的计数,若是不是0,直接返回,表示成功的释放了一次;
(2)若是计数器为0,移除Watchers监听器,而且删除建立的Znode临时节点;
代码以下:
@Override public boolean unlock() { if (!thread.equals(Thread.currentThread())) { return false; } int newLockCount = lockCount.decrementAndGet(); if (newLockCount < 0) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + locked_path); } if (newLockCount != 0) { return true; } try { if (ZKclient.instance.isNodeExist(locked_path)) { client.delete().forPath(locked_path); } } catch (Exception e) { e.printStackTrace(); return false; } return true; }
这里,为了尽可能保证线程安全,可重入计数器的类型,不是int类型,而是Java并发包中的原子类型——AtomicInteger。
前面的实现,主要的价值是展现一下分布式锁的基础开发和原理。实际的开发中,若是须要使用到分布式锁,并不须要本身造轮子,能够直接使用curator客户端中的各类官方实现的分布式锁,好比其中的InterProcessMutex 可重入锁。
InterProcessMutex 可重入锁的使用实例以下:
@Test public void testzkMutex() throws InterruptedException { CuratorFramework client=ZKclient.instance.getClient(); final InterProcessMutex zkMutex = new InterProcessMutex(client,"/mutex"); ; for (int i = 0; i < 10; i++) { FutureTaskScheduler.add(() -> { try { zkMutex.acquire(); for (int j = 0; j < 10; j++) { count++; } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("count = " + count); zkMutex.release(); } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(Integer.MAX_VALUE); }
最后,总结一下Zookeeper分布式锁。
Zookeeper分布式锁,能有效的解决分布式问题,不可重入问题,实现起来较为简单。
可是,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能并不过高。由于每次在建立锁和释放锁的过程当中,都要动态建立、销毁瞬时节点来实现锁功能。ZK中建立和删除节点只能经过Leader服务器来执行,而后Leader服务器还须要将数据同不到全部的Follower机器上。
因此,在高性能,高并发的场景下,不建议使用Zk的分布式锁。
目前分布式锁,比较成熟、主流的方案是基于redis及基于zookeeper的二种方案。这两种锁,应用场景不一样。而 zookeeper只是其中的一种。Zk的分布式锁的应用场景,主要高可靠,而不是过高并发的场景下。
在并发量很高,性能要求很高的场景下,推荐使用基于redis的分布式锁。
下一篇: zookeeper + netty 实现高并发IM 聊天
Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战
疯狂创客圈 【 博客园 总入口 】