ZooKeeper 分布式共享锁的实现

原创播客,如需转载请注明出处。原文地址:http://www.cnblogs.com/crawl/p/8352919.html html

----------------------------------------------------------------------------------------------------------------------------------------------------------java

笔记中提供了大量的代码示例,须要说明的是,大部分代码示例都是本人所敲代码并进行测试,不足之处,请你们指正~git

本博客中全部言论仅表明博主本人观点,如有疑惑或者须要本系列分享中的资料工具,敬请联系 qingqing_crawl@163.comgithub

GitHub:https://github.com/QingqingQiapache

-----------------------------------------------------------------------------------------------------------------------------------------------------------服务器

前言:ZooKeeper 是提供少许数据存储和管理的分布式协调服务。适合存储状态管理信息,能够进行数据的读写,同步,提供对数据节点的监听功能。利用 ZooKeeper 能够实现不少功能,好比:Hadoop2.0,使用 Zookeeper 的事件处理确保整个集群只有一个活跃的 NameNode,存储配置信息等;能够利用 ZooKeeper 感知集群中哪台主机宕机或者下线等等。今天介绍另外一个经常使用的功能,利用 Zookeeper 实现分布式共享锁。session

1、简要介绍

利用 Zookeeper 实现分布式共享锁,能够作到一次只有指定个数的客户端访问服务器的某些资源。分布式

2、实现步骤

利用 Zookeeper 实现分布式共享锁的步骤大体能够分为如下几步:ide

1. 客户端上线即向 Zookeeper 注册,建立一把锁工具

2. 判断是否只有一个客户端工做,若只有一个客户端工做,此客户端能够处理业务

3. 获取父节点下注册的全部锁,经过判断本身是不是号码最小的那一把锁,如果则能够处理业务,不然等待

值的注意的是,在某一客户端获取到锁处理完业务后,必须释放锁

3、实现代码

1. 新建一个 DistributedLock 类

private ZooKeeper zkClient = null;
    
    //链接字符串
    private static final String connectString = "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181";
    
    //超时时间
    private static final int sessionTimeout = 2000;
    
    //父节点
    private static final String parentNode = "/locks";
    
    //记录本身建立子节点的路径
    private volatile String thisPath;
    
    public static void main(String[] args) throws Exception {
        //1.获取 ZooKeeper 的客户端链接
        DistributedLock distLock = new DistributedLock();
        distLock.getZKClient();
        
        //2.注册一把锁
        distLock.regiestLock();
        
        //3.监听父节点,判断是否只有本身在线
        distLock.watchParent();
 }

2. main 方法中定义了三个方法

1)getZKClient():用来获取 Zookeeper 客户端的链接

其中 process 方法是当监听节点发生变化时调用,其中获取定义的父节点的全部子节点,而后判断当前节点是不是最小节点,如果则进行业务逻辑处理阶段,并从新注册一把新的锁

//获取 zk 客户端
    public void getZKClient() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            
            @Override
            public void process(WatchedEvent event) {
                //判断事件类型,只处理子节点变化事件
                if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(parentNode)) {
                    try {
                        List<String> childrens = zkClient.getChildren(parentNode, true);
                        //判断本身是不是最小的
                        String thisNode = thisPath.substring((parentNode + "/").length());
                        Collections.sort(childrens);
                        if(childrens.indexOf(thisNode) == 0){
                            //处理业务逻辑
                            dosomething();
                            //从新注册一把新的锁
                            thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
}

2)main 中的第二个方法是 rediestLock()

调用 Zookeeper 客户端的 create() 方法,创建一个新的节点

//注册一把锁
    public void regiestLock() throws Exception {
        thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

3)第三个是 watchParent() 方法

在此方法中判断是否只有一个节点在线,若只有本身一个节点,则调用业务处理的方法

//监听父节点,判断是否只有本身在线
    public void watchParent() throws Exception {
        List<String> childrens = zkClient.getChildren(parentNode, true);
        if (childrens != null && childrens.size() == 1) {
            //只有本身在线,处理业务逻辑(处理完业务逻辑,必须删释放锁)
            dosomething();
        } else {
            //不是只有本身在线,说明别人已经获取到锁,等待
            Thread.sleep(Long.MAX_VALUE);
        }
    }

4)最后一个是自定义的业务逻辑方法

须要注意的是,当处理完业务逻辑后,必须释放锁

//业务逻辑方法,注意:须要在最后释放锁
    public void dosomething() throws Exception {
        System.out.println("或获得锁:" + thisPath);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("释放锁:" + thisPath);
            zkClient.delete(thisPath, -1);
        }
    }

3. 最后贴一下所有代码

package com.software.bigdata.zkdistlock;

import java.util.Collections;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
 * @Description: 分布式共享锁
 * 
 * @author Crawl
 * @date 2018年1月25日 下午5:02:42
 */
public class DistributedLock {
    
    private ZooKeeper zkClient = null;
    
    //链接字符串
    private static final String connectString = "zookeeper01:2181,zookeeper02:2181,zookeeper03:2181";
    
    //超时时间
    private static final int sessionTimeout = 2000;
    
    //父节点
    private static final String parentNode = "/locks";
    
    //记录本身建立子节点的路径
    private volatile String thisPath;
    
    public static void main(String[] args) throws Exception {
        //1.获取 ZooKeeper 的客户端链接
        DistributedLock distLock = new DistributedLock();
        distLock.getZKClient();
        
        //2.注册一把锁
        distLock.regiestLock();
        
        //3.监听父节点,判断是否只有本身在线
        distLock.watchParent();
    }
    
    //业务逻辑方法,注意:须要在最后释放锁
    public void dosomething() throws Exception {
        System.out.println("或获得锁:" + thisPath);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("释放锁:" + thisPath);
            zkClient.delete(thisPath, -1);
        }
    }
    
    //监听父节点,判断是否只有本身在线
    public void watchParent() throws Exception {
        List<String> childrens = zkClient.getChildren(parentNode, true);
        if (childrens != null && childrens.size() == 1) {
            //只有本身在线,处理业务逻辑(处理完业务逻辑,必须删释放锁)
            dosomething();
        } else {
            //不是只有本身在线,说明别人已经获取到锁,等待
            Thread.sleep(Long.MAX_VALUE);
        }
    }
    
    //注册一把锁
    public void regiestLock() throws Exception {
        thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }
    
    //获取 zk 客户端
    public void getZKClient() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            
            @Override
            public void process(WatchedEvent event) {
                //判断事件类型,只处理子节点变化事件
                if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(parentNode)) {
                    try {
                        List<String> childrens = zkClient.getChildren(parentNode, true);
                        //判断本身是不是最小的
                        String thisNode = thisPath.substring((parentNode + "/").length());
                        Collections.sort(childrens);
                        if(childrens.indexOf(thisNode) == 0){
                            //处理业务逻辑
                            dosomething();
                            //从新注册一把新的锁
                            thisPath = zkClient.create(parentNode + "/lock", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }

}
相关文章
相关标签/搜索