前段时间遇到一个问题,每一个应用启了一个定时任务,可是要求每一个定时任务不能同时操做同一条数据库记录,当时想到的是对表加锁。后面了解到能够经过zookeeper实现分布式锁,查了一些资料,本身总结了一下,写了一个分布式锁。java
实现步骤:node
1. ZooKeeper 调用 create (“lock/sub_lock”)方法来建立一个路径格式为“ lock/sub_lock”的节点,此节点类型为EPHEMERAL_SEQUENTIAL 。顺序自动编号的节点,这种节点会根据当前已近存在的节点数自动加 1。且建立的节点为临时节点,即客户端与服务器端 session 超时,或者链接主动断开时,该节点会自动删除,建立的节点格式为“lock/sub_lock0000000001”。git
2. 在建立的锁节点上调用 getChildren(“lock/sub_lock”)方法,来获取锁目录下的最小编号节点,而且不设置 watch 。github
3. 步骤 2 中获取的节点刚好是步骤 1 中客户端建立的节点,那么此客户端得到此种类型的锁,继续进行后续的操做,当操做完成后,删除当前节点。数据库
4. 客户端在锁目录上调用 exists ()方法,而且设置 watch 来监听锁目录下比本身小一个的连续临时节点是否被删除。apache
5. 若是监听节点已经被删除,重复步骤2。(不能直接得到锁,前一个节点被删除,多是因为客户端断开链接,而不是得到锁以后主动删除的)。服务器
实现代码:session
package com.peach.zk.peachLock; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.peach.zk.apacheLock.LockListener; import com.peach.zk.constant.Constant; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.concurrent.CountDownLatch; /** * Created by zengtao on 2016/8/19. */ public class DistributedLock { private ZooKeeper zk; private static Logger logger = LoggerFactory.getLogger(DistributedLock.class); private String lockPath = null; private LockListener callBack; private CountDownLatch countDownLatch = new CountDownLatch(1); public void init(ZooKeeper zk, LockListener lockListener) { this.zk = zk; this.callBack = lockListener; } public void getLock() { createLock(); lock(); try { countDownLatch.await(); } catch (InterruptedException e) { logger.error("", e); } } public void unLock() { try { Stat stat = zk.exists(lockPath, false); if (stat != null) { zk.delete(lockPath, -1); callBack.lockReleased(); } } catch (KeeperException | InterruptedException e) { logger.error("", e); } } private synchronized void lock() { logger.info("|| in lock"); List<String> sortedChildren = getSortedChildren(); try { int index = sortedChildren.indexOf(lockPath); switch (index) { case 0: logger.info("||so happy, I get my lock"); callBack.lockAcquired(); countDownLatch.countDown(); break; default: String preChildren = sortedChildren.get(index - 1); Stat stat = zk.exists(preChildren, new NodeChangeWatcher()); if (stat == null) { logger.warn("node {} has bean deleted", preChildren); // Thread.sleep(3000); lock(); } } } catch (KeeperException | InterruptedException e) { logger.error("", e); } logger.info("|| out lock"); } private void createLock() { try { Stat stat = zk.exists(Constant.ZK_LOCK_PATH, false); if (stat == null) { zk.create(Constant.ZK_LOCK_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } lockPath = zk.create(Constant.ZK_SUBLOCK_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } catch (KeeperException | InterruptedException e) { logger.error("|| create znode error", e); } } private List<String> getSortedChildren() { //获取按照字母排序的链表 Ordering<String> order = Ordering.natural(); List<String> fullpathChildrenList = Lists.newArrayList(); List<String> sortedChildren = Lists.newArrayList(); try { List<String> childrenList = zk.getChildren(Constant.ZK_LOCK_PATH, false); //须要获得children node 的全路径,后面能够匹配该node for (String children : childrenList) { fullpathChildrenList.add(Constant.ZK_LOCK_PATH + "/" + children); } sortedChildren = order.sortedCopy(fullpathChildrenList); } catch (KeeperException | InterruptedException e) { logger.error("||", e); } return sortedChildren; } private class NodeChangeWatcher implements Watcher { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.NodeDeleted) { logger.info("|| pre node changed"); lock(); } } } }
package com.peach.zk.constant; /** * Created by zengtao on 2016/8/16. */ public class Constant { public static final String ZK_CONNECTION_STRING = "localhost:2181,localhost:2182,localhost:2183"; public static final int ZK_SESSION_TIMEOUT = 50000; public static final String ZK_REGISTRY_PATH = "/registry"; public static final String ZK_PROVIDER_PATH = ZK_REGISTRY_PATH + "/provider"; public static final String ZK_LOCK_PATH = "/lock"; public static final String ZK_SUBLOCK_PATH = ZK_LOCK_PATH + "/sub_lock"; }
package com.peach.zk.apacheLock; public interface LockListener { public void lockAcquired() throws InterruptedException; public void lockReleased(); }
package com.peach.zk.util; import com.peach.zk.constant.Constant; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; /** * Created by zengtao on 2016/8/16. */ public class ZkUtil { private static final Logger logger = LoggerFactory.getLogger(ZkUtil.class); private static final CountDownLatch latch = new CountDownLatch(1); public static synchronized ZooKeeper connectServer( ) { ZooKeeper zk = null; try { zk = new ZooKeeper(Constant.ZK_CONNECTION_STRING, Constant.ZK_SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent watchedEvent) { if (watchedEvent.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); } } }); latch.await(); } catch (Exception e) { logger.error("", e); } return zk; } }
驱动程序:app
package com.peach.zk.peachLock; import com.peach.zk.apacheLock.LockListener; import com.peach.zk.util.ZkUtil; import org.apache.zookeeper.ZooKeeper; /** * Created by zengtao on 2016/8/19. */ public class Driver { public static void main(String[] args) throws InterruptedException { ZooKeeper zk = ZkUtil.connectServer(); while (true) { System.out.println("while"); final DistributedLock distributedLock = new DistributedLock(); distributedLock.init(zk, new LockListener() { @Override public void lockAcquired() throws InterruptedException { System.out.println("|| get lock, do something"); Thread.sleep(10000); distributedLock.unLock(); } @Override public void lockReleased() { System.out.println("|| lock released"); } }); distributedLock.getLock(); } } }
执行五个进程,结果:分布式
进程1
进程2
进程3
进程4
进程5
执行结果分析:
5个进程都没有在同一时间得到锁,从进程1到进程5都是按照顺序执行,且间隔时间都是固定10s,对于每一个进程自己,都是间隔50s执行。
完整工程代码:https://github.com/zengtaotao3390/zookeeper.git
里面还有RMI+zookeeper 简单的分布式应用实现和apache提供的分布式锁实现。