目前几乎不少大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉咱们“任何一个分布式系统都没法同时知足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时知足两项。”因此,不少系统在设计之初就要对这三者作出取舍。在互联网领域的绝大多数的场景中,都须要牺牲强一致性来换取系统的高可用性,系统每每只须要保证“最终一致性”,只要这个最终时间是在用户能够接受的范围内便可。java
在不少场景中,咱们为了保证数据的最终一致性,须要不少的技术方案来支持,好比分布式事务、分布式锁等。有的时候,咱们须要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,Java中其实提供了不少并发处理相关的API,可是这些API在分布式场景中就无能为力了。也就是说单纯的Java Api并不能提供分布式锁的能力。因此针对分布式锁的实现目前有多种方案。redis
针对分布式锁的实现,目前比较经常使用的有如下几种方案:数据库
基于数据库实现分布式锁 基于缓存(redis,memcached,tair)实现分布式锁 基于Zookeeper实现分布式锁apache
1、zookeeper中分布式锁实现原理缓存
(1)、普通节点思路并发
如今模拟一个使用Zookeeper实现分布式锁,假设有A,B,C三台客户端去访问资源,调用zookeeper得到锁。客户端三个在zookeeper的 /locks节点下建立一个/lock节点,因为节点是惟一性的特性,只有一我的会建立成功,其他两个建立失败,会进入监听/locks节点的变化,若是/locks下子节点/lock节点发生变化,其他两个能够去拿锁,这样是否好呢? 这样子会致使惊群效应。就是一个触发使得在短期呢会触发大量的watcher事件,可是只有一个客户端能拿到锁less
--众人抢,大量watcher事件分布式
(2)、有序节点思路ide
一、在获取分布式锁的时候在locker节点下建立临时顺序节点,释放锁的时候删除该临时节点。memcached
二、客户端调用createNode方法在locks下建立临时顺序节点,而后调用getChildren(“locks”)来获取locks下面的全部子节点,注意此时不用设置任何Watcher。
三、客户端获取到全部的子节点path以后,若是发现本身建立的子节点序号最小,那么就认为该客户端获取到了锁。
四、若是发现本身建立的节点并不是locks全部子节点中最小的,说明本身尚未获取到锁,此时客户端须要找到比本身小的那个节点,而后对其调用exist()方法,同时对其注册事件监听器。
五、以后,让这个被关注的节点删除,则客户端的Watcher会收到相应通知,此时再次判断本身建立的节点是不是locks子节点中序号最小的,若是是则获取到了锁,若是不是则重复以上步骤继续获取到比本身小的一个节点并注册监听。
2、代码实现
package com.lf.zookeeper.lock; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /* *实现分布式锁 */ public class DestributeLock implements Lock,Watcher{ private ZooKeeper zk = null; private String ROOT_LOCK ="/locks";//定义根节点 private String CURRENT_LOCK;//当前锁 private String WAIT_LOCK ;//等待前一个对象释放锁 private CountDownLatch countDownLatch; public DestributeLock() { try { zk= new ZooKeeper("192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181", 4000, this); //判断根节点是否存在 Stat stat = zk.exists(ROOT_LOCK, false); if(stat==null){ zk.create(ROOT_LOCK, "1".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE , CreateMode.PERSISTENT); } } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { if(countDownLatch != null){ this.countDownLatch.countDown(); } } @Override public void lock() { if(tryLock()){ System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+",获取锁成功!"); return; } try { waitForLock(WAIT_LOCK);//若是没有得到锁,继续等待 } catch (Exception e) { e.printStackTrace(); } } private boolean waitForLock(String prev) throws Exception, InterruptedException { Stat stat = zk.exists(prev, true); if(stat!=null){ System.out.println(Thread.currentThread().getName()+"->等待"+ROOT_LOCK+prev+"释放锁"); countDownLatch = new CountDownLatch(1); countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"->"+"得到锁成功!"); } return true; } @Override public void lockInterruptibly() throws InterruptedException { // TODO Auto-generated method stub } @Override public Condition newCondition() { // TODO Auto-generated method stub return null; } @Override public boolean tryLock() { // TODO Auto-generated method stub try { CURRENT_LOCK= zk.create(ROOT_LOCK+"/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+",尝试竞争锁!"); //获取根节点下的全部子节点 List<String> childrens = zk.getChildren(ROOT_LOCK, false); SortedSet<String> sortedSet = new TreeSet();//定义一个有序的集合进行排序 for (String children : childrens) { sortedSet.add(ROOT_LOCK+"/"+children); } //获取最小的子节点 String firstNode = sortedSet.first(); SortedSet<String> lessthanMe = sortedSet.headSet(CURRENT_LOCK); if(CURRENT_LOCK.equals(firstNode)){//当前节点和最小锁比较,若是相同,则获取锁成功 return true; } if(!lessthanMe.isEmpty()){ WAIT_LOCK = lessthanMe.last();//获取比当前节点更小的最后一个节点,设置给WAIT_LOCK } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } return false; } @Override public boolean tryLock(long arg0, TimeUnit arg1) throws InterruptedException { // TODO Auto-generated method stub return false; } @Override public void unlock() { System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+"释放锁"); try { zk.delete(CURRENT_LOCK, -1); CURRENT_LOCK = null; zk.close(); } catch (Exception e) { // TODO: handle exception } } }
测试类
package com.lf.zookeeper.lock; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class LockDemo { public static void main(String[] args) throws IOException { CountDownLatch countDownLatch = new CountDownLatch(10); for (int i = 1; i <= 10; i++) { new Thread(()->{ try { countDownLatch.await(); DestributeLock destributeLock = new DestributeLock(); destributeLock.lock(); } catch (Exception e) { e.printStackTrace(); } },"Thread-"+i).start(); countDownLatch.countDown(); } System.in.read(); } }
运行结果
Thread-4->/locks/0000000072,尝试竞争锁! Thread-5->/locks/0000000073,尝试竞争锁! Thread-9->/locks/0000000074,尝试竞争锁! Thread-8->/locks/0000000075,尝试竞争锁! Thread-10->/locks/0000000076,尝试竞争锁! Thread-2->/locks/0000000077,尝试竞争锁! Thread-6->/locks/0000000078,尝试竞争锁! Thread-3->/locks/0000000079,尝试竞争锁! Thread-7->/locks/0000000080,尝试竞争锁! Thread-1->/locks/0000000071,获取锁成功! Thread-4->等待/locks/locks/0000000071释放锁 Thread-5->等待/locks/locks/0000000072释放锁 Thread-9->等待/locks/locks/0000000073释放锁 Thread-8->等待/locks/locks/0000000074释放锁 Thread-10->等待/locks/locks/0000000075释放锁 Thread-2->等待/locks/locks/0000000076释放锁 Thread-6->等待/locks/locks/0000000077释放锁 Thread-3->等待/locks/locks/0000000078释放锁 Thread-7->等待/locks/locks/0000000079释放锁
手动触发watcher事件,释放锁,delete /locks/0000000071
出现 Thread-4->/locks/0000000072,获取锁成功!
3、基于curator的实现分布式锁
代码
package com.lf.zookeeper.lock; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; public class CuratorLockDemo { public static void main(String[] args) { CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().build(); InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/locks");//关注节点 try { interProcessMutex.acquire();//获取锁 } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }