INCR
、SETNX
、SET
INCR
这种加锁的思路是, key 不存在,那么 key 的值会先被初始化为 0 ,而后再执行 INCR 操做进行加一。
而后其它用户在执行 INCR 操做进行加一时,若是返回的数大于 1 ,说明这个锁正在被使用当中。java
一、 客户端A请求服务器获取key的值为1表示获取了锁 二、 客户端B也去请求服务器获取key的值为2表示获取锁失败 三、 客户端A执行代码完成,删除锁 四、 客户端B在等待一段时间后在去请求的时候获取key的值为1表示获取锁成功 五、 客户端B执行代码完成,删除锁 $redis->incr($key); $redis->expire($key, $ttl); //设置生成时间为1秒
SETNX
这种加锁的思路是,若是 key 不存在,将 key 设置为 value
若是 key 已存在,则 SETNX
不作任何动做redis
一、 客户端A请求服务器设置key的值,若是设置成功就表示加锁成功 二、 客户端B也去请求服务器设置key的值,若是返回失败,那么就表明加锁失败 三、 客户端A执行代码完成,删除锁 四、 客户端B在等待一段时间后在去请求设置key的值,设置成功 五、 客户端B执行代码完成,删除锁 $redis->setNX($key, $value); $redis->expire($key, $ttl);
SET
上面两种方法都有一个问题,会发现,都须要设置 key 过时。那么为何要设置key过时呢?若是请求执行由于某些缘由意外退出了,致使建立了锁可是没有删除锁,那么这个锁将一直存在,以致于之后缓存再也得不到更新。因而乎咱们须要给锁加一个过时时间以防不测。
可是借助 Expire 来设置就不是原子性操做了。因此还能够经过事务来确保原子性,可是仍是有些问题,因此官方就引用了另一个,使用 SET
命令自己已经从版本 2.6.12 开始包含了设置过时时间的功能。缓存
一、 客户端A请求服务器设置key的值,若是设置成功就表示加锁成功 二、 客户端B也去请求服务器设置key的值,若是返回失败,那么就表明加锁失败 三、 客户端A执行代码完成,删除锁 四、 客户端B在等待一段时间后在去请求设置key的值,设置成功 五、 客户端B执行代码完成,删除锁 $redis->set($key, $value, array('nx', 'ex' => $ttl)); //ex表示秒
虽然上面一步已经知足了咱们的需求,可是仍是要考虑其它问题?
一、 redis发现锁失败了要怎么办?中断请求仍是循环请求?
二、 循环请求的话,若是有一个获取了锁,其它的在去获取锁的时候,是否是容易发生抢锁的可能?
三、 锁提早过时后,客户端A还没执行完,而后客户端B获取到了锁,这时候客户端A执行完了,会不会在删锁的时候把B的锁给删掉?服务器
针对问题1:使用循环请求,循环请求去获取锁
针对问题2:针对第二个问题,在循环请求获取锁的时候,加入睡眠功能,等待几毫秒在执行循环
针对问题3:在加锁的时候存入的key是随机的。这样的话,每次在删除key的时候判断下存入的key里的value和本身存的是否同样测试
setnx的Java简单实现:spa
获取锁:.net
/** * 获取一次锁 * @param redisClient * @return * <p>true : 获取到锁</p> * <p>false :未获取到锁</p> */ public static boolean getLock(Jedis redisClient) { //是否获取到锁 boolean hasLock = false; try { hasLock = redisClient.setnx(lockKey, "lockObj") == 1; if (hasLock) { redisClient.expire(lockKey, lockTime);//一小时 } } catch (Exception e) { redisClient.expire(lockKey, lockTime);//一小时 } return hasLock; }
此实现会有上面的问题1,只是获取一次,若是获取失败,则再也不获取。线程
若是要继续获取,须要使用方本身实现循环获取逻辑和超时逻辑。code
修改下:blog
/** * 循环获取锁,直到过超时时间 * @param redisClient * @param timeout 单位s * @return * <p>true : 获取到锁</p> * <p>false :未获取到锁</p> */ public static boolean getLockTimeOut(Jedis redisClient,int timeout) { //当前的毫秒 long start = System.currentTimeMillis(); //超时时间转换为毫秒单位 timeout = timeout * 1000; //是否获取到锁 boolean hasLock = false; while(!hasLock){ try { hasLock = redisClient.setnx(lockKey, "lockObj") == 1; //获取到锁 if (hasLock) { redisClient.expire(lockKey, lockTime);//一小时 } else { //未获取到锁,判断是否超过超时时间 long now = System.currentTimeMillis(); //当前时间超过起始时间的时间间隔,当间隔>=超时时间,则中止获取锁 if(now - start >= timeout){ System.out.println("--------获取锁超时,再也不获取--------"); break; } //睡眠,下降抢锁频率,缓解redis压力 Thread.sleep(500); } } catch (Exception e) { redisClient.expire(lockKey, lockTime);//一小时 } } return hasLock; }
此种实现,经过Thread.sleep(500)来下降抢锁频率,用以处理问题2,同时,经过while来实现循环获取锁逻辑,直到超过超时时间。这样一来,外部调用时就不用再考虑自实现循环和超时问题了。
针对问题3,还未实现,晚点实现再发吧
附一个简单测试类:
package com.paic.elis.elis_smp_cms.redis; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class RedisCount2 { static JedisPoolConfig config = new JedisPoolConfig(); static JedisPool newPool = null; static Jedis jedisCli = null; static int lockTime = 60 * 60; //单位:S static final String lockKey = "lockKey"; static { config.setMaxTotal(20); newPool = new JedisPool(config, "10.20.130.34", 4436,20000,"quul5trl"); jedisCli = newPool.getResource(); jedisCli.del("mqCount"); jedisCli.del(lockKey); } public static void main(String[] args) { // test1(); // test2(); test3(); // test1(); } //多个线程去获取锁,仅有获取到锁的线程才会执行,其余线程被丢弃 public static void test1(){ for(int i = 0 ; i < 10; i++){ new Thread(){ public void run(){ Jedis jedisC = newPool.getResource(); boolean flag = false; flag = getLock(jedisC); //获取到锁 if(flag){ System.out.println("获取到锁,开始处理"); //业务逻辑执行 for(int i = 0 ; i < 100 ; i++){ jedisC.incr("mqCount"); } System.out.println(jedisC.get("mqCount")); releaseLock(jedisC); System.out.println("释放锁成功"); } } }.start(); } } //多个线程去获取锁,按照获取到锁的顺序执行,一直等到全部线程执行完毕 public static void test2(){ for(int i = 0 ; i < 10; i++){ new Thread(){ public void run(){ Jedis jedisC = newPool.getResource(); boolean flag = false; while(!flag){ flag = getLock(jedisC); //获取到锁 if(flag){ System.out.println("获取到锁,开始处理"); //业务逻辑执行 for(int i = 0 ; i < 100 ; i++){ jedisC.incr("mqCount"); } System.out.println(jedisC.get("mqCount")); releaseLock(jedisC); System.out.println("释放锁成功"); } } } }.start(); } } //多个线程去获取锁,按照获取到锁的顺序执行,等到超时时间以后仍未执行的线程被丢弃 public static void test3(){ for(int i = 0 ; i < 10; i++){ new Thread(){ public void run(){ Jedis jedisC = newPool.getResource(); boolean flag = false; flag = getLockTimeOut(jedisC,2); //获取到锁 if(flag){ System.out.println("获取到锁,开始处理"); //业务逻辑执行 for(int i = 0 ; i < 100 ; i++){ jedisC.incr("mqCount"); } System.out.println(jedisC.get("mqCount")); releaseLock(jedisC); System.out.println("释放锁成功"); } } }.start(); } } /** * 获取到锁则执行,未获取则不执行(放弃本次执行) * @param redisClient * @return */ public static void doMethod(Jedis redisClient) { //未获取到锁--直接返回 if(!getLock(redisClient)) { return; } //获取到锁,开始处理 try{ System.out.println("获取到锁,开始处理"); //业务逻辑执行 for(int i = 0 ; i < 100 ; i++){ redisClient.incr("mqCount"); } System.out.println(redisClient.get("mqCount")); } finally { // 只要获取到锁,则在业务逻辑结束以后,必须释放锁 releaseLock(redisClient); } } /** * 获取到锁则执行,未获取则一直尝试获取,直到获取到锁为止 * @param redisClient * @return */ public static void doMethodContinue(Jedis redisClient) { boolean flag = false; while(!flag){ flag = getLock(redisClient); //若是获取到锁,则继续执行,不然循环获取 if(flag){ try{ System.out.println("获取到锁,开始处理"); } finally { // 只要获取到锁,则在业务逻辑结束以后,必须释放锁 releaseLock(redisClient); } } //间隔0.5s再次获取 try { Thread.sleep(500); } catch (InterruptedException e) { } } } /** * 获取到锁则执行,未获取则一直尝试获取,直到到达超时时间 * @param redisClient * @param timeout 单位S */ public static void doMethodContinueTimeout(Jedis redisClient,int timeout) { //当前的毫秒 long start = System.currentTimeMillis(); //超时时间转换为毫秒单位 timeout = timeout * 1000; boolean flag = false; while(!flag){ flag = getLock(redisClient); //若是获取到锁,则继续执行,不然循环获取 if(flag){ try{ System.out.println("获取到锁,开始处理"); } finally { // 只要获取到锁,则在业务逻辑结束以后,必须释放锁 releaseLock(redisClient); } } long now = System.currentTimeMillis(); //当前时间超过起始时间的时间间隔,当间隔>=超时时间,则中止获取锁 if(now - start >= timeout){ flag = true; } //睡眠,下降抢锁频率,缓解redis压力 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 获取一次锁 * @param redisClient * @return * <p>true : 获取到锁</p> * <p>false :未获取到锁</p> */ public static boolean getLock(Jedis redisClient) { //是否获取到锁 boolean hasLock = false; try { hasLock = redisClient.setnx(lockKey, "lockObj") == 1; if (hasLock) { redisClient.expire(lockKey, lockTime);//一小时 } } catch (Exception e) { redisClient.expire(lockKey, lockTime);//一小时 } return hasLock; } /** * 循环获取锁,直到过超时时间 * @param redisClient * @param timeout 单位s * @return * <p>true : 获取到锁</p> * <p>false :未获取到锁</p> */ public static boolean getLockTimeOut(Jedis redisClient,int timeout) { //当前的毫秒 long start = System.currentTimeMillis(); //超时时间转换为毫秒单位 timeout = timeout * 1000; //是否获取到锁 boolean hasLock = false; while(!hasLock){ try { hasLock = redisClient.setnx(lockKey, "lockObj") == 1; //获取到锁 if (hasLock) { redisClient.expire(lockKey, lockTime);//一小时 } else { //未获取到锁,判断是否超过超时时间 long now = System.currentTimeMillis(); //当前时间超过起始时间的时间间隔,当间隔>=超时时间,则中止获取锁 if(now - start >= timeout){ System.out.println("--------获取锁超时,再也不获取--------"); break; } //睡眠,下降抢锁频率,缓解redis压力 Thread.sleep(500); } } catch (Exception e) { redisClient.expire(lockKey, lockTime);//一小时 } } return hasLock; } /** * 释放锁 * @param redisClient */ public static void releaseLock(Jedis redisClient) { redisClient.del(lockKey); } }
主要转自:
http://blog.csdn.net/Dennis_ukagaka/article/details/78072274