分布式服务注册中心zookeeper之分布式锁

一、分布式锁做用及其原理

  • 为何要有分布式锁 ?
    分布式服务中,若是各个服务节点须要去竞争资源,没办法使用单机多线程中JDK自带的锁,故此时须要分布式锁来协调。
  • 企业中有哪些常见的手段来实现分布式锁 ?
    zookeeper、redis、memcache
  • 分布式锁的原理
    zookeeper去建立相应的节点,建立成功,则表示获取到相应的锁,建立失败,则表示获取锁失败 redis、memcache:对应的去设置一个值作为锁的一标志,每次获取锁的时候,判断对应的值是否存在,存在则没法获取,不存在,则设置相应的值,表示获取到锁。(redis 使用setnx,memcache使用add)

二、基于zk实现分布式锁的多种方式

  • 注意事项: 建立节点的时候,必定要建立临时节点,避免应用获取到锁后,宕机,致使锁一直被持有。
  • 具体方式如图:
    在这里插入图片描述

三、实战–基于zk原生的api实现分布式锁

  • 添加zk依赖
<dependency>    
	  <groupId>org.apache.zookeeper</groupId>   
	  <artifactId>zookeeper</artifactId>   
	  <version>3.4.13</version> 
  </dependency>
  • 注意点
    原生api不支持递归建立节点
    若是是单一应用,尽可能不要使用分布式锁

demo:
ZkLock.javajava

package com.demo.lock;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
* 基于zk实现分布式锁
*/
public class ZkLock {

   private ZooKeeper zooKeeper;

   private static CountDownLatch countDownLatch = new CountDownLatch(1);

   private ZkLock() {
       try {
           zooKeeper = new ZooKeeper("192.1.1.101:2181,192.1.1.102:2181,192.1.1.103:2181", 5000, new ZkWatcher());
           System.out.println(zooKeeper.getState());
           try {
               countDownLatch.await();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           System.out.println("与zk创建链接=====>"+zooKeeper.getState());

       } catch (IOException e) {
           e.printStackTrace();
       }
   }

   public static ZkLock getInstance() {
       return Singleton.getInstance();
   }

   private class ZkWatcher implements Watcher {
       @Override
       public void process(WatchedEvent event) {
           System.out.println("接收到监听事件=====》"+event);
           if (Event.KeeperState.SyncConnected == event.getState()) {
               countDownLatch.countDown();
           }
       }
   }


   public void lock(Integer id) {
       String path = "/xdclass-product-lock-" + id;
       //建立临时节点,若是建立成功的话,就表示获取锁,若是失败,则不断尝试
       try {
           zooKeeper.create(path,"".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
           System.out.println("成功获取到锁");
       } catch (Exception e) {
           while (true) {
               try {
                   Thread.sleep(500L);
               } catch (InterruptedException e1) {
                   e1.printStackTrace();
               }
               try {
                   zooKeeper.create(path,"".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
               } catch (Exception e1) {
                   continue;
               }
               break;
           }
       }
   }

   /**
    * 释放锁,直接删除zk节点
    * @param id
    */
   public void unLock(Integer id) {
       String path = "/xdclass-product-lock-" + id;
       try {
           zooKeeper.delete(path,-1);
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (KeeperException e) {
           e.printStackTrace();
       }
   }

   private static class Singleton {

       private static ZkLock instance;
       static {
           instance = new ZkLock();
       }

       private static ZkLock getInstance() {
           return instance;
       }

   }

}

UnSafeThread.javaweb

package com.demo.lock;
import java.util.concurrent.CountDownLatch;

/**
 * 线程不安全操做代码实例
 */
public class UnSafeThread {

    private static int num = 0;

    private static CountDownLatch countDownLatch = new CountDownLatch(10);
    private static ZkLock lock = ZkLock.getInstance();

    /**y
     * 每次调用对num进行++操做
     */
    public static void inCreate() {
        lock.lock(1);
        num++;
        System.out.println(num);
        lock.unLock(1);
    }

    public static void test() {
        System.out.println(Thread.currentThread().getName());
    }


    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    inCreate();
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //每一个线程执行完成以后,调用countdownLatch
                countDownLatch.countDown();
            }).start();
        }

        while (true) {
            if (countDownLatch.getCount() == 0) {
                System.out.println(num);
                break;
            }
        }
    }
}