工做中须要写一个定时任务,因为是集群环境,天然而然想到须要经过分布式锁来保证单台执行..相信你们都会想到使用zk来实现对应的分布式锁.下面就简单介绍一下几种实现java
准备工做
有几个帮助类,先把代码放上来apache
ZKClient 对zk的操做作了一个简单的封装服务器
Java代码 session
- package zk.lock;
-
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import zk.util.ZKUtil;
-
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
-
- /**
- * User: zhenghui
- * Date: 14-3-26
- * Time: 下午8:50
- * 封装一个zookeeper实例.
- */
- public class ZKClient implements Watcher {
-
- private ZooKeeper zookeeper;
-
- private CountDownLatch connectedSemaphore = new CountDownLatch(1);
-
-
- public ZKClient(String connectString, int sessionTimeout) throws Exception {
- zookeeper = new ZooKeeper(connectString, sessionTimeout, this);
- System.out.println("connecting zk server");
- if (connectedSemaphore.await(10l, TimeUnit.SECONDS)) {
- System.out.println("connect zk server success");
- } else {
- System.out.println("connect zk server error.");
- throw new Exception("connect zk server error.");
- }
- }
-
- public void close() throws InterruptedException {
- if (zookeeper != null) {
- zookeeper.close();
- }
- }
-
- public void createPathIfAbsent(String path, boolean isPersistent) throws Exception {
- CreateMode createMode = isPersistent ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL;
- path = ZKUtil.normalize(path);
- if (!this.exists(path)) {
- zookeeper.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, createMode);
- }
- }
-
- public boolean exists(String path) throws Exception {
- path = ZKUtil.normalize(path);
- Stat stat = zookeeper.exists(path, null);
- return stat != null;
- }
-
- public String getData(String path) throws Exception {
- path = ZKUtil.normalize(path);
- try {
- byte[] data = zookeeper.getData(path, null, null);
- return new String(data);
- } catch (KeeperException e) {
- if (e instanceof KeeperException.NoNodeException) {
- throw new Exception("Node does not exist,path is [" + e.getPath() + "].", e);
- } else {
- throw new Exception(e);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new Exception(e);
- }
- }
-
- @Override
- public void process(WatchedEvent event) {
- if (event == null) return;
-
- // 链接状态
- Watcher.Event.KeeperState keeperState = event.getState();
- // 事件类型
- Watcher.Event.EventType eventType = event.getType();
- // 受影响的path
- // String path = event.getPath();
- if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
- // 成功链接上ZK服务器
- if (Watcher.Event.EventType.None == eventType) {
- System.out.println("zookeeper connect success");
- connectedSemaphore.countDown();
- }
- }
- //下面能够作一些重连的工做.
- else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
- System.out.println("zookeeper Disconnected");
- } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
- System.out.println("zookeeper AuthFailed");
- } else if (Watcher.Event.KeeperState.Expired == keeperState) {
- System.out.println("zookeeper Expired");
- }
- }
- }
ZKUtil 针对zk路径的一个工具类mybatis
Java代码 mvc
- package zk.util;
-
- /**
- * User: zhenghui
- * Date: 14-3-26
- * Time: 下午9:56
- */
- public class ZKUtil {
-
- public static final String SEPARATOR = "/";
-
- /**
- * 转换path为zk的标准路径 以/开头,最后不带/
- */
- public static String normalize(String path) {
- String temp = path;
-
- if(!path.startsWith(SEPARATOR)) {
- temp = SEPARATOR + path;
- }
- if(path.endsWith(SEPARATOR)) {
- temp = temp.substring(0, temp.length()-1);
- return normalize(temp);
- }else {
- return temp;
- }
- }
-
- /**
- * 连接两个path,并转化为zk的标准路径
- */
- public static String contact(String path1,String path2){
- if(path2.startsWith(SEPARATOR)) {
- path2 = path2.substring(1);
- }
- if(path1.endsWith(SEPARATOR)) {
- return normalize(path1 + path2);
- } else {
- return normalize(path1 + SEPARATOR + path2);
- }
- }
-
- /**
- * 字符串转化成byte类型
- */
- public static byte[] toBytes(String data) {
- if(data == null || data.trim().equals("")) return null;
- return data.getBytes();
- }
- }
NetworkUtil 获取本机IP的工具方法框架
Java代码 分布式
- package zk.util;
-
- import java.net.InetAddress;
- import java.net.NetworkInterface;
- import java.util.Enumeration;
-
- /**
- * User: zhenghui
- * Date: 14-4-1
- * Time: 下午4:47
- */
- public class NetworkUtil {
-
- static private final char COLON = ':';
-
- /**
- * 获取当前机器ip地址
- * 听说多网卡的时候会有问题.
- */
- public static String getNetworkAddress() {
- Enumeration<NetworkInterface> netInterfaces;
- try {
- netInterfaces = NetworkInterface.getNetworkInterfaces();
- InetAddress ip;
- while (netInterfaces.hasMoreElements()) {
- NetworkInterface ni = netInterfaces
- .nextElement();
- Enumeration<InetAddress> addresses=ni.getInetAddresses();
- while(addresses.hasMoreElements()){
- ip = addresses.nextElement();
- if (!ip.isLoopbackAddress()
- && ip.getHostAddress().indexOf(COLON) == -1) {
- return ip.getHostAddress();
- }
- }
- }
- return "";
- } catch (Exception e) {
- return "";
- }
- }
- }
--------------------------- 正文开始 -----------------------------------ide
这种实现很是简单,具体的流程以下工具

对应的实现以下
Java代码
- package zk.lock;
-
-
- import zk.util.NetworkUtil;
- import zk.util.ZKUtil;
-
- /**
- * User: zhenghui
- * Date: 14-3-26
- * Time: 下午8:37
- * 分布式锁实现.
- *
- * 这种实现的原理是,建立某一个任务的节点,好比 /lock/tasckname 而后获取对应的值,若是是当前的Ip,那么得到锁,若是不是,则没得到
- * .若是该节点不存在,则建立该节点,并把改节点的值设置成当前的IP
- */
- public class DistributedLock01 {
-
- private ZKClient zkClient;
-
-
- public static final String LOCK_ROOT = "/lock";
- private String lockName;
-
-
- public DistributedLock01(String connectString, int sessionTimeout,String lockName) throws Exception {
- //先建立zk连接.
- this.createConnection(connectString,sessionTimeout);
-
- this.lockName = lockName;
- }
-
- public boolean tryLock(){
- String path = ZKUtil.contact(LOCK_ROOT,lockName);
- String localIp = NetworkUtil.getNetworkAddress();
- try {
- if(zkClient.exists(path)){
- String ownnerIp = zkClient.getData(path);
- if(localIp.equals(ownnerIp)){
- return true;
- }
- } else {
- zkClient.createPathIfAbsent(path,false);
- if(zkClient.exists(path)){
- String ownnerIp = zkClient.getData(path);
- if(localIp.equals(ownnerIp)){
- return true;
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return false;
- }
-
-
- /**
- * 建立zk链接
- *
- */
- protected void createConnection(String connectString, int sessionTimeout) throws Exception {
- if(zkClient != null){
- releaseConnection();
- }
- zkClient = new ZKClient(connectString,sessionTimeout);
- zkClient.createPathIfAbsent(LOCK_ROOT,true);
- }
- /**
- * 关闭ZK链接
- */
- protected void releaseConnection() throws InterruptedException {
- if (zkClient != null) {
- zkClient.close();
- }
- }
-
- }
总结
网上有不少文章,你们的方法大多数都是建立一个root根节点,每个trylock的客户端都会在root下建立一个 EPHEMERAL_SEQUENTIAL 的子节点,同时设置root的child 变动watcher(为了不羊群效应,能够只添加前一个节点的变动通知) .若是建立的节点的序号是最小,则获取到锁,不然继续等待root的child 变动