分布式锁的四种JAVA实现方式

分布式锁的四种JAVA实现方式

前言

做为这一段时间学习分布式锁的总结,本文总结了四种Java分布式锁的实现方式,简单编写了代码,进行模拟实现。相关代码存放在个人github仓库java

为何要用锁

系统内,有多个消费者,须要对同一共享数据并发访问和消费时,会有线程安全问题。例如在秒杀、抢优惠券等场景下,商品库存的数量是有限的,在高并发下,会有"超买"或"超卖"的问题。所以咱们须要使用锁,解决多线程对共享数据并发访问的线程安全问题。node

咱们能够这样来模拟并发访问:mysql

一、模拟商品库存只有一件,而且提供一个减小库存数量的方法,当库存数量大于0时,能够减小库存,返回true。当库存不大于0时,不减小库存,返回false。git

public class Stock {
    //库存数量
    private static int num=1;

    // 减小库存数量的方法
    public boolean reduceStock(){
        if(num>0){
            try {
                //一些逻辑处理
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            num--;
            return true;
        }

        return false;
    }
}

二、模拟两个线程,同时进行减小库存操做。github

@SpringBootTest
public class SampleTest {

    static class StockThread implements Runnable{
        public void run() {
            //调用减小库存的方法
            boolean b = new Stock().reduceStock();

            if (b) {
                System.out.println(Thread.currentThread().getName()+"减小库存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"减小库存失败");
            }
        }
    }

    @Test
    public void test() throws InterruptedException {
        new Thread(new StockThread(),"线程1").start();
        new Thread(new StockThread(),"线程2").start();

        Thread.sleep(3000);
    }

三、从运行结果能够看出,在并发时,虽然库存数量为1,本应该只有一个线程减小库存成功,另外一个线程减小库存失败,但运行时两个线程都操做成功了,出现了线程安全问题。redis

9dfb2ce171918a6251af210bea96c86b.jpeg

单机下的锁使用方式

若是单机版的系统,咱们有不少种解决方案。咱们可使用JDK提供的ReentrantLock,在减小库存前加锁,减小库存后释放锁。这样就能避免线程安全问题。代码示例以下:spring

@SpringBootTest
public class ReentrantLockTest {

    private static Lock lock = new ReentrantLock();

    static class StockThread implements Runnable{

        public void run() {
            //减小库存前加锁
            lock.lock();
            //调用减小库存的方法
            boolean b = new Stock().reduceStock();
            //减小库存后释放锁
            lock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName()+"减小库存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"减小库存失败");
            }
        }
    }

    @Test
    public static void main(String[] args) throws InterruptedException {
        new Thread(new StockThread(),"线程1").start();
        new Thread(new StockThread(),"线程2").start();

        Thread.sleep(4000);
    }

}

为何要用分布式锁

当系统使用分布式架构时,服务会有多个实例存在。不一样服务实例内的线程,使用ReentrantLock是没法感知彼此是否对同一资源进行了加锁操做的。当多个请求进入到不一样实例时,使用ReentrantLock则依然有线程安全问题,由于咱们须要使用分布式锁,保证一个资源在同一时间内只能被同一个线程执行。sql

使用数据库实现分布式锁

数据库分布式锁,是经过操做一张锁表来实现的。对该锁表设置惟一索引,不一样请求对该表插入同一个主键的数据,若是插入成功,则认为成功获取到锁;若是插入失败,则判断获取锁失败。数据库

代码实现apache

一、建立锁表。

CREATE TABLE `lock_record` ( 
	`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键', 
	`lock_name` varchar(50) DEFAULT NULL COMMENT '锁名称', 
	PRIMARY KEY (`id`), 
	UNIQUE KEY `lock_name` (`lock_name`)
)ENGINE=InnoDB AUTO_INCREMENT=38 DEFAULT CHARSET=utf8

二、在项目内引入相关依赖,这里使用Mybatis作持久化映射。

<!--数据库驱动-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
</dependency>
<!--引入mybatis plus 就不须要引入mybatis了-->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.4.0</version>
</dependency>

三、配置数据库链接属性。

spring:
  datasource:
    url: jdbc:mysql://${mysqlAddress}:3306/${dbName}?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: ${userName}
    password: ${password}
    driver-class-name: com.mysql.jdbc.Driver

四、在启动类添加mybatis包扫描注解。

@SpringBootApplication
@MapperScan("com.haha.mapper")
public class LockDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(LockDemoApplication.class, args);
    }

}

四、编写entity和mapper。

@Data
public class LockRecord {
    private Integer id;
    private String lockName;
}
public interface LockRecordMapper extends BaseMapper<LockRecord> {

}

五、编写数据库锁类,实现Lock接口。

@Service
public class DbLock implements Lock {

    private static final String LOCK_NAME = "db_lock";

    @Autowired
    private LockRecordMapper lockRecordMapper;

    /**
     * 上锁
     */
    public synchronized void lock() {
        while (true){
            boolean b = tryLock();
            if(b){
                //添加记录
                LockRecord lockRecord = new LockRecord();
                lockRecord.setLockName(LOCK_NAME);
                lockRecordMapper.insert(lockRecord);
                return;
            }else{
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("等待中");
            }
        }
    }

    /**
     * 尝试获取锁,根据指定的名称,在数据库表中发起查询
     * @return
     */
    public boolean tryLock() {

        QueryWrapper<LockRecord> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("lock_name",LOCK_NAME);
        List<LockRecord> lockRecords = lockRecordMapper.selectList(queryWrapper);

        if(lockRecords.size()==0){
            return true;
        }

        return false;
    }

    /**
     * 解锁 删除指定名称的记录
     */
    public void unlock() {
        QueryWrapper<LockRecord> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("lock_name",LOCK_NAME);
        lockRecordMapper.delete(queryWrapper);
    }



    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }


    public void lockInterruptibly() throws InterruptedException {

    }

    public Condition newCondition() {
        return null;
    }
}

六、模拟两个线程,使用数据库锁,同时进行减小库存操做。

@SpringBootTest
public class DbLockTest {

    @Autowired
    private DbLock dbLock;

    static class StockThread implements Runnable{

        private DbLock dbLock;

        public StockThread(DbLock dbLock){
            this.dbLock = dbLock;
        }

        public void run() {

            dbLock.lock();
            //调用减小库存的方法
            boolean b = new Stock().reduceStock();

            dbLock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName()+"减小库存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"减小库存失败");
            }
        }
    }

    @Test
    public void test() throws InterruptedException {
        new Thread(new StockThread(this.dbLock),"线程1").start();
        new Thread(new StockThread(this.dbLock),"线程2").start();

        Thread.sleep(4000);
    }
}

数据库锁缺陷

一、数据库锁强依赖数据库的可用性,一旦数据库宕机,会致使业务系统不可用,所以须要对数据库作HA。

二、数据库锁没有失效时间,一旦获取该锁的请求所在的服务实例宕机,会致使该资源被长期锁住,其余请求没法获取该锁。

使用redis实现分布式锁

redis分布式锁,是经过代码调用setnx命令,只在键不存在的状况下, 将键的值设置为某个值。若键已经存在, 则setnx命令不作任何动做。为了能处理"获取该锁的请求所在的服务实例宕机,会致使该资源被长期锁住,其余请求没法获取该锁"这种状况,咱们还须要设置超时时间。

代码实现

一、在项目内引入redis相关依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>

二、在项目中配置redis。

spring:
  redis:
    host: 127.0.0.1
    port: 6379

三、编写redis分布式锁。

@Component
public class RedisLock implements Lock {

    private static final String LOCK_NAME = "redis_lock";

    @Autowired
    private RedisTemplate redisTemplate;


    public void lock() {
        while (true){
            Boolean b = redisTemplate.opsForValue().setIfAbsent("lockName", LOCK_NAME,10,TimeUnit.SECONDS);

            if (b) {
                return;
            }else{
                System.out.println("循环等待中");
            }
        }
    }

    public void lockInterruptibly() throws InterruptedException {

    }

    public boolean tryLock() {
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void unlock() {
        redisTemplate.delete("lockName");
    }

    public Condition newCondition() {
        return null;
    }
}

四、模拟两个线程,使用redis分布式锁,同时进行减小库存操做。

@SpringBootTest
public class RedisLockTest {

    @Autowired
    private RedisLock redisLock;

    static class StockThread implements Runnable{

        private RedisLock redisLock;

        public StockThread(RedisLock redisLock){
            this.redisLock= redisLock;
        }

        public void run() {

            redisLock.lock();
            //调用减小库存的方法
            boolean b = new Stock().reduceStock();

            redisLock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName()+"减小库存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"减小库存失败");
            }
        }
    }

    @Test
    public void test() throws InterruptedException {
        new Thread(new StockThread(redisLock),"线程1").start();
        new Thread(new StockThread(redisLock),"线程2").start();

        Thread.sleep(4000);
    }
    
}

redis分布式锁缺陷

一、强依赖redis的可用性,一旦redis宕机,会致使业务系统不可用,所以最好搭建redis集群。

二、由于对锁设置了超时时间,若是某次请求不能在该次限制时间内完成操做,也会致使在某些时刻,多个请求获取到锁。

解决方案也很简单,咱们在调用setnx时,将值设置为该次请求线程的id,而且在服务实例内,设置一个守护线程,当锁快要超时时,判断请求是否完成,若是未完成,延长超时时间。

使用redission实现分布式锁

redisson是一个经常使用的Redis Java客户端,为Java上的分布式应用程序提供了基于Redis的对象,集合,锁,同步器和服务的分布式实现。使用redisson,咱们能够实现基于redis集群的分布式锁。

代码实现

一、启动redis集群(此处用单机模拟)。

二、在项目内引入redisson相关依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>

三、在项目中配置redisson,在配置文件添加redis配置和配置类。

spring:
  redis:
    host: 127.0.0.1
    port: 6379
@Configuration
public class RedissonConfig {

    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        String redisUrl = String.format("redis://%s:%s",redisProperties.getHost()+"",redisProperties.getPort()+"");
        config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
        config.useSingleServer().setDatabase(3);
        return Redisson.create(config);
    }
}

四、模拟两个线程,使用redisson,同时进行减小库存操做。

@SpringBootTest
public class RedissionLockTest {

    @Autowired
    private Redisson redisson;

    static class StockThread implements Runnable{

        private RLock mylock;

        public StockThread(RLock lock){
            this.mylock = lock;
        }

        public void run() {

            mylock.lock();
            //调用减小库存的方法
            boolean b = new Stock().reduceStock();

            mylock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName()+"减小库存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"减小库存失败");
            }
        }
    }

    @Test
    public void test() throws InterruptedException {

        RLock lock = redisson.getLock("redisson_lock");

        new Thread(new StockThread(lock),"线程1").start();
        new Thread(new StockThread(lock),"线程2").start();
        
        Thread.sleep(4000);

    }
}

使用zookeeper实现分布式锁

zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,天生就适合用于实现分布式锁。

使用zookeeper实现分布式锁的步骤

一、设置锁的根节点。判断锁的根节点是否存在,若是不存在,首先建立根节点,例如叫“/locks”。

二、客户端若是须要占用锁,则在“/locks”下建立临时且有序的子节点,而后判断本身建立的子节点是否为当前子节点列表中序号最小的子节点。若是是则认为得到锁,不然监听前一个子节点变动消息。

三、当监听到前一个子节点已经从zookeper上被删除,则认为得到锁。

四、当客户端获取锁并完成业务流程后,则删除对应的子节点,完成释放锁的工做,以便后面的节点得到分布式锁。

五、若是客户端获取锁以后,由于某些缘由宕机,此时因为客户端与zookeeper断开链接,该客户端建立的临时有序节点也会自动从zookeeper上移除,从而让后面的节点得到分布式锁。

代码实现

一、在项目内引入zookeeper相关依赖。

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.12</version>
    <!-- 排除冲突jar -->
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

二、在项目中配置zookeeper,在配置文件添加redis配置和编写配置类。

zookeeper:
  address: 127.0.0.1:2181
  timeout: 4000
@Configuration
public class ZookeeperConfig {

    @Value("${zookeeper.address}")
    private String connectString;

    @Value("${zookeeper.timeout}")
    private Integer timeout;


    @Bean(name = "zkClient")
    public ZooKeeper zkClient(){
        ZooKeeper zooKeeper=null;
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(Event.KeeperState.SyncConnected==event.getState()){
                        //若是收到了服务端的SyncConnected响应事件,表示链接成功
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();

        }catch (Exception e){
            e.printStackTrace();
        }
        return  zooKeeper;
    }
}

三、编写zookeeper分布式锁。

public class ZkLock implements Lock {

    //zk客户端
    private ZooKeeper zk;

    //锁的根节点
    private String root ="/locks";
    //当前锁的名称
    private String lockName;

    //当前线程建立的临时有序节点
    private ThreadLocal<String> nodeId = new ThreadLocal<>();

    private final static byte[] data = new byte[0];

    public ZkLock(ZooKeeper zooKeeper, String lockName){
        this.zk = zooKeeper;
        this.lockName = lockName;

        try {
            Stat stat = zk.exists(root, false);
            if(stat==null) {
                //建立根节点
                zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 监听器,监听临时节点的删除
     */
    class LockWatcher implements Watcher{

        private CountDownLatch latch;

        public LockWatcher(CountDownLatch latch){
            this.latch = latch;
        }

        public void process(WatchedEvent watchedEvent) {
            if(watchedEvent.getType()== Event.EventType.NodeDeleted){
                latch.countDown();
            }
        }
    }

    public void lock() {
        try {
            //在根节点下建立临时有序节点
            String myNode = zk.create(root + "/" + lockName, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            System.out.println(Thread.currentThread().getName()+myNode+" create");

            //获取根节点下的全部根节点下
            List<String> subNodes = zk.getChildren(root, false);

            //排序
            TreeSet<String> sortedNodes = new TreeSet<String>();
            for(String node:subNodes){
                sortedNodes.add(root+"/"+node);
            }
            //获取序号最下的子节点
            String smallNode = sortedNodes.first();

            //若是该次建立的临时有序节点是最小的子节点,则表示取得锁
            if(myNode.equals(smallNode) ){
                System.out.println(Thread.currentThread().getName()+myNode+" get lock");
                this.nodeId.set(myNode);
                return;
            }

            //不然,取得当前节点的前一个节点
            String preNode = sortedNodes.lower(myNode);

            CountDownLatch latch = new CountDownLatch(1);
            //查询前一个节点,同时注册监听
            Stat stat = zk.exists(preNode, new LockWatcher(latch));
            // 若是比本身小的前一个节点查询时,已经不存在则无需等待,若是存在则监听
            if(stat!=null){
                System.out.println(Thread.currentThread().getName()+myNode+" waiting for "+ root+"/"+preNode+" release lock");
                latch.await();//等待
            }
            nodeId.set(myNode);

        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }

    public void unlock() {
        try{
            //释放锁时,只须要将本次建立的临时有序节点移除掉
            System.out.println(Thread.currentThread().getName()+" unlock");
            if(nodeId!=null){
                zk.delete(nodeId.get(),-1);
            }
            nodeId.remove();

        }catch (InterruptedException e){
            e.printStackTrace();
        }catch (KeeperException e){
            e.printStackTrace();
        }
    }

    public void lockInterruptibly() throws InterruptedException {

    }

    public boolean tryLock() {
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public Condition newCondition() {
        return null;
    }
}

四、模拟两个线程,使用zookeeper分布式锁,同时进行减小库存操做。

@SpringBootTest
public class ZookeeperLockTest {

    @Autowired
    private ZooKeeper zooKeeper;

    private static final String LOCK_NAME = "zk_lock";

    static class StockThread implements Runnable{

        private ZkLock zkLock;

        public StockThread(ZkLock zkLock){
            this.zkLock= zkLock;
        }

        public void run() {

            zkLock.lock();
            //调用减小库存的方法
            boolean b = new Stock().reduceStock();

            zkLock.unlock();
            if (b) {
                System.out.println(Thread.currentThread().getName()+"减小库存成功");
            }else {
                System.out.println(Thread.currentThread().getName()+"减小库存失败");
            }
        }
    }

    @Test
    public void test() throws InterruptedException {

        ZkLock zkLock = new ZkLock(zooKeeper,LOCK_NAME);

        new Thread(new ZookeeperLockTest.StockThread(zkLock),"线程1").start();
        new Thread(new ZookeeperLockTest.StockThread(zkLock),"线程2").start();

        Thread.sleep(4000);
    }
}

zookeeper分布式锁缺陷

一、强依赖zookeeper的可用性,一旦zookeeper宕机,会致使业务系统不可用,所以最好搭建zookeeper集群。

相关文章
相关标签/搜索