分布式的几件小事(十)分布式锁是啥

1.什么是分布式锁

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,经常须要协调他们的动做。若是不一样的系统或是同一个系统的不一样主机之间共享了一个或一组资源,那么访问这些资源的时候,每每须要互斥来防止彼此干扰来保证一致性,在这种状况下,便须要使用到分布式锁。
分布式系统中,经常须要协调他们的动做。若是不一样的系统或是同一个系统的不一样主机之间共享了一个或一组资源,那么访问这些资源的时候,每每须要互斥来防止彼此干扰来保证一致性,这个时候,便须要使用到分布式锁。html

2.实现方式

1.基于数据库实现

①基于数据库表实现。
实现方式 : 这是最简单的方法,能够在数据库中创建一张表,而后经过操做该表中的数据来实现。
当咱们想要锁住某个方法或资源的时候,就在表中增长一条记录,想要释放锁的时候就删除这条记录。
好比建立以下的表结构:node

CREATE TABLE myLock(
    id INT(11) NOT NULL AUTO_INCREMENT COMMENT '自增主键id',
    method_name VARCHAR(256) NOT NULL DEFAULT '' COMMENT '要加锁的方法名',
    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '数据建立时间',
    PRIMARY key(id),
    UNIQUE key `idx_method_name`(method_name)
) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT='分布式锁';

想要锁住某个方法的时间,就去插入一条数据。mysql

insert into myLock(method_name) values ("method_name")

方法执行完毕以后,就删除这条数据,释放锁。redis

delete from myLock where method_name = "method_name"

存在的问题
A: 强依赖数据库,若是数据是单点,一旦数据库挂掉就不可用。
B: 没有失效时间,一旦解锁失败,或者加锁服务挂掉,锁将一直存在。
C: 锁是非阻塞的,引入插入若是失败叫报错了,不会等待去获取锁,想要从新获取锁就要去从新触发该操做。
D: 锁是非重入的,同一个线程在释放锁以前没法再次获取锁。算法

解决方案
A: 数据库使用主从架构,进行数据的主从双向同步,主库挂掉里面切换从库。
B: 能够在系统里面作一个定时任务,每隔必定时间就去清理一下超时的锁。
C: 在进行获取锁的时候,在执行insert操做时搞一个while循环不断去尝试,知道成功。
D: 能够在表中再增长信息,好比当前得到锁的机器信息和线程信息,判断若是是同一个,直接获取锁。sql

②基于数据库排它锁实现
仍是使用刚才建立的那张表,可使用数据库的排它锁来实现,基于InnoDB引擎,能够在获取锁时关闭自动提交,而后在查询语句中加上 for update来获取数据库的排它锁(注意:where后面的字段必定要加索引,只有这样才会使用行锁),当获取排它锁成功后,能够进行操做,操做完毕提交事物来释放该锁。数据库

优点
A: 自然阻塞,在获取排它锁失败时会一直阻塞,知道成功。
B: 服务宕机以后会释放锁。
缺点
A: 数据库单点问题。
B: 锁的可重入问题。
C: mysql可能会进行查询优化,可能不使用索引,而去全表扫描,这样将加表锁。
D: 若是一个排它锁一直不去提交,就会占用数据库的链接,一旦变多就有可能撑爆数据库链接池。缓存

2.基于redis实现。

①最普通的锁实现
在redis里面经过下面的指令建立一个key,网络

// NX   表示只有key不存的时候,才会成功,存在就失败     
// PX 30000  表示30秒后锁自动释放
SET mylock 随机值 NX PX 30000

若是建立成功就表示获取到了锁,不然就表示没有获取到锁。
释放锁就是删除key,可是通常能够用lua脚本删除,这样能够去判断value同样才会删除。session

为啥要用随机值呢?由于若是某个客户端获取到了锁,可是阻塞了很长时间才执行完,此时可能已经自动释放锁了,此时可能别的客户端已经获取到了这个锁,要是你这个时候直接删除key的话就会删除原本不属于你本身的锁,因此得用随机值加上面的lua脚原本释放锁。

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

存在的问题
A: redis单点问题,若是redis挂了那么就会没法获取锁。
B: redis就算是主从架构也会出现问题,好比key还没来的级复制给从节点,主节点就挂了。

②RedLock算法
获取锁原理
假设有一个redis cluster集群,有5个redis master实例,而后执行以下步骤去获取锁。
A: 获取当前时间戳,单位是毫秒。
B: 轮流尝试在每一个master节点上面建立锁,过程比较短,通常就十几毫秒。
C: 尝试在大多数节点上创建锁,好比5个节点就要求最少3个节点建立成功。
D: 客户端计算创建好锁的时间,若是创建锁的时机小于超时时间,就算创建成功了。
E: 若是所创建失败了,那么就依次去删除已经创建成功的锁。
F: 若是别人已经创建了一把分布式错,本身就不断轮询去尝试获取锁。

存在问题
A: 获取锁失败时须要不断去重试,比较消耗性能。
B: 若是redis获取锁的客户端挂掉了,须要等待超时才会释放锁。
C: 上锁须要遍历,计算时间等,过程比较复杂。
D: 不是十分的可靠,可参考这两篇文章:
How to do distributed locking
[Is Redlock safe?](http://antirez.com/news/101)

3.使用zookeeper实现

①基于零时节点实现

zk分布式锁,其实能够作的比较简单,就是某个节点尝试建立临时znode,此时建立成功了就获取了这个锁;这个时候别的客户端来建立锁会失败,只能注册个监听器监听这个锁。释放锁就是删除这个znode,一旦释放掉就会通知客户端,而后有一个等待着的客户端就能够再次从新加锁。
简易代码以下:

/**
 * ZooKeeperSession
 * @author Administrator
 *
 */
public class ZooKeeperSession {
    
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    
    private ZooKeeper zookeeper;
    private CountDownLatch latch;

    public ZooKeeperSession() {
        try {
            this.zookeeper = new ZooKeeper(
                    "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 
                    50000, 
                    new ZooKeeperWatcher());            
            try {
                connectedSemaphore.await();
            } catch(InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("ZooKeeper session established......");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 获取分布式锁
     * @param productId
     */
    public Boolean acquireDistributedLock(Long productId) {
        String path = "/product-lock-" + productId;
    
        try {
            zookeeper.create(path, "".getBytes(), 
                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
             return true;
        } catch (Exception e) {
              while(true) {
                 try {
                         Stat stat = zk.exists(path, true); // 至关因而给node注册一个监听器,去看看这个监听器是否存在
                        if(stat != null) {
                              this.latch = new                                       
                              CountDownLatch(1);
                              this.latch.await(waitTime, TimeUnit.MILLISECONDS);
                             this.latch = null;
                        }
                       zookeeper.create(path, "".getBytes(), 
                        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                        return true;
                     } catch(Exception e) {
                          continue;
                     }
            }

// 很不优雅,我呢就是给你们来演示这么一个思路,比较通用的.
      }
    return true;
    }
    
    /**
     * 释放掉一个分布式锁
     * @param productId
     */
    public void releaseDistributedLock(Long productId) {
        String path = "/product-lock-" + productId;
        try {
            zookeeper.delete(path, -1); 
            System.out.println("release the lock for product[id=" + productId + "]......");  
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 创建zk session的watcher
     * @author Administrator
     *
     */
    private class ZooKeeperWatcher implements Watcher {

        public void process(WatchedEvent event) {
            System.out.println("Receive watched event: " + event.getState());

            if(KeeperState.SyncConnected == event.getState()) {
                connectedSemaphore.countDown();
            } 

            if(this.latch != null) {  
                  this.latch.countDown();  
             }
        }
        
    }
    
    /**
     * 封装单例的静态内部类
     * @author Administrator
     *
     */
    private static class Singleton {
        
        private static ZooKeeperSession instance;
        
        static {
            instance = new ZooKeeperSession();
        }
        
        public static ZooKeeperSession getInstance() {
            return instance;
        }
        
    }
    
    /**
     * 获取单例
     * @return
     */
    public static ZooKeeperSession getInstance() {
        return Singleton.getInstance();
    }
    
    /**
     * 初始化单例的便捷方法
     */
    public static void init() {
        getInstance();
    }
    
}

存在的问题
A: 须要动态的去建立、销毁节点,性能没有基于redis的高。
B: 使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的状况,因为网络抖动,客户端可ZK集群的session链接断了,那么zk觉得客户端挂了,就会删除临时节点,这时候其余客户端就能够获取到分布式锁了。就可能产生并发问题。这个问题不常见是由于zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试。屡次重试以后还不行的话才会删除临时节点。
C: 尝试获取锁,锁销毁以后须要获取锁的线程要去竞争锁,复杂。

②就行零时顺序节点实现。

相好比前面的方式来讲。零时顺序节点就是在建立零时节点的时候再也不是建立失败,而是给每一个节点添加了一个编号,每次只有编号最小的能够获取到锁,非最小的节点去监听离他最近的那个次小的节点,实现获取锁的有序性。这样每一个节点只须要去监听一个节点,比他小的那个节点释放锁他会成功获取锁。

简易代码实例:

public class ZooKeeperDistributedLock implements Watcher{
    
    private ZooKeeper zk;
    private String locksRoot= "/locks";
    private String productId;
    private String waitNode;
    private String lockNode;
    private CountDownLatch latch;
    private CountDownLatch connectedLatch = new CountDownLatch(1);
private int sessionTimeout = 30000; 

    public ZooKeeperDistributedLock(String productId){
        this.productId = productId;
         try {
       String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
            zk = new ZooKeeper(address, sessionTimeout, this);
            connectedLatch.await();
        } catch (IOException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }

    public void process(WatchedEvent event) {
        if(event.getState()==KeeperState.SyncConnected){
            connectedLatch.countDown();
            return;
        }

        if(this.latch != null) {  
            this.latch.countDown(); 
        }
    }

    public void acquireDistributedLock() {   
        try {
            if(this.tryLock()){
                return;
            }
            else{
                waitForLock(waitNode, sessionTimeout);
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        } 
    }

    public boolean tryLock() {
        try {
        // 传入进去的locksRoot + “/” + productId
        // 假设productId表明了一个商品id,好比说1
        // locksRoot = locks
        // /locks/10000000000,/locks/10000000001,/locks/10000000002
            lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
   
            // 看看刚建立的节点是否是最小的节点
        // locks:10000000000,10000000001,10000000002
            List<String> locks = zk.getChildren(locksRoot, false);
            Collections.sort(locks);
    
            if(lockNode.equals(locksRoot+"/"+ locks.get(0))){
                //若是是最小的节点,则表示取得锁
                return true;
            }
    
            //若是不是最小的节点,找到比本身小1的节点
            int previousLockIndex = -1;
            for(int i = 0; i < locks.size(); i++) {
                if(lockNode.equals(locksRoot + “/” + locks.get(i))) {
                     previousLockIndex = i - 1;
                    break;
                 }
            }
       
           this.waitNode = locks.get(previousLockIndex);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }
     
    private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
        if(stat != null){
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);                 this.latch = null;
        }
        return true;
   }

    public void unlock() {
        try {
        // 删除/locks/10000000000节点
        // 删除/locks/10000000001节点
            System.out.println("unlock " + lockNode);
            zk.delete(lockNode,-1);
            lockNode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}

3.比较

三种方案的比较
上面几种方式,哪一种方式都没法作到完美。就像CAP同样,在复杂性、可靠性、性能等方面没法同时知足,因此,根据不一样的应用场景选择最适合本身的才是王道。

从理解的难易程度角度(从低到高)
数据库 > 缓存 > Zookeeper

从实现的复杂性角度(从低到高)
Zookeeper >= 缓存 > 数据库

从性能角度(从高到低)
缓存 > Zookeeper >= 数据库

从可靠性角度(从高到低) Zookeeper > 缓存 > 数据库

相关文章
相关标签/搜索