💛 Source code for these distributed solutions is on GitHub, please give it a star!
💛 The original article is at http://www.javashuo.com/article/p-gquycsdz-c.html; please credit the source when reposting!
In a monolithic application, locks inside a single JVM are enough. In a microservice or distributed environment, however, the same service may be deployed on several servers, and multiple JVMs cannot synchronize through ordinary JVM locks, so a distributed lock is needed to acquire and release the lock across processes. For example, in an order service we may generate order serial numbers from the current date and time; two instances can produce the same timestamp and therefore duplicate order numbers. (Within a single JVM, JDK 8's thread-safe LocalDateTime avoids the concurrent-formatting problem, but it cannot stop two separate JVMs from reading the same instant.)
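As a minimal sketch of the collision (the `OrderNoGenerator` class and its format pattern are hypothetical, not from the original repo): if two service instances run this in the same millisecond, both emit the same order number, which is exactly what the distributed lock below is meant to prevent.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class OrderNoGenerator {

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

    // Thread-safe inside one JVM, but two JVMs calling this in the same
    // millisecond still produce identical order numbers
    public static String nextOrderNo() {
        return LocalDateTime.now().format(FMT);
    }
}
```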
The first approach, based on a plain ephemeral node, is simple to implement: whichever thread succeeds in creating the node holds the lock, and threads that fail to create it block. Thread A acquires the lock first; thread B fails, blocks, and registers a watcher on /lockPath. When A finishes its work and deletes the node, the watcher fires, and B unblocks and tries to acquire the lock again.
We model the design on the JDK's Lock interface and use the template method pattern to write the distributed lock. The benefit is extensibility: we can quickly swap in a Redis-based or database-based distributed lock (see the sketch after the abstract class below).
Create the Lock interface
```java
public interface Lock {
    /**
     * Acquire the lock
     */
    void getLock() throws Exception;

    /**
     * Release the lock
     */
    void unlock() throws Exception;
}
```
The AbstractTemplateLock abstract class
```java
public abstract class AbstractTemplateLock implements Lock {

    @Override
    public void getLock() {
        if (tryLock()) {
            System.out.println(Thread.currentThread().getName() + " acquired the lock");
        } else {
            // Wait: block on an event listener; once the lock node is deleted we can retry
            waitLock();
            // Try again
            getLock();
        }
    }

    protected abstract void waitLock();

    protected abstract boolean tryLock();

    protected abstract void releaseLock();

    @Override
    public void unlock() {
        releaseLock();
    }
}
```
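To illustrate the extension point the template gives us, here is a schematic of what switching backends would look like. This is not a working implementation: `RedisTemplateLock` is hypothetical and the Redis commands mentioned in the comments are only the usual pattern for such a lock.

```java
public class RedisTemplateLock extends AbstractTemplateLock {

    @Override
    protected boolean tryLock() {
        // With a Redis client: SET lockKey token NX PX 30000,
        // succeeding only if the key does not already exist
        return false; // placeholder
    }

    @Override
    protected void waitLock() {
        // e.g. sleep-and-retry, or subscribe to a key-expiry notification
    }

    @Override
    protected void releaseLock() {
        // Delete lockKey only if it still holds our token
        // (typically done atomically with a small Lua script)
    }
}
```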
ZooKeeper distributed lock logic
```java
@Slf4j
public class ZkTemplateLock extends AbstractTemplateLock {

    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;
    private static final String lockPath = "/lockPath";

    private ZkClient client;

    public ZkTemplateLock() {
        client = new ZkClient(zkServers, sessionTimeout, connectionTimeout);
        log.info("zk client connected: {}", zkServers);
    }

    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("lock node deletion observed");
                latch.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        // Register the watcher
        client.subscribeDataChanges(lockPath, listener);
        // Block until the current holder deletes the node
        if (client.exists(lockPath)) {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Unregister the watcher
        client.unsubscribeDataChanges(lockPath, listener);
    }

    @Override
    protected boolean tryLock() {
        try {
            client.createEphemeral(lockPath);
            System.out.println(Thread.currentThread().getName() + " acquired the lock");
        } catch (Exception e) {
            log.error("failed to create the lock node");
            return false;
        }
        return true;
    }

    @Override
    public void releaseLock() {
        client.delete(lockPath);
    }
}
```
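A minimal driver for the class above (hypothetical test code; it assumes a local ZooKeeper at 127.0.0.1:2181 and gives each thread its own client):

```java
import java.util.concurrent.TimeUnit;

public class ZkTemplateLockDemo {

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                Lock lock = new ZkTemplateLock();
                try {
                    lock.getLock();
                    // Critical section, e.g. generating an order number
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "worker-" + i).start();
        }
    }
}
```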
Drawbacks:
Every time the lock is contended, only one thread gets it. With a huge number of threads this produces a "thundering herd": the ZooKeeper server may slow down or even crash, because every thread that failed to get the lock is watching /lockPath. When thread A releases the lock, the mass of waiting threads all wake up at once and race for it again, which is very expensive and hurts performance badly.
Unlike plain ephemeral nodes, ephemeral sequential nodes are ordered. We can exploit this by having each thread watch only the node with the preceding sequence number. On every acquisition attempt a thread checks whether its own number is the smallest; if so, it holds the lock. When it finishes, it deletes its node, and the next check again finds whichever node now has the smallest number.
Source code for the ephemeral sequential node approach
```java
@Slf4j
public class ZkSequenTemplateLock extends AbstractTemplateLock {

    private static final String zkServers = "127.0.0.1:2181";
    private static final int sessionTimeout = 8000;
    private static final int connectionTimeout = 5000;
    private static final String lockPath = "/lockPath";

    private String beforePath;
    private String currentPath;
    private ZkClient client;

    public ZkSequenTemplateLock() {
        client = new ZkClient(zkServers);
        if (!client.exists(lockPath)) {
            client.createPersistent(lockPath);
        }
        log.info("zk client connected: {}", zkServers);
    }

    @Override
    protected void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("predecessor node deletion observed");
                latch.countDown();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        // Watch the node just ahead of ours for deletion; under the hood another
        // thread delivers the event for the predecessor node
        client.subscribeDataChanges(beforePath, listener);
        // Block until the predecessor goes away
        if (client.exists(beforePath)) {
            try {
                System.out.println("blocking " + currentPath);
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // Unregister the watcher
        client.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    protected boolean tryLock() {
        if (currentPath == null) {
            // Create an ephemeral sequential node
            currentPath = client.createEphemeralSequential(lockPath + "/", "lock-data");
            System.out.println("current:" + currentPath);
        }
        // Fetch all children and sort them; sequential node names are
        // monotonically increasing strings, so natural order is creation order
        List<String> children = client.getChildren(lockPath);
        Collections.sort(children);
        if (currentPath.equals(lockPath + "/" + children.get(0))) {
            // Ours is the smallest sequence number: we hold the lock
            return true;
        } else {
            // Otherwise remember the node immediately ahead of us as beforePath
            int curIndex = children.indexOf(currentPath.substring(lockPath.length() + 1));
            beforePath = lockPath + "/" + children.get(curIndex - 1);
        }
        System.out.println("beforePath:" + beforePath);
        return false;
    }

    @Override
    public void releaseLock() {
        System.out.println("delete:" + currentPath);
        client.delete(currentPath);
    }
}
```
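To make the sort-and-watch-previous step concrete, here is a self-contained sketch (the child names are made up) of the same predecessor computation that tryLock performs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class WatchChainDemo {

    public static void main(String[] args) {
        String lockPath = "/lockPath";
        String currentPath = lockPath + "/0000000002";
        // Hypothetical sequential children, in the order ZooKeeper might return them
        List<String> children = new ArrayList<>(Arrays.asList("0000000003", "0000000001", "0000000002"));
        Collections.sort(children); // natural order == creation order

        if (currentPath.equals(lockPath + "/" + children.get(0))) {
            System.out.println(currentPath + " holds the lock");
        } else {
            int curIndex = children.indexOf(currentPath.substring(lockPath.length() + 1));
            String beforePath = lockPath + "/" + children.get(curIndex - 1);
            // Only the immediate predecessor is watched, so its deletion wakes
            // exactly one waiter instead of a thundering herd
            System.out.println(currentPath + " watches " + beforePath);
        }
    }
}
```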
Curator provides the following kinds of locks:

- Shared Reentrant Lock (InterProcessMutex)
- Shared Lock (InterProcessSemaphoreMutex)
- Shared Reentrant Read Write Lock (InterProcessReadWriteLock)
- Shared Semaphore (InterProcessSemaphoreV2)
- Multi Shared Lock (InterProcessMultiLock)
We use InterProcessMutex from the first kind, the Shared Reentrant Lock, to acquire and release the lock.
```java
public class ZkLockWithCuratorTemplate implements Lock {

    // ZooKeeper host address
    private String host = "localhost";
    // Node under which Curator keeps its sequential lock nodes
    private String lockPath = "/curatorLock";
    // Sleep time between retries
    private static final int SLEEP_TIME_MS = 1000;
    // Retry at most 1000 times
    private static final int MAX_RETRIES = 1000;
    // Session timeout
    private static final int SESSION_TIMEOUT = 30 * 1000;
    // Connection timeout
    private static final int CONNECTION_TIMEOUT = 3 * 1000;
    // Core Curator client
    private CuratorFramework curatorFramework;

    private InterProcessMutex lock;

    public ZkLockWithCuratorTemplate() {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(host)
                .connectionTimeoutMs(CONNECTION_TIMEOUT)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME_MS, MAX_RETRIES))
                .build();
        curatorFramework.start();
        lock = new InterProcessMutex(curatorFramework, lockPath);
    }

    @Override
    public void getLock() throws Exception {
        // Wait at most 5 seconds to acquire the lock; acquire(timeout) returns
        // false on timeout, it does not release the lock for us afterwards
        if (!lock.acquire(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("could not acquire " + lockPath + " within 5 seconds");
        }
    }

    @Override
    public void unlock() throws Exception {
        lock.release();
    }
}
```
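A minimal usage sketch for the wrapper above (hypothetical demo code; it assumes a ZooKeeper reachable at localhost:2181):

```java
public class CuratorLockDemo {

    public static void main(String[] args) throws Exception {
        Lock lock = new ZkLockWithCuratorTemplate();
        lock.getLock();
        try {
            // Critical section guarded by /curatorLock
            System.out.println("doing work under the Curator lock");
        } finally {
            lock.unlock();
        }
    }
}
```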
Source code and test classes:
https://github.com/Motianshi/distribute-tool