随着数据量的增大,用户的增多,系统的并发访问愈来愈大,传统的单机已经知足不了需求,分布式系统成为一种必然的趋势。分布式系统错综复杂,今天,咱们着重对分布式系统的互斥性与幂等性进行分析与解决。java
互斥性问题也就是共享资源的抢占问题。如何解决呢?也就是锁,保证对共享资源的串行化访问。互斥性要如何实现?。在java中,最经常使用的是synchronized和lock这两种内置的锁,但这只适用于单进程中的多线程。对于在同一操做系统下的多个进程间,常见的锁实现有pv信号量等。然而,当问题扩展到多台机器的多个操做系统时,也就是分布式锁,状况就复杂多了。node
今天重点讲解使用zookeeper实现分布式锁。我的感受zookeeper是最适合实现分布式锁。它的几个特性:apache
zk实现分布式锁的流程以下
我这里用zk实现了一个可重入的、阻塞的、公平的分布式锁,代码以下:多线程
package locks; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import utils.ZkUtils; import watcher.PredecessorNodeWatcher; import watcher.SessionWatcher; import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * Created by huangwt on 2018/3/21. */ @Slf4j public class ReentrantZKLock { private final static String BASE_NODE = "/baseNode"; private final static String CHILDREN_NODE = "/node_"; private final Lock localLock; private final Condition condition; //用于重入检测 private static ThreadLocal<AtomicInteger> threadLocal = new ThreadLocal<AtomicInteger>(); private ZooKeeper zooKeeper = null; private String node = null; ReentrantZKLock(String addr, int timeout) { try { zooKeeper = new ZooKeeper(addr, timeout, new SessionWatcher()); localLock = new ReentrantLock(); condition = localLock.newCondition(); } catch (IOException e) { log.error("get zookeeper failed", e); throw new RuntimeException(e); } } public void lock() { //重入检测 if (checkReentrant()) { return; } try { node = zooKeeper.create(BASE_NODE + CHILDREN_NODE, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); while (true) { localLock.lock(); try { List<String> childrenNodes = zooKeeper.getChildren(BASE_NODE, false); ZkUtils.childNodeSort(childrenNodes); //当前节点的索引 int myNodeIndex = childrenNodes.indexOf(node); //当前节点的前一个节点 int beforeNodeIndex = myNodeIndex - 1; Stat stat = null; while (beforeNodeIndex >= 0) { stat = zooKeeper.exists(childrenNodes.get(beforeNodeIndex), new PredecessorNodeWatcher(condition)); if (stat != null) { break; } } if (stat != null) { //前序节点存在,等待前序节点被删除,释放锁 condition.await(); } else { // 获取到锁 threadLocal.set(new AtomicInteger(1)); return; } } finally { localLock.unlock(); } } } catch (Exception e) { log.error("lock failed", e); throw new RuntimeException(e); } } public void unlock() { AtomicInteger times = threadLocal.get(); if (times == null) { return; } if (times.decrementAndGet() == 0) { threadLocal.remove(); try { zooKeeper.delete(node, -1); } catch (Exception e) { log.error("unlock faild", e); throw new RuntimeException(e); } } } private boolean checkReentrant() { AtomicInteger times = threadLocal.get(); if (times != null) { times.incrementAndGet(); return true; } return false; } }
package utils; import java.util.Collections; import java.util.Comparator; import java.util.List; /** * Created by huangwt on 2018/3/24. */ public class ZkUtils { /** * 对子节点排序 * * @param node */ public static void childNodeSort(List<String> node) { Collections.sort(node, new ChildNodeCompare()); } private static class ChildNodeCompare implements Comparator<String> { public int compare(String childNode1, String childNode2) { return childNode1.compareTo(childNode2); } } }
package watcher; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import java.util.concurrent.locks.Condition; /** * Created by huangwt on 2018/3/24. */ public class PredecessorNodeWatcher implements Watcher { private Condition condition = null; public PredecessorNodeWatcher(Condition condition) { this.condition = condition; } public void process(WatchedEvent event) { //前序节点被删除,锁被释放,唤醒当前等待线程 if(event.getType() == Event.EventType.NodeDeleted){ condition.signal(); } } }
package watcher; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; /** * Created by huangwt on 2018/3/24. */ @Slf4j public class SessionWatcher implements Watcher { public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { log.info("get zookeeper success"); } } }
主要是使用了ThreadLocal实现了锁的可重入性,使用watch机制实现了阻塞锁,使用临时节点实现的公平锁。
这段代码只是一个demo供你们参考,还有不少问题没解决。好比当zookper挂掉的时候,阻塞的线程就没法被唤醒,这时候就须要监听zk的心跳。并发
幂等性是系统接口对外的一种承诺,数学表达为:f(f(x)) = f(x)。
幂等性指的是,使用相同参数对同一资源重复调用某个接口的结果与调用一次的结果相同。分布式
假设如今有一个方法 :Boolean withdraw(account_id, amount) ,做用是从account_id对应的帐户中扣除amount数额的钱,若是扣除成功则返回true,帐户余额减小amount; 若是扣除失败则返回false,帐户余额不变。
如以上流程,接口没法幂等,可能致使重复扣款。性能