ZooKeeper(七)-- ZK原生API实现分布式锁

1、使用场景

在分布式应用,每每存在多个进程提供同一服务。这些进程有可能在相同的机器上,也有可能分布在不一样的机器上。 若是这些进程共享了一些资源,可能就须要分布式锁来锁定对这些资源的访问。java

2、实现分布式锁结构图

img

3、代码实现

package com.xbq.zookeeper.javaApi;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/** * 使用Zookeeper原生API实现分布式锁 * @author xbq */
public class ZookeeperLock implements Watcher{

    // 声明zk对象
    private ZooKeeper zk = null;
    // 此demo使用的集群,因此有多个ip和端口
    private static String CONNECT_SERVER = "192.168.242.130:2181,192.168.242.130:2182,192.168.242.130:2183";
    // session过时时间
    private static int SESSION_TIMEOUT = 3000;
    // 根节点
    private String root = "/locks";
    // 当前等待的节点
    private String waitNode;
    // 等待的时间
    private int waitTime;
    // 锁节点
    private String myzkNode;
    // 计数器
    private CountDownLatch latch;
    
    /** * 构造函数 初始化 */
    public ZookeeperLock(){
        try {
            zk = new ZooKeeper(CONNECT_SERVER, SESSION_TIMEOUT, this);
            // 判断是否存在根节点,不须要监听根节点
            Stat stat = zk.exists(root, false);
            // 为空,说明不存在
            if(stat == null){
                // 添加一个永久节点,即添加一个根节点
                zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /** * 尝试获取锁 * @return */
    private boolean tryLock(){
        String splitStr = "lock_";  // 格式 lock_000000001
        try {
            // 建立一个临时有序节点,并赋值给 myzkNode
            myzkNode = zk.create(root + "/" + splitStr, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 获取根节点下的全部子节点
            List<String> children = zk.getChildren(root, false);
            // 对子节点 排序
            Collections.sort(children);
            // 若是刚刚建立的节点 等于 获取最小的一个子节点,则说明 获取到锁
            if(myzkNode.equals(root + "/" + children.get(0))){
                return true;
            }
            // 若是刚刚建立的节点 不等于 获取到的最小的一个子节点,则 监控比本身小的一个节点
            // 获取刚刚创建的子节点(不包含根节点的子节点)
            String childNode = myzkNode.substring(myzkNode.lastIndexOf("/") + 1);
            // 获取比本身小的节点
            waitNode = children.get(Collections.binarySearch(children, childNode) - 1);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }
    
    /** * 等待锁释放 * @param waitNode * @param waidTime * @return * @throws KeeperException * @throws InterruptedException */
    private boolean waitLock(String waitNode, int waidTime) throws KeeperException, InterruptedException{
        // 判断比本身小的节点是否存在,并添加监听
        Stat stat = zk.exists(root + "/" + waitNode, true);
        // 若是存在 比本身小的节点
        if(stat != null){
            this.latch = new CountDownLatch(1);
            this.latch.await(waidTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }
    
    /** * 获取锁 */
    public void lock(){
        // 若是尝试获取锁成功
        if(tryLock()){
            // 获取 成功
            System.out.println("Thread" + Thread.currentThread().getName() + " -- hold lock!");
            return;
        }
        // 等待并获取锁
        try {
            waitLock(waitNode, waitTime);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    /** * 解锁 */
    public void unLock(){
        try {
            zk.delete(myzkNode, -1);
            zk.close();
            System.out.println("Thread" + Thread.currentThread().getName() +" unlock success! 锁节点:" + myzkNode);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /** * 删除的时候 触发事件 */
    @Override
    public void process(WatchedEvent event) {
        // 若是计数器不为空的话,释放计数器锁
        if(this.latch != null){
            this.latch.countDown();
        }
    }
    
    /** * 测试 * @param args */
    public static void main(String[] args) {
        // 定义线程池
        ExecutorService service = Executors.newCachedThreadPool();
        // 只能10个线程同时运行,即模拟并发数为10
        final Semaphore semaphore = new Semaphore(10);
        // 10个客户端链接
        for(int i=0;i<10;i++){
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        ZookeeperLock zkLock = new ZookeeperLock();
                        zkLock.lock();
                        // 业务操做代码
                        Thread.sleep(3000);
                        zkLock.unLock();
                        semaphore.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}
复制代码

欢迎关注个人公众号,第一时间接收最新文章~ 搜索公众号: 码咖 或者 扫描下方二维码:

img
相关文章
相关标签/搜索