zookeeper分布式锁实现

前段时间遇到一个问题,每一个应用启了一个定时任务,可是要求每一个定时任务不能同时操做同一条数据库记录,当时想到的是对表加锁。后面了解到能够经过zookeeper实现分布式锁,查了一些资料,本身总结了一下,写了一个分布式锁。java

实现步骤:node

1. ZooKeeper 调用 create (“lock/sub_lock”)方法来建立一个路径格式为“ lock/sub_lock”的节点,此节点类型为EPHEMERAL_SEQUENTIAL 。顺序自动编号的节点,这种节点会根据当前已近存在的节点数自动加 1。且建立的节点为临时节点,即客户端与服务器端 session 超时,或者链接主动断开时,该节点会自动删除,建立的节点格式为“lock/sub_lock0000000001”。git

2. 在建立的锁节点上调用 getChildren(“lock/sub_lock”)方法,来获取锁目录下的最小编号节点,而且不设置 watch 。github

3. 步骤 2 中获取的节点刚好是步骤 1 中客户端建立的节点,那么此客户端得到此种类型的锁,继续进行后续的操做,当操做完成后,删除当前节点。数据库

4. 客户端在锁目录上调用 exists ()方法,而且设置 watch 来监听锁目录下比本身小一个的连续临时节点是否被删除。apache

5. 若是监听节点已经被删除,重复步骤2。(不能直接得到锁,前一个节点被删除,多是因为客户端断开链接,而不是得到锁以后主动删除的)。服务器

实现代码:session

package com.peach.zk.peachLock;

import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.peach.zk.apacheLock.LockListener;
import com.peach.zk.constant.Constant;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Created by zengtao on 2016/8/19.
 */
public class DistributedLock {

    private ZooKeeper zk;
    private static Logger logger = LoggerFactory.getLogger(DistributedLock.class);
    private String lockPath = null;
    private LockListener callBack;
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void init(ZooKeeper zk, LockListener lockListener) {
        this.zk = zk;
        this.callBack = lockListener;
    }

    public void getLock() {
        createLock();
        lock();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("", e);
        }
    }

    public void unLock() {
        try {
            Stat stat = zk.exists(lockPath, false);
            if (stat != null) {
                zk.delete(lockPath, -1);
                callBack.lockReleased();
            }
        } catch (KeeperException | InterruptedException e) {
            logger.error("", e);
        }
    }

    private synchronized void lock() {
        logger.info("|| in lock");
        List<String> sortedChildren = getSortedChildren();
        try {
            int index = sortedChildren.indexOf(lockPath);
            switch (index) {
                case 0:
                    logger.info("||so happy, I get my lock");
                    callBack.lockAcquired();
                    countDownLatch.countDown();
                    break;
                default:
                    String preChildren = sortedChildren.get(index - 1);
                    Stat stat = zk.exists(preChildren, new NodeChangeWatcher());
                    if (stat == null) {
                        logger.warn("node {} has bean deleted", preChildren);
//                        Thread.sleep(3000);
                        lock();
                    }
            }
        } catch (KeeperException | InterruptedException e) {
            logger.error("", e);
        }
        logger.info("|| out lock");
    }

    private void createLock() {
        try {
            Stat stat = zk.exists(Constant.ZK_LOCK_PATH, false);

            if (stat == null) {
                zk.create(Constant.ZK_LOCK_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            lockPath = zk.create(Constant.ZK_SUBLOCK_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (KeeperException | InterruptedException e) {
            logger.error("|| create znode error", e);
        }
    }

    private List<String> getSortedChildren() {
        //获取按照字母排序的链表
        Ordering<String> order = Ordering.natural();
        List<String> fullpathChildrenList = Lists.newArrayList();
        List<String> sortedChildren = Lists.newArrayList();
        try {
            List<String> childrenList = zk.getChildren(Constant.ZK_LOCK_PATH, false);
            //须要获得children node 的全路径,后面能够匹配该node
            for (String children : childrenList) {
                fullpathChildrenList.add(Constant.ZK_LOCK_PATH + "/" + children);
            }
            sortedChildren = order.sortedCopy(fullpathChildrenList);
        } catch (KeeperException | InterruptedException e) {
            logger.error("||", e);
        }
        return sortedChildren;
    }

    private class NodeChangeWatcher implements Watcher {
        @Override
        public void process(WatchedEvent watchedEvent) {
            if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
                logger.info("|| pre node changed");
                lock();
            }
        }
    }
}
package com.peach.zk.constant;

/**
 * Created by zengtao on 2016/8/16.
 */
public class Constant {
    public static final String ZK_CONNECTION_STRING = "localhost:2181,localhost:2182,localhost:2183";
    public static final int ZK_SESSION_TIMEOUT = 50000;
    public static final String ZK_REGISTRY_PATH = "/registry";
    public static final String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider";
    public static final String ZK_LOCK_PATH = "/lock";
    public static final String ZK_SUBLOCK_PATH = ZK_LOCK_PATH + "/sub_lock";

}
package com.peach.zk.apacheLock;

public interface LockListener {

    public void lockAcquired() throws InterruptedException;
    
    public void lockReleased();
}
package com.peach.zk.util;

import com.peach.zk.constant.Constant;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;

/**
 * Created by zengtao on 2016/8/16.
 */
public class ZkUtil {

    private static final Logger logger = LoggerFactory.getLogger(ZkUtil.class);

    private static final CountDownLatch latch = new CountDownLatch(1);
    public static synchronized ZooKeeper connectServer( ) {
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            latch.await();
        } catch (Exception e) {
            logger.error("", e);
        }
        return zk;
    }
}

驱动程序:app

package com.peach.zk.peachLock;

import com.peach.zk.apacheLock.LockListener;
import com.peach.zk.util.ZkUtil;
import org.apache.zookeeper.ZooKeeper;

/**
 * Created by zengtao on 2016/8/19.
 */
public class Driver {

    public static void main(String[] args) throws InterruptedException {
        ZooKeeper zk = ZkUtil.connectServer();
        while (true) {
            System.out.println("while");
            final DistributedLock distributedLock = new DistributedLock();
            distributedLock.init(zk, new LockListener() {
                @Override
                public void lockAcquired() throws InterruptedException {
                    System.out.println("|| get lock, do something");
                    Thread.sleep(10000);
                    distributedLock.unLock();
                }

                @Override
                public void lockReleased() {
                    System.out.println("|| lock released");
                }
            });
            distributedLock.getLock();
        }

    }
}

执行五个进程,结果:分布式

进程1

进程2

进程3

进程4

进程5

执行结果分析:

5个进程都没有在同一时间得到锁,从进程1到进程5都是按照顺序执行,且间隔时间都是固定10s,对于每一个进程自己,都是间隔50s执行。

完整工程代码:https://github.com/zengtaotao3390/zookeeper.git

里面还有RMI+zookeeper 简单的分布式应用实现和apache提供的分布式锁实现。