redis锁类型及简单实现

1. redis加锁分类

  1. redis能用的的加锁命令分表是INCRSETNXSET

2. 第一种锁命令INCR

这种加锁的思路是, key 不存在,那么 key 的值会先被初始化为 0 ,而后再执行 INCR 操做进行加一。 
而后其它用户在执行 INCR 操做进行加一时,若是返回的数大于 1 ,说明这个锁正在被使用当中。java

一、 客户端A请求服务器获取key的值为1表示获取了锁
二、 客户端B也去请求服务器获取key的值为2表示获取锁失败
三、 客户端A执行代码完成,删除锁
四、 客户端B在等待一段时间后在去请求的时候获取key的值为1表示获取锁成功
五、 客户端B执行代码完成,删除锁

$redis->incr($key);
$redis->expire($key, $ttl); //设置生成时间为1秒

3. 第二种锁SETNX

这种加锁的思路是,若是 key 不存在,将 key 设置为 value 
若是 key 已存在,则 SETNX 不作任何动做redis

一、 客户端A请求服务器设置key的值,若是设置成功就表示加锁成功
二、 客户端B也去请求服务器设置key的值,若是返回失败,那么就表明加锁失败
三、 客户端A执行代码完成,删除锁
四、 客户端B在等待一段时间后在去请求设置key的值,设置成功
五、 客户端B执行代码完成,删除锁

$redis->setNX($key, $value);
$redis->expire($key, $ttl);

4. 第三种锁SET

上面两种方法都有一个问题,会发现,都须要设置 key 过时。那么为何要设置key过时呢?若是请求执行由于某些缘由意外退出了,致使建立了锁可是没有删除锁,那么这个锁将一直存在,以致于之后缓存再也得不到更新。因而乎咱们须要给锁加一个过时时间以防不测。 
可是借助 Expire 来设置就不是原子性操做了。因此还能够经过事务来确保原子性,可是仍是有些问题,因此官方就引用了另一个,使用 SET 命令自己已经从版本 2.6.12 开始包含了设置过时时间的功能。缓存

一、 客户端A请求服务器设置key的值,若是设置成功就表示加锁成功
二、 客户端B也去请求服务器设置key的值,若是返回失败,那么就表明加锁失败
三、 客户端A执行代码完成,删除锁
四、 客户端B在等待一段时间后在去请求设置key的值,设置成功
五、 客户端B执行代码完成,删除锁
$redis->set($key, $value, array('nx', 'ex' => $ttl));  //ex表示秒

5. 其它问题

虽然上面一步已经知足了咱们的需求,可是仍是要考虑其它问题? 
一、 redis发现锁失败了要怎么办?中断请求仍是循环请求? 
二、 循环请求的话,若是有一个获取了锁,其它的在去获取锁的时候,是否是容易发生抢锁的可能? 
三、 锁提早过时后,客户端A还没执行完,而后客户端B获取到了锁,这时候客户端A执行完了,会不会在删锁的时候把B的锁给删掉?服务器

6. 解决办法

针对问题1:使用循环请求,循环请求去获取锁 
针对问题2:针对第二个问题,在循环请求获取锁的时候,加入睡眠功能,等待几毫秒在执行循环 
针对问题3:在加锁的时候存入的key是随机的。这样的话,每次在删除key的时候判断下存入的key里的value和本身存的是否同样测试

setnx的Java简单实现:spa

获取锁:.net

/**
	 * 获取一次锁
	 * @param redisClient
	 * @return
	 * 	<p>true : 获取到锁</p>
	 * 	<p>false :未获取到锁</p>
	 */
	public static boolean getLock(Jedis redisClient) {
		//是否获取到锁
	    boolean hasLock = false;
	    try {
	        hasLock = redisClient.setnx(lockKey, "lockObj") == 1;
	        if (hasLock) {
	            redisClient.expire(lockKey, lockTime);//一小时
	        }
	    } catch (Exception e) {
	        redisClient.expire(lockKey, lockTime);//一小时
	    }
	    return hasLock;
	}

此实现会有上面的问题1,只是获取一次,若是获取失败,则再也不获取。线程

若是要继续获取,须要使用方本身实现循环获取逻辑和超时逻辑。code

修改下:blog

/**
	 * 循环获取锁,直到过超时时间
	 * @param redisClient
	 * @param timeout	单位s
	 * @return
	 * 	<p>true : 获取到锁</p>
	 * 	<p>false :未获取到锁</p>
	 */
	public static boolean getLockTimeOut(Jedis redisClient,int timeout) {
		//当前的毫秒
		long start = System.currentTimeMillis();
		//超时时间转换为毫秒单位
		timeout = timeout * 1000;
		//是否获取到锁
		boolean hasLock = false;
		
		while(!hasLock){
			try {
				hasLock = redisClient.setnx(lockKey, "lockObj") == 1;
				//获取到锁
				if (hasLock) {
					redisClient.expire(lockKey, lockTime);//一小时
				} else {
					//未获取到锁,判断是否超过超时时间
					long now = System.currentTimeMillis();
					//当前时间超过起始时间的时间间隔,当间隔>=超时时间,则中止获取锁
					if(now - start >= timeout){
						System.out.println("--------获取锁超时,再也不获取--------");
						break;
					}
					//睡眠,下降抢锁频率,缓解redis压力
					Thread.sleep(500);
				}
			} catch (Exception e) {
				redisClient.expire(lockKey, lockTime);//一小时
			}
		}
		return hasLock;
	}

此种实现,经过Thread.sleep(500)来下降抢锁频率,用以处理问题2,同时,经过while来实现循环获取锁逻辑,直到超过超时时间。这样一来,外部调用时就不用再考虑自实现循环和超时问题了。

针对问题3,还未实现,晚点实现再发吧

 

附一个简单测试类:

package com.paic.elis.elis_smp_cms.redis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisCount2 {

	static JedisPoolConfig config = new JedisPoolConfig();
	static JedisPool newPool = null;
	static Jedis jedisCli = null;
	static int lockTime = 60 * 60;	//单位:S
	static final String lockKey = "lockKey";
	static {
		config.setMaxTotal(20);
		newPool = new JedisPool(config, "10.20.130.34", 4436,20000,"quul5trl");
		jedisCli = newPool.getResource();
		jedisCli.del("mqCount");
		jedisCli.del(lockKey);
	}
	
	public static void main(String[] args) {
//		test1();
//		test2();
		test3();
//		test1();
	}
	
	//多个线程去获取锁,仅有获取到锁的线程才会执行,其余线程被丢弃
	public static void test1(){
		for(int i = 0 ; i < 10; i++){
			new Thread(){
				public void run(){
					Jedis jedisC = newPool.getResource();
					boolean flag = false;
					flag = getLock(jedisC);
					//获取到锁
					if(flag){
						System.out.println("获取到锁,开始处理");
						//业务逻辑执行
						for(int i = 0 ; i < 100 ; i++){
							jedisC.incr("mqCount");
						}
						System.out.println(jedisC.get("mqCount"));
						releaseLock(jedisC);
						System.out.println("释放锁成功");
					}
				}
			}.start();
		}
	}
	

	//多个线程去获取锁,按照获取到锁的顺序执行,一直等到全部线程执行完毕
	public static void test2(){
		for(int i = 0 ; i < 10; i++){
			new Thread(){
				public void run(){
					Jedis jedisC = newPool.getResource();
					boolean flag = false;
					while(!flag){
						flag = getLock(jedisC);
						//获取到锁
						if(flag){
							System.out.println("获取到锁,开始处理");
							//业务逻辑执行
							for(int i = 0 ; i < 100 ; i++){
								jedisC.incr("mqCount");
							}
							System.out.println(jedisC.get("mqCount"));
							releaseLock(jedisC);
							System.out.println("释放锁成功");
						}
					}
				}
			}.start();
		}
	}

	//多个线程去获取锁,按照获取到锁的顺序执行,等到超时时间以后仍未执行的线程被丢弃
	public static void test3(){
		for(int i = 0 ; i < 10; i++){
			new Thread(){
				public void run(){
					Jedis jedisC = newPool.getResource();
					boolean flag = false;
					flag = getLockTimeOut(jedisC,2);
					//获取到锁
					if(flag){
						System.out.println("获取到锁,开始处理");
						//业务逻辑执行
						for(int i = 0 ; i < 100 ; i++){
							jedisC.incr("mqCount");
						}
						System.out.println(jedisC.get("mqCount"));
						releaseLock(jedisC);
						System.out.println("释放锁成功");
					}
				}
			}.start();
		}
	}

	/**
	 * 获取到锁则执行,未获取则不执行(放弃本次执行)
	 * @param redisClient
	 * @return
	 */
	public static void doMethod(Jedis redisClient) {
		//未获取到锁--直接返回
		if(!getLock(redisClient)) {
			return;
		}
		//获取到锁,开始处理
		try{
			System.out.println("获取到锁,开始处理");
			//业务逻辑执行
			for(int i = 0 ; i < 100 ; i++){
				redisClient.incr("mqCount");
			}
			System.out.println(redisClient.get("mqCount"));
		} finally {
			// 只要获取到锁,则在业务逻辑结束以后,必须释放锁
			releaseLock(redisClient);
		}
	}
	
	/**
	 * 获取到锁则执行,未获取则一直尝试获取,直到获取到锁为止
	 * @param redisClient
	 * @return
	 */
	public static void doMethodContinue(Jedis redisClient) {
		
		boolean flag = false;
		while(!flag){
			flag = getLock(redisClient);
			//若是获取到锁,则继续执行,不然循环获取
			if(flag){
				try{
					System.out.println("获取到锁,开始处理");
				} finally {
					// 只要获取到锁,则在业务逻辑结束以后,必须释放锁
					releaseLock(redisClient);
				}
			}
			//间隔0.5s再次获取
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
			}
		}
	}
	
	/**
	 * 获取到锁则执行,未获取则一直尝试获取,直到到达超时时间
	 * @param redisClient
	 * @param timeout	单位S
	 */
	public static void doMethodContinueTimeout(Jedis redisClient,int timeout) {
		
		//当前的毫秒
		long start = System.currentTimeMillis();
		//超时时间转换为毫秒单位
		timeout = timeout * 1000;
		boolean flag = false;
		while(!flag){
			flag = getLock(redisClient);
			//若是获取到锁,则继续执行,不然循环获取
			if(flag){
				try{
					System.out.println("获取到锁,开始处理");
				} finally {
					// 只要获取到锁,则在业务逻辑结束以后,必须释放锁
					releaseLock(redisClient);
				}
			}
			long now = System.currentTimeMillis();
			//当前时间超过起始时间的时间间隔,当间隔>=超时时间,则中止获取锁
			if(now - start >= timeout){
				flag  = true;
			}
			//睡眠,下降抢锁频率,缓解redis压力
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * 获取一次锁
	 * @param redisClient
	 * @return
	 * 	<p>true : 获取到锁</p>
	 * 	<p>false :未获取到锁</p>
	 */
	public static boolean getLock(Jedis redisClient) {
		//是否获取到锁
	    boolean hasLock = false;
	    try {
	        hasLock = redisClient.setnx(lockKey, "lockObj") == 1;
	        if (hasLock) {
	            redisClient.expire(lockKey, lockTime);//一小时
	        }
	    } catch (Exception e) {
	        redisClient.expire(lockKey, lockTime);//一小时
	    }
	    return hasLock;
	}
	
	/**
	 * 循环获取锁,直到过超时时间
	 * @param redisClient
	 * @param timeout	单位s
	 * @return
	 * 	<p>true : 获取到锁</p>
	 * 	<p>false :未获取到锁</p>
	 */
	public static boolean getLockTimeOut(Jedis redisClient,int timeout) {
		//当前的毫秒
		long start = System.currentTimeMillis();
		//超时时间转换为毫秒单位
		timeout = timeout * 1000;
		//是否获取到锁
		boolean hasLock = false;
		
		while(!hasLock){
			try {
				hasLock = redisClient.setnx(lockKey, "lockObj") == 1;
				//获取到锁
				if (hasLock) {
					redisClient.expire(lockKey, lockTime);//一小时
				} else {
					//未获取到锁,判断是否超过超时时间
					long now = System.currentTimeMillis();
					//当前时间超过起始时间的时间间隔,当间隔>=超时时间,则中止获取锁
					if(now - start >= timeout){
						System.out.println("--------获取锁超时,再也不获取--------");
						break;
					}
					//睡眠,下降抢锁频率,缓解redis压力
					Thread.sleep(500);
				}
			} catch (Exception e) {
				redisClient.expire(lockKey, lockTime);//一小时
			}
		}
		return hasLock;
	}

	/**
	 * 释放锁
	 * @param redisClient
	 */
	public static void releaseLock(Jedis redisClient) {
	    redisClient.del(lockKey);
	}

}

 

主要转自:

http://blog.csdn.net/Dennis_ukagaka/article/details/78072274

相关文章
相关标签/搜索