Java分布式锁看这篇就够了!

什么是锁?

在单进程的系统中,当存在多个线程能够同时改变某个变量(可变共享变量)时,就须要对变量或代码块作同步,使其在修改这种变量时可以线性执行消除并发修改变量。
而同步的本质是经过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么须要在某个地方作个标记,这个标记必须每一个线程都能看到,当标记不存在时能够设置该标记,其他后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记。这个标记能够理解为锁。
不一样地方实现锁的方式也不同,只要能知足全部线程都能看获得标记便可。如 Java 中 synchronize 是在对象头设置标记,Lock 接口的实现类基本上都只是某一个 volitile 修饰的 int 型变量其保证每一个线程都能拥有对该 int 的可见性和原子修改,linux 内核中也是利用互斥量或信号量等内存数据作标记。
除了利用内存数据作锁其实任何互斥的都能作锁(只考虑互斥状况),如流水表中流水号与时间结合作幂等校验能够看做是一个不会释放的锁,或者使用某个文件是否存在做为锁等。只须要知足在对标记进行修改能保证原子性和内存可见性便可。java

什么是分布式?

分布式的 CAP 理论告诉咱们:node

任何一个分布式系统都没法同时知足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时知足两项。mysql

目前不少大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。基于 CAP理论,不少系统在设计之初就要对这三者作出取舍。在互联网领域的绝大多数的场景中,都须要牺牲强一致性来换取系统的高可用性,系统每每只须要保证最终一致性。linux

分布式场景git

此处主要指集群模式下,多个相同服务同时开启.github

在许多的场景中,咱们为了保证数据的最终一致性,须要不少的技术方案来支持,好比分布式事务、分布式锁等。不少时候咱们须要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,经过 Java 提供的并发 API 咱们能够解决,可是在分布式环境下,就没有那么简单啦。redis

分布式与单机状况下最大的不一样在于其不是多线程而是多进程。
多线程因为能够共享堆内存,所以能够简单的采起内存做为标记存储位置。而进程之间甚至可能都不在同一台物理机上,所以须要将标记存储在一个全部进程都能看到的地方。
什么是分布式锁?
当在分布式模型下,数据只有一份(或有限制),此时须要利用锁的技术控制某一时刻修改数据的进程数。
与单机模式下的锁不只须要保证进程可见,还须要考虑进程与锁之间的网络问题。(我以为分布式状况下之因此问题变得复杂,主要就是须要考虑到网络的延时和不可靠。。。一个大坑)
分布式锁仍是能够将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如 Redis、Memcache。至于利用数据库、文件等作锁与单机的实现是同样的,只要保证标记能互斥就行。算法

咱们须要怎样的分布式锁?

能够保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器-上的一个线程执行。
这把锁要是一把可重入锁(避免死锁)
这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
这把锁最好是一把公平锁(根据业务需求考虑要不要这条)
有高可用的获取锁和释放锁功能
获取锁和释放锁的性能要好sql

基于数据库作分布式锁

基于乐观锁数据库

基于表主键惟一作分布式锁
思路:利用主键惟一的特性,若是有多个请求同时提交到数据库的话,数据库会保证只有一个操做能够成功,那么咱们就能够认为操做成功的那个线程得到了该方法的锁,当方法执行完毕以后,想要释放锁的话,删除这条数据库记录便可。

上面这种简单的实现有如下几个问题:

这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会致使业务系统不可用。
这把锁没有失效时间,一旦解锁操做失败,就会致使锁记录一直在数据库中,其余线程没法再得到到锁。
这把锁只能是非阻塞的,由于数据的 insert操做,一旦插入失败就会直接报错。没有得到锁的线程并不会进入排队队列,要想再次得到锁就要再次触发得到锁操做。
这把锁是非重入的,同一个线程在没有释放锁以前没法再次得到该锁。由于数据中数据已经存在了。
这把锁是非公平锁,全部等待锁的线程凭运气去争夺锁。
在 MySQL 数据库中采用主键冲突防重,在大并发状况下有可能会形成锁表现象。
固然,咱们也能够有其余方式解决上面的问题。
数据库是单点?搞两个数据库,数据以前双向同步,一旦挂掉快速切换到备库上。
没有失效时间?只要作一个定时任务,每隔必定时间把数据库中的超时数据清理一遍。
非阻塞的?搞一个 while 循环,直到 insert 成功再返回成功。
非重入的?在数据库表中加个字段,记录当前得到锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,若是当前机器的主机信息和线程信息在数据库能够查到的话,直接把锁分配给他就能够了。
非公平的?再建一张中间表,将等待锁的线程全记录下来,并根据建立时间排序,只有最早建立的容许获取锁。
比较好的办法是在程序中生产主键进行防重。

基于表字段版本号作分布式锁

这个策略源于 mysql 的 mvcc 机制,使用这个策略其实自己没有什么问题,惟一的问题就是对数据表侵入较大,咱们要为每一个表设计一个版本号字段,而后写一条判断 sql 每次进行判断,增长了数据库操做的次数,在高并发的要求下,对数据库链接的开销也是没法忍受的。

基于悲观锁

基于数据库排他锁作分布式锁

在查询语句后面增长for update,数据库会在查询过程当中给数据库表增长排他锁 (注意: InnoDB 引擎在加锁的时候,只有经过索引进行检索的时候才会使用行级锁,不然会使用表级锁。这里咱们但愿使用行级锁,就要给要执行的方法字段名添加索引,值得注意的是,这个索引必定要建立成惟一索引,不然会出现多个重载方法之间没法同时被访问的问题。重载方法的话建议把参数类型也加上。)。当某条记录被加上排他锁以后,其余线程没法再在该行记录上增长排他锁。

咱们能够认为得到排他锁的线程便可得到分布式锁,当获取到锁以后,能够执行方法的业务逻辑,执行完方法以后,经过connection.commit()操做来释放锁。

这种方法能够有效的解决上面提到的没法释放锁和阻塞锁的问题。

阻塞锁? for update语句会在执行成功后当即返回,在执行失败时一直处于阻塞状态,直到成功。
锁定以后服务宕机,没法释放?使用这种方式,服务宕机以后数据库会本身把锁释放掉。
可是仍是没法直接解决数据库单点和可重入问题。

这里还可能存在另一个问题,虽然咱们对方法字段名使用了惟一索引,而且显示使用 for update 来使用行级锁。可是,MySQL 会对查询进行优化,即使在条件中使用了索引字段,可是否使用索引来检索数据是由 MySQL 经过判断不一样执行计划的代价来决定的,若是 MySQL 认为全表扫效率更高,好比对一些很小的表,它就不会使用索引,这种状况下 InnoDB 将使用表锁,而不是行锁。若是发生这种状况就悲剧了。。。

还有一个问题,就是咱们要使用排他锁来进行分布式锁的 lock,那么一个排他锁长时间不提交,就会占用数据库链接。一旦相似的链接变得多了,就可能把数据库链接池撑爆。

优缺点

优势:简单,易于理解

缺点:会有各类各样的问题(操做数据库须要必定的开销,使用数据库的行级锁并不必定靠谱,性能不靠谱)

基于 Redis 作分布式锁

基于 REDIS 的 SETNX()、EXPIRE() 方法作分布式锁

setnx()

setnx 的含义就是 SET if Not Exists,其主要有两个参数 setnx(key, value)。该方法是原子的,若是 key 不存在,则设置当前 key 成功,返回 1;若是当前 key 已经存在,则设置当前 key 失败,返回 0。

expire()

expire 设置过时时间,要注意的是 setnx 命令不能设置 key 的超时时间,只能经过 expire() 来对 key 设置。

使用步骤

一、setnx(lockkey, 1) 若是返回 0,则说明占位失败;若是返回 1,则说明占位成功

二、expire() 命令对 lockkey 设置超时时间,为的是避免死锁问题。

三、执行完业务代码后,能够经过 delete 命令删除 key。

这个方案实际上是能够解决平常工做中的需求的,但从技术方案的探讨上来讲,可能还有一些能够完善的地方。好比,若是在第一步 setnx 执行成功后,在 expire() 命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题,因此若是要对其进行完善的话,可使用 redis 的 setnx()、get() 和 getset() 方法来实现分布式锁。

基于 REDIS 的 SETNX()、GET()、GETSET()方法作分布式锁

这个方案的背景主要是在 setnx() 和 expire() 的方案上针对可能存在的死锁问题,作了一些优化。

getset()

这个命令主要有两个参数 getset(key,newValue)。该方法是原子的,对 key 设置 newValue 这个值,而且返回 key 原来的旧值。假设 key 原来是不存在的,那么屡次执行这个命令,会出现下边的效果:

getset(key, “value1”) 返回 null 此时 key 的值会被设置为 value1
getset(key, “value2”) 返回 value1 此时 key 的值会被设置为 value2
依次类推!
使用步骤

setnx(lockkey, 当前时间+过时超时时间),若是返回 1,则获取锁成功;若是返回 0 则没有获取到锁,转向 2。
get(lockkey) 获取值 oldExpireTime ,并将这个 value 值与当前的系统时间进行比较,若是小于当前系统时间,则认为这个锁已经超时,能够容许别的请求从新获取,转向 3。
计算 newExpireTime = 当前时间+过时超时时间,而后 getset(lockkey, newExpireTime) 会返回当前 lockkey 的值currentExpireTime。
判断 currentExpireTime 与 oldExpireTime 是否相等,若是相等,说明当前 getset 设置成功,获取到了锁。若是不相等,说明这个锁又被别的请求获取走了,那么当前请求能够直接返回失败,或者继续重试。
在获取到锁以后,当前线程能够开始本身的业务处理,当处理完毕后,比较本身的处理时间和对于锁设置的超时时间,若是小于锁设置的超时时间,则直接执行 delete 释放锁;若是大于锁设置的超时时间,则不须要再锁进行处理。

import cn.com.tpig.cache.redis.RedisService;
import cn.com.tpig.utils.SpringUtils;

//redis分布式锁
public final class RedisLockUtil {

private static final int defaultExpire = 60;

private RedisLockUtil() {
    //
}

/**
 * 加锁
 * @param key redis key
 * @param expire 过时时间,单位秒
 * @return true:加锁成功,false,加锁失败
 */
public static boolean lock(String key, int expire) {

    RedisService redisService = SpringUtils.getBean(RedisService.class);
    long status = redisService.setnx(key, "1");

    if(status == 1) {
        redisService.expire(key, expire);
        return true;
    }

    return false;
}

public static boolean lock(String key) {
    return lock2(key, defaultExpire);
}

/**
 * 加锁
 * @param key redis key
 * @param expire 过时时间,单位秒
 * @return true:加锁成功,false,加锁失败
 */
public static boolean lock2(String key, int expire) {

    RedisService redisService = SpringUtils.getBean(RedisService.class);

    long value = System.currentTimeMillis() + expire;
    long status = redisService.setnx(key, String.valueOf(value));

    if(status == 1) {
        return true;
    }
    long oldExpireTime = Long.parseLong(redisService.get(key, "0"));
    if(oldExpireTime < System.currentTimeMillis()) {
        //超时
        long newExpireTime = System.currentTimeMillis() + expire;
        long currentExpireTime = Long.parseLong(redisService.getSet(key, String.valueOf(newExpireTime)));
        if(currentExpireTime == oldExpireTime) {
            return true;
        }
    }
    return false;
}

public static void unLock1(String key) {
    RedisService redisService = SpringUtils.getBean(RedisService.class);
    redisService.del(key);
}

public static void unLock2(String key) {    
    RedisService redisService = SpringUtils.getBean(RedisService.class);    
    long oldExpireTime = Long.parseLong(redisService.get(key, "0"));   
    if(oldExpireTime > System.currentTimeMillis()) {        
        redisService.del(key);    
    }

}
}
public void drawRedPacket(long userId) {
String key = "draw.redpacket.userid:" + userId;

boolean lock = RedisLockUtil.lock2(key, 60);
if(lock) {
    try {
        //领取操做
    } finally {
        //释放锁
        RedisLockUtil.unLock(key);
    }
} else {
    new RuntimeException("重复领取奖励");
}

}
基于 REDLOCK 作分布式锁

Redlock 是 Redis 的做者 antirez 给出的集群模式的 Redis 分布式锁,它基于 N 个彻底独立的 Redis 节点(一般状况下 N 能够设置成 5)。

算法的步骤以下:

一、客户端获取当前时间,以毫秒为单位。
二、客户端尝试获取 N 个节点的锁,(每一个节点获取锁的方式和前面说的缓存锁同样),N 个节点以相同的 key 和 value 获取锁。客户端须要设置接口访问超时,接口超时时间须要远远小于锁超时时间,好比锁自动释放的时间是 10s,那么接口超时大概设置 5-50ms。这样能够在有 redis 节点宕机后,访问该节点时能尽快超时,而减少锁的正常使用。
三、客户端计算在得到锁的时候花费了多少时间,方法是用当前时间减去在步骤一获取的时间,只有客户端得到了超过 3 个节点的锁,并且获取锁的时间小于锁的超时时间,客户端才得到了分布式锁。
四、客户端获取的锁的时间为设置的锁超时时间减去步骤三计算出的获取锁花费时间。
五、若是客户端获取锁失败了,客户端会依次删除全部的锁。
使用 Redlock 算法,能够保证在挂掉最多 2 个节点的时候,分布式锁服务仍然能工做,这相比以前的数据库锁和缓存锁大大提升了可用性,因为 redis 的高效性能,分布式缓存锁性能并不比数据库锁差。

可是,有一位分布式的专家写了一篇文章《How to do distributed locking》,质疑 Redlock 的正确性。

https://mp.weixin.qq.com/s/1bPLk_VZhZ0QYNZS8LkviA

https://blog.csdn.net/jek123456/article/details/72954106

优缺点
优势: 性能高

缺点:

失效时间设置多长时间为好?如何设置的失效时间过短,方法没等执行完,锁就自动释放了,那么就会产生并发问题。若是设置的时间太长,其余获取锁的线程就可能要平白的多等一段时间。

基于 REDISSON 作分布式锁
redisson 是 redis 官方的分布式锁组件。GitHub 地址:https://github.com/redisson/redisson

上面的这个问题 ——> 失效时间设置多长时间为好?这个问题在 redisson 的作法是:每得到一个锁时,只设置一个很短的超时时间,同时起一个线程在每次快要到超时时间时去刷新锁的超时时间。在释放锁的同时结束这个线程。

基于 ZooKeeper 作分布式锁
ZOOKEEPER 锁相关基础知识
zk 通常由多个节点构成(单数),采用 zab 一致性协议。所以能够将 zk 当作一个单点结构,对其修改数据其内部自动将全部节点数据进行修改然后才提供查询服务。
zk 的数据以目录树的形式,每一个目录称为 znode, znode 中可存储数据(通常不超过 1M),还能够在其中增长子节点。
子节点有三种类型。序列化节点,每在该节点下增长一个节点自动给该节点的名称上自增。临时节点,一旦建立这个 znode 的客户端与服务器失去联系,这个 znode 也将自动删除。最后就是普通节点。
Watch 机制,client 能够监控每一个节点的变化,当产生变化会给 client 产生一个事件。
ZK 基本锁
原理:利用临时节点与 watch 机制。每一个锁占用一个普通节点 /lock,当须要获取锁时在 /lock 目录下建立一个临时节点,建立成功则表示获取锁成功,失败则 watch/lock 节点,有删除操做后再去争锁。临时节点好处在于当进程挂掉后能自动上锁的节点自动删除即取消锁。
缺点:全部取锁失败的进程都监听父节点,很容易发生羊群效应,即当释放锁后全部等待进程一块儿来建立节点,并发量很大。
ZK 锁优化
原理:上锁改成建立临时有序节点,每一个上锁的节点均能建立节点成功,只是其序号不一样。只有序号最小的能够拥有锁,若是这个节点序号不是最小的则 watch 序号比自己小的前一个节点 (公平锁)。
步骤:

1.在 /lock 节点下建立一个有序临时节点 (EPHEMERAL_SEQUENTIAL)。
2.判断建立的节点序号是否最小,若是是最小则获取锁成功。不是则取锁失败,而后 watch 序号比自己小的前一个节点。
3.当取锁失败,设置 watch 后则等待 watch 事件到来后,再次判断是否序号最小。
4.取锁成功则执行代码,最后释放锁(删除该节点)。
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class DistributedLock implements Lock, Watcher{
private ZooKeeper zk;
private String root = "/locks";//根
private String lockName;//竞争资源的标志
private String waitNode;//等待前一个锁
private String myZnode;//当前锁
private CountDownLatch latch;//计数器
private int sessionTimeout = 30000;
private List<Exception> exception = new ArrayList<Exception>();

/**
 * 建立分布式锁,使用前请确认config配置的zookeeper服务可用
 * @param config 127.0.0.1:2181
 * @param lockName 竞争资源标志,lockName中不能包含单词lock
 */
public DistributedLock(String config, String lockName){
    this.lockName = lockName;
    // 建立一个与服务器的链接
    try {
        zk = new ZooKeeper(config, sessionTimeout, this);
        Stat stat = zk.exists(root, false);
        if(stat == null){
            // 建立根节点
            zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        }
    } catch (IOException e) {
        exception.add(e);
    } catch (KeeperException e) {
        exception.add(e);
    } catch (InterruptedException e) {
        exception.add(e);
    }
}

/**
 * zookeeper节点的监视器
 */
public void process(WatchedEvent event) {
    if(this.latch != null) {
        this.latch.countDown();
    }
}

public void lock() {
    if(exception.size() > 0){
        throw new LockException(exception.get(0));
    }
    try {
        if(this.tryLock()){
            System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
            return;
        }
        else{
            waitForLock(waitNode, sessionTimeout);//等待锁
        }
    } catch (KeeperException e) {
        throw new LockException(e);
    } catch (InterruptedException e) {
        throw new LockException(e);
    }
}

public boolean tryLock() {
    try {
        String splitStr = "_lock_";
        if(lockName.contains(splitStr))
            throw new LockException("lockName can not contains \\u000B");
        //建立临时子节点
        myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(myZnode + " is created ");
        //取出全部子节点
        List<String> subNodes = zk.getChildren(root, false);
        //取出全部lockName的锁
        List<String> lockObjNodes = new ArrayList<String>();
        for (String node : subNodes) {
            String _node = node.split(splitStr)[0];
            if(_node.equals(lockName)){
                lockObjNodes.add(node);
            }
        }
        Collections.sort(lockObjNodes);
        System.out.println(myZnode + "==" + lockObjNodes.get(0));
        if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
            //若是是最小的节点,则表示取得锁
            return true;
        }
        //若是不是最小的节点,找到比本身小1的节点
        String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
        waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
    } catch (KeeperException e) {
        throw new LockException(e);
    } catch (InterruptedException e) {
        throw new LockException(e);
    }
    return false;
}

public boolean tryLock(long time, TimeUnit unit) {
    try {
        if(this.tryLock()){
            return true;
        }
        return waitForLock(waitNode,time);
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false;
}

private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
    Stat stat = zk.exists(root + "/" + lower,true);
    //判断比本身小一个数的节点是否存在,若是不存在则无需等待锁,同时注册监听
    if(stat != null){
        System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
        this.latch = new CountDownLatch(1);
        this.latch.await(waitTime, TimeUnit.MILLISECONDS);
        this.latch = null;
    }
    return true;
}

public void unlock() {
    try {
        System.out.println("unlock " + myZnode);
        zk.delete(myZnode,-1);
        myZnode = null;
        zk.close();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (KeeperException e) {
        e.printStackTrace();
    }
}

public void lockInterruptibly() throws InterruptedException {
    this.lock();
}

public Condition newCondition() {
    return null;
}

public class LockException extends RuntimeException {
    private static final long serialVersionUID = 1L;
    public LockException(String e){
        super(e);
    }
    public LockException(Exception e){
        super(e);
    }
}

}
优缺点
优势:
有效的解决单点问题,不可重入问题,非阻塞问题以及锁没法释放的问题。实现起来较为简单。

缺点:
性能上可能并无缓存服务那么高,由于每次在建立锁和释放锁的过程当中,都要动态建立、销毁临时节点来实现锁功能。ZK 中建立和删除节点只能经过 Leader 服务器来执行,而后将数据同步到全部的 Follower 机器上。还须要对 ZK的原理有所了解。

基于 Consul 作分布式锁
DD 写过相似文章,其实主要利用 Consul 的 Key / Value 存储 API 中的 acquire 和 release 操做来实现。

使用分布式锁的注意事项
一、注意分布式锁的开销

二、注意加锁的粒度

三、加锁的方式

总结不管你身处一个什么样的公司,最开始的工做可能都须要从最简单的作起。不要提阿里和腾讯的业务场景 qps 如何大,由于在这样的大场景中你未必能亲自参与项目,亲自参与项目未必能是核心的设计者,是核心的设计者未必能独自设计。但愿你们能根据本身公司业务场景,选择适合本身项目的方案。

相关文章
相关标签/搜索