基于ZooKeeper的分布式锁

1、简介java

  锁的概念,在Java平常开发和面试中,都是个很重要的知识点。锁能很好的控制生产数据的安全性,好比商品的数量超卖问题等。传统的作法中,能够直接利用数据库锁(行锁或者表锁)来进行数据访问控制。随着请求量逐步变多的状况下,将压力怼到数据库上会对其性能产生极大影响。这时候,单体应用中能够利用JVM锁,在程序层面进行访问的控制,将压力前移,对数据库友好。当请求量再进一步变多,这时候通常会考虑集群分布式去处理,不断的加机器来抗压。这时候,JVM锁就不能很好的控制压力了,同一时刻仍是会有大量请求怼到数据库上,这时就须要提高为分布式锁去控制了,将压力继续停留在程序层面。node

  Java的面向接口编程,能够很好很快的去切换实现而不须要动业务代码部分。下面,基于Lock接口去使用锁。git

2、JVM锁github

  基于ReentrantLock实现锁控制,业务控制层service部分代码以下,用 lock 锁去控制并发访问面试

package com.cfang.service;

import java.sql.Time;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

import com.cfang.dao.ProductDao;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
@Scope("prototype")
public class ProductWithLockService {
    
    private Lock lock = new ReentrantLock();

    @Autowired
    private ProductDao productDao;
    
    @Transactional
    public boolean buy(String userName, String productname, int number) {
        boolean result = false;
        try {
            lock.lock();
//            TimeUnit.SECONDS.sleep(1);
            log.info("用户{}欲购买{}个{}",  userName, number, productname);
            int stock = productDao.getStock(productname);
            log.info("{} 查询数量{}...", userName, stock);
            if(stock < number) {
                log.warn("库存不足...");
                return false;
            }
            result = productDao.buy(userName, productname, number);
        } catch (Exception e) {
            
        } finally {
            log.info("{} 释放锁...", userName);
            lock.unlock();
        }
        log.info("{}购买结果,{}",userName,  result);
        return result;
    }
}

  在单体应用中,这样子使用是能够的,可是当应用部署多套的时候,那么,就不能很好的保障并发控制了,同一时刻的请求可能会大量打到数据库上。因此,这就引入下面的分布式锁去控制了。spring

3、基于ZooKeeper的分布式锁sql

    首先,锁获取释放的工具类:数据库

package com.cfang.zkLockUtil;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.commons.lang3.StringUtils;

import com.cfang.zkClient.MyZkSerializer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ZkLockUtil implements Lock{
    
    private String znode;
    private ZkClient zkClient;
    
    public ZkLockUtil(String znode) {
        if(StringUtils.isBlank(znode)) {
            throw new IllegalArgumentException("锁节点znode不能为空字符串");
        }
        this.znode = znode;
        this.zkClient = new ZkClient("111.231.51.200:2181,111.231.51.200:2182,111.231.51.200:2183");
        this.zkClient.setZkSerializer(new MyZkSerializer());
    }

    @Override
    public void lock() {
        if(!tryLock()) { //抢锁失败
            // 阻塞等待锁节点的释放
            waitLock();
            //递归调用,从新尝试去抢占锁
            lock();
        }
    }
    
    private void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        // 注册监听znode锁节点变化,当删除的时候,说明锁被释放
        IZkDataListener listener = new IZkDataListener() {
            
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                log.info("znode节点被删除,锁释放...");
                latch.countDown();
            }
            
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        this.zkClient.subscribeDataChanges(this.znode, listener);
        try {
            // 阻塞等待锁znode节点的删除释放
            if(this.zkClient.exists(znode)) {
                latch.await();
            }
        } catch (Exception e) {
        }
        //取消znode节点监听
        this.zkClient.unsubscribeDataChanges(this.znode, listener);
    }
    
    @Override
    public boolean tryLock() {
        boolean result = false;
        try {
            this.zkClient.createEphemeral(znode); //建立临时节点
            result = true;
        } catch (ZkNodeExistsException e) {
            log.warn("锁节点znode已存在,抢占失败...");
            result = false;
        } catch (Exception e) {
            log.warn("建立锁节点znode异常,{}...", e.getMessage());
        }
        return result;
    }

    @Override
    public void unlock() {
        zkClient.delete(znode);
    }
    
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }
    
    @Override
    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub
        
    }

    @Override
    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }

}

  业务控制service中,就是将基本的JVM锁的service中,Lock的实现更换便可:apache

private Lock lock = new ZkLockUtil("/p1node");

  当程序运行中,全部的请求会去争抢建立zk节点,谁建立成功,则就得到锁资源,继续执行业务代码。其余全部线程基于递归等待,等待zk节点的删除,而后再去尝试争抢建立。达到控制并发的目的。编程

可是,这种可是有个很差的地方,也就是,当一个锁释放后,全部的线程都会一会儿全去争抢,每次都是轮回这样哄抢的过程,会有必定的压力,也没必要如此。因此,下面基于zk永久节点下临时顺序节点作点改善,每一个线程节点,只须要关注前面一个节点变化便可,不须要形成哄抢事件。

4、ZooKeeper的分布式锁提升版

   锁获取释放的工具类:

package com.cfang.zkLockUtil;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.lang3.StringUtils;

import com.cfang.zkClient.MyZkSerializer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ZKLockImproveUtil implements Lock{
    
    private String znode;
    private ZkClient zkClient;
    private ThreadLocal<String> currentNode = new ThreadLocal<String>(); //当前节点
    private ThreadLocal<String> beforeNode = new ThreadLocal<String>();  //前一个节点
    
    public ZKLockImproveUtil(String znode) {
        if(StringUtils.isBlank(znode)) {
            throw new IllegalArgumentException("锁节点znode不能为空字符串");
        }
        this.znode = znode;
        this.zkClient = new ZkClient("111.231.51.200:2181,111.231.51.200:2182,111.231.51.200:2183");
        this.zkClient.setZkSerializer(new MyZkSerializer());
        
        try {
            if(!this.zkClient.exists(znode)) {
                this.zkClient.createPersistent(znode, true); // true是否建立层级目录
            }
        } catch (Exception e) {
        }
    }

    @Override
    public void lock() {
        if(!tryLock()) {
            waitLock();
            lock();
        }
    }
    
    private void waitLock() {
        CountDownLatch latch = new CountDownLatch(1);
        IZkDataListener listener = new IZkDataListener() {
            
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                log.info("{}节点删除,锁释放...", dataPath);
                latch.countDown();
            }
            
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
        
        this.zkClient.subscribeDataChanges(beforeNode.get(), listener);
        
        try {
            if(this.zkClient.exists(beforeNode.get())) {
                latch.await();
            }
        } catch (Exception e) {
        }
        
        this.zkClient.unsubscribeDataChanges(beforeNode.get(), listener);
    }

    @Override
    public boolean tryLock() {
        boolean result = false;
        // 建立顺序临时节点
        if(null == currentNode.get() || !this.zkClient.exists(currentNode.get())) {
            String enode = this.zkClient.createEphemeralSequential(znode + "/", "zk-locked");
            this.currentNode.set(enode);
        }
        // 获取znode节点下的全部子节点
        List<String> list = this.zkClient.getChildren(znode);
        Collections.sort(list);
        
        /**
         * 若是当前节点是第一个的话,则是为获取锁,继续执行
         * 不是头结点的话,则去查询其前面一个节点,而后准备监听前一个节点的删除释放操做
         */
        
        if(currentNode.get().equals(this.znode + "/" + list.get(0))) {
            log.info("{}节点为头结点,得到锁...", currentNode.get());
            result = true;
        } else {
            int currentIndex = list.indexOf(currentNode.get().substring(this.znode.length() + 1));
            String bnode = this.znode + "/" + list.get(currentIndex - 1);
            this.beforeNode.set(bnode);
        }
        return result;
    }

    @Override
    public void unlock() {
        if(null != this.currentNode) {
            this.zkClient.delete(currentNode.get());
            this.currentNode.set(null);
        }
    }
    
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // TODO Auto-generated method stub
        return false;
    }
    
    @Override
    public void lockInterruptibly() throws InterruptedException {
        // TODO Auto-generated method stub
    }

    @Override
    public Condition newCondition() {
        // TODO Auto-generated method stub
        return null;
    }

    
}

  service中更换实现:

private Lock lock = new ZKLockImproveUtil("/pnode");

5、小结

  主要是学习测试使用,并未考虑到生产实际的问题,好比 若是业务处理中假死状态,致使zk不释放锁,那么就会致使死锁问题(能够对锁节点来个有效期处理)。

  上述为部分代码片断,总体工程能够在github上获取,地址:https://github.com/qiuhan00/zkLock

相关文章
相关标签/搜索