zookeeper 实现分布式锁

前言:

单机应用架构中,秒杀案例使用ReentrantLcok或者synchronized来达到操做业务互斥的目的。然而在分布式系统中,会存在多台机器并行去实现同一个功能。也就是说,在多进程中,若是还使用以上JDK提供的进程锁,来并发访问数据库资源就可能会出现数据重复、不一致的状况。所以,须要咱们来实现本身的分布式锁。数据库

实现一个分布式锁应该具有的特性:缓存

  • 高可用、高性能的获取锁与释放锁
  • 在分布式系统环境下,一个方法或者变量同一时间只能被一个线程操做
  • 具有锁失效机制,网络中断或宕机没法释放锁时,锁必须被删除,防止死锁
  • 具有阻塞锁特性,即没有获取到锁,则继续等待获取锁
  • 具有非阻塞锁特性,即没有获取到锁,则直接返回获取锁失败
  • 具有可重入特性,一个线程中能够屡次获取同一把锁,好比一个线程在执行一个带锁的方法,该方法中又调用了另外一个须要相同锁的方法,则该线程能够直接执行调用的方法,而无需从新得到锁

分布式锁几种实现方式:bash

  • 基于数据库实现分布式锁
  • 基于 Redis 实现分布式锁
  • 基于 Zookeeper 实现分布式锁

前两种对于分布式生产环境来讲并非特别推荐,高并发下数据库锁性能太差,Redis在锁时间限制和缓存一致性存在必定问题。重点实现一下 Zookeeper 如何实现分布式锁。网络

实现原理

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能存在惟一文件名。session

å¾çæºèªç½ç»

数据模型架构

  • PERSISTENT 持久化节点,节点建立后,不会由于会话失效而消失
  • EPHEMERAL 临时节点, 客户端session超时此类节点就会被自动删除
  • EPHEMERAL_SEQUENTIAL 临时自动编号节点
  • PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1

监视器(watcher)并发

当建立一个节点时,能够注册一个该节点的监视器,当节点状态发生改变时,watch被触发时,ZooKeeper将会向客户端发送且仅发送一条通知,由于watch只能被触发一次。分布式

根据zookeeper的这些特性,咱们来看看如何利用这些特性来实现分布式锁:ide

  • 建立一个锁目录lock
  • 线程A获取锁会在lock目录下,建立临时顺序节点
  • 获取锁目录下全部的子节点,而后获取比本身小的兄弟节点,若是不存在,则说明当前线程顺序号最小,得到锁
  • 线程B建立临时节点并获取全部兄弟节点,判断本身不是最小节点,设置监听(watcher)比本身次小的节点(只关注比本身次小的节点是为了防止发生“羊群效应”)
  • 线程A处理完,删除本身的节点,线程B监听到变动事件,判断本身是最小的节点,得到锁

代码实现

尽管ZooKeeper已经封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。可是若是让一个普通开发者去手撸一个分布式锁仍是比较困难的,在秒杀案例中咱们直接使用 Apache 开源的curator 开实现 Zookeeper 分布式锁。 高并发

同时参考:blog.csdn.net/u011663149/…

/**
 * 基于curator的zookeeper分布式锁
 */
public class CuratorUtil {
    private static String address = "192.168.1.180:2181";
    
    public static void main(String[] args) {
        //一、重试策略:初试时间为1s 重试3次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 
        //二、经过工厂建立链接
        CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
        //三、开启链接
        client.start();
        //4 分布式锁
        final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock"); 
        //读写锁
        //InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");
        
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        
        for (int i = 0; i < 5; i++) {
            fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    boolean flag = false;
                    try {
                        //尝试获取锁,最多等待5秒
                        flag = mutex.acquire(5, TimeUnit.SECONDS);
                        Thread currentThread = Thread.currentThread();
                        if(flag){
                            System.out.println("线程"+currentThread.getId()+"获取锁成功");
                        }else{
                            System.out.println("线程"+currentThread.getId()+"获取锁失败");
                        }
                        //模拟业务逻辑,延时4秒
                        Thread.sleep(4000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally{
                        if(flag){
                            try {
                                mutex.release();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        }
    }
}复制代码

这里咱们开启5个线程,每一个线程获取锁的最大等待时间为5秒,为了模拟具体业务场景,方法中设置4秒等待时间。开始执行main方法,经过ZooInspector或者zk-ui 可视化监控/curator/lock下的节点以下图:

观察控制台,咱们会发现只有两个线程获取锁成功,另外三个线程超时获取锁失败会自动删除节点。线程执行完毕咱们刷新一下/curator/lock节点,发现刚才建立的五个子节点已经不存在了。

相关文章
相关标签/搜索