前言
前面个人博客介绍了有关分布式锁,分布式事务相关的问题以及解决方案,可是仍是不能解决并发下单,扣减的问题,并发的时候因为数据库的隔离级别/乐观锁/悲观锁...老是会出现一些问题。最近集成了一套方案解决此类型问题,并能够适用于通常状况的秒杀方案。欢迎拍砖...html
情景分析
前提条件:
商品 P 库存数量为 100
用户A,用户B 同时分别想买P,而且A,B分别买一个
数据库有版本号控制乐观锁
指望结果:
用户A,B都买到商品P,而且商品P的库存变为98redis
分析: 1.之前碰到这类问题的时候,咱们说,能够添加分布式锁(也就是悲观锁),将商品P的id锁住,而后A,B提交订 单的时候分别预扣商品P的库存就行了(如方案一)。 2.是的,1分析的有道理,咱们的核心思路确实要将并行控制为串行,可是事与愿违。咱们仔细想想: 若是,A,B想买的不只仅P一个商品,而是P,Q两个商品呢?咱们该怎么锁?一次性将P,Q都锁住?显然不是 很现实,而且实际上提交订单的时候都是从购物车下单,一个购物车里包括多个商品去下单也是很常见的, 而且还有以下逻辑你们仔细思考: 用户A下单->用户A读到库存为100 用户A预扣库存,判断剩余库存是否>=0,100-1=99>0,则A预扣 用户A的下单流程.... 此时A的事务没有提交,B下单了: 用户B下单->用户B读到库存为100(这里读到100,是由于A还有其余逻辑在执行,还未提交事务,B读未提交了!) 用户B预扣库存,判断剩余库存是否>=0,100-1=99>0,则B预扣 用户B的下单流程.... 最后不论A/B谁先提交事务,后面提交事务的就不会扣减库存成功。由于版本号不一致(就算没有乐观锁, 修改的结果也会错,并且错的更离谱)。最终的结局就是库存是99 3.解决方案 目前控制库存的方案有不少种,我这边介绍经过redis预减库存,经过mq发送消息同步的扣减数据库库存的 方案。
方案一数据库
@Transactional public void createOrder(...){ //1.校验,扣减库存 check(Item item); //2.建立订单 //3.建立支付单 } @RedisLock("pId") public void check(Item item){ }
解决方案伪代码缓存
//固然,咱们管理平台新建商品时须要初始化到redis库存里, //这里暂时就不介绍了 //下单部分 @Transactional public void createOrder(...){ //1.校验,扣减库存 check(Item item); //2.建立订单 //3.建立支付单 //4.redis扣减库存 } //支付回调部分 @Transactional public void wxCall(...){ //1.校验订单状态 //2.修改订单,支付单状态 //3.mq发送全局顺序消息 扣减库存 } //取消支付部分 @Transactional public void cancelOrder(...){ //1.校验订单状态 //2.修改订单,支付单状态 //3.redis回退库存 } //退货/退款部分 @Transactional public void returnOrder(...){ //1.校验订单状态 //2.修改订单,支付单状态 //3.redis回退库存 //4.mq发送全局顺序消息 }
代码部分
实现思路服务器
初始化库存回调函数(IStockCallback )并发
/** * 获取库存回调 * create by liuliang * on 2019-11-13 10:45 */ public interface IStockCallback { /** * 获取库存 * @return */ String getStock(); }
/** * * Redis分布式锁 * 使用 SET resource-name anystring NX EX max-lock-time 实现 * <p> * 该方案在 Redis 官方 SET 命令页有详细介绍。 * http://doc.redisfans.com/string/set.html * <p> * 在介绍该分布式锁设计以前,咱们先来看一下在从 Redis 2.6.12 开始 SET 提供的新特性, * 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX],其中: * <p> * EX seconds — 以秒为单位设置 key 的过时时间; * PX milliseconds — 以毫秒为单位设置 key 的过时时间; * NX — 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。 * XX — 将key 的值设为value ,当且仅当key 存在,等效于 SETEX。 * <p> * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。 * <p> * 客户端执行以上的命令: * <p> * 若是服务器返回 OK ,那么这个客户端得到锁。 * 若是服务器返回 NIL ,那么客户端获取锁失败,能够在稍后再重试。 * * * create by liuliang * on 2019-11-13 10:49 */ public class RedisStockLock { private static Logger logger = LoggerFactory.getLogger(RedisStockLock.class); private RedisTemplate<String, Object> redisTemplate; /** * 将key 的值设为value ,当且仅当key 不存在,等效于 SETNX。 */ public static final String NX = "NX"; /** * seconds — 以秒为单位设置 key 的过时时间,等效于EXPIRE key seconds */ public static final String EX = "EX"; /** * 调用set后的返回值 */ public static final String OK = "OK"; /** * 默认请求锁的超时时间(ms 毫秒) */ private static final long TIME_OUT = 100; /** * 默认锁的有效时间(s) */ public static final int EXPIRE = 60; /** * 解锁的lua脚本 */ public static final String UNLOCK_LUA; static { StringBuilder sb = new StringBuilder(); sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] "); sb.append("then "); sb.append(" return redis.call(\"del\",KEYS[1]) "); sb.append("else "); sb.append(" return 0 "); sb.append("end "); UNLOCK_LUA = sb.toString(); } /** * 锁标志对应的key */ private String lockKey; /** * 记录到日志的锁标志对应的key */ private String lockKeyLog = ""; /** * 锁对应的值 */ private String lockValue; /** * 锁的有效时间(s) */ private int expireTime = EXPIRE; /** * 请求锁的超时时间(ms) */ private long timeOut = TIME_OUT; /** * 锁标记 */ private volatile boolean locked = false; final Random random = new Random(); /** * 使用默认的锁过时时间和请求锁的超时时间 * * @param redisTemplate * @param lockKey 锁的key(Redis的Key) */ public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey) { this.redisTemplate = redisTemplate; this.lockKey = lockKey + "_lock"; } /** * 使用默认的请求锁的超时时间,指定锁的过时时间 * * @param redisTemplate * @param lockKey 锁的key(Redis的Key) * @param expireTime 锁的过时时间(单位:秒) */ public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime) { this(redisTemplate, lockKey); this.expireTime = expireTime; } /** * 使用默认的锁的过时时间,指定请求锁的超时时间 * * @param redisTemplate * @param lockKey 锁的key(Redis的Key) * @param timeOut 请求锁的超时时间(单位:毫秒) */ public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, long timeOut) { this(redisTemplate, lockKey); this.timeOut = timeOut; } /** * 锁的过时时间和请求锁的超时时间都是用指定的值 * * @param redisTemplate * @param lockKey 锁的key(Redis的Key) * @param expireTime 锁的过时时间(单位:秒) * @param timeOut 请求锁的超时时间(单位:毫秒) */ public RedisStockLock(RedisTemplate<String, Object> redisTemplate, String lockKey, int expireTime, long timeOut) { this(redisTemplate, lockKey, expireTime); this.timeOut = timeOut; } /** * 尝试获取锁 超时返回 * * @return */ public boolean tryLock() { // 生成随机key lockValue = UUID.randomUUID().toString(); // 请求锁超时时间,纳秒 long timeout = timeOut * 1000000; // 系统当前时间,纳秒 long nowTime = System.nanoTime(); while ((System.nanoTime() - nowTime) < timeout) { if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) { locked = true; // 上锁成功结束请求 return locked; } // 每次请求等待一段时间 seleep(10, 50000); } return locked; } /** * 尝试获取锁 当即返回 * * @return 是否成功得到锁 */ public boolean lock() { lockValue = UUID.randomUUID().toString(); //不存在则添加 且设置过时时间(单位ms) String result = set(lockKey, lockValue, expireTime); locked = OK.equalsIgnoreCase(result); return locked; } /** * 以阻塞方式的获取锁 * * @return 是否成功得到锁 */ public boolean lockBlock() { lockValue = UUID.randomUUID().toString(); while (true) { //不存在则添加 且设置过时时间(单位ms) String result = set(lockKey, lockValue, expireTime); if (OK.equalsIgnoreCase(result)) { locked = true; return locked; } // 每次请求等待一段时间 seleep(10, 50000); } } /** * 解锁 * <p> * 能够经过如下修改,让这个锁实现更健壮: * <p> * 不使用固定的字符串做为键的值,而是设置一个不可猜想(non-guessable)的长随机字符串,做为口令串(token)。 * 不使用 DEL 命令来释放锁,而是发送一个 Lua 脚本,这个脚本只在客户端传入的值和键的口令串相匹配时,才对键进行删除。 * 这两个改动能够防止持有过时锁的客户端误删现有锁的状况出现。 */ public Boolean unlock() { // 只有加锁成功而且锁还有效才去释放锁 // 只有加锁成功而且锁还有效才去释放锁 if (locked) { return (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() { @Override public Boolean doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); Long result = 0L; List<String> keys = new ArrayList<>(); keys.add(lockKey); List<String> values = new ArrayList<>(); values.add(lockValue); // 集群模式 if (nativeConnection instanceof JedisCluster) { result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values); } // 单机模式 if (nativeConnection instanceof Jedis) { result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values); } if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) { logger.info("Redis分布式锁,解锁{}失败!解锁时间:{}", lockKeyLog, System.currentTimeMillis()); } locked = result == 0; return result == 1; } }); } return true; } /** * 获取锁状态 * @Title: isLock * @Description: TODO * @return * @author yuhao.wang */ public boolean isLock() { return locked; } /** * 重写redisTemplate的set方法 * <p> * 命令 SET resource-name anystring NX EX max-lock-time 是一种在 Redis 中实现锁的简单方法。 * <p> * 客户端执行以上的命令: * <p> * 若是服务器返回 OK ,那么这个客户端得到锁。 * 若是服务器返回 NIL ,那么客户端获取锁失败,能够在稍后再重试。 * * @param key 锁的Key * @param value 锁里面的值 * @param seconds 过去时间(秒) * @return */ private String set(final String key, final String value, final long seconds) { Assert.isTrue(!StringUtils.isEmpty(key), "key不能为空"); return (String) redisTemplate.execute(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); String result = null; if (nativeConnection instanceof JedisCommands) { result = ((JedisCommands) nativeConnection).set(key, value, NX, EX, seconds); } if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) { logger.info("获取锁{}的时间:{}", lockKeyLog, System.currentTimeMillis()); } return result; } }); } /** * @param millis 毫秒 * @param nanos 纳秒 * @Title: seleep * @Description: 线程等待时间 * @author yuhao.wang */ private void seleep(long millis, int nanos) { try { Thread.sleep(millis, random.nextInt(nanos)); } catch (InterruptedException e) { logger.info("获取分布式锁休眠被中断:", e); } } public String getLockKeyLog() { return lockKeyLog; } public void setLockKeyLog(String lockKeyLog) { this.lockKeyLog = lockKeyLog; } public int getExpireTime() { return expireTime; } public void setExpireTime(int expireTime) { this.expireTime = expireTime; } public long getTimeOut() { return timeOut; } public void setTimeOut(long timeOut) { this.timeOut = timeOut; } }
/** * 扣库存 * create by liuliang * on 2019-11-13 10:46 */ @Service public class StockComponent { Logger logger = LoggerFactory.getLogger(StockComponent.class); /** * 不限库存 */ public static final long UNINITIALIZED_STOCK = -3L; /** * Redis 客户端 */ @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 执行扣库存的脚本 */ public static final String STOCK_LUA; static { /** * * @desc 扣减库存Lua脚本 * 库存(stock)-1:表示不限库存 * 库存(stock)0:表示没有库存 * 库存(stock)大于0:表示剩余库存 * * @params 库存key * @return * -3:库存未初始化 * -2:库存不足 * -1:不限库存 * 大于等于0:剩余库存(扣减以后剩余的库存) * redis缓存的库存(value)是-1表示不限库存,直接返回-1 */ StringBuilder sb = new StringBuilder(); sb.append("if (redis.call('exists', KEYS[1]) == 1) then"); sb.append(" local stock = tonumber(redis.call('get', KEYS[1]));"); sb.append(" local num = tonumber(ARGV[1]);"); sb.append(" if (stock == -1) then"); sb.append(" return -1;"); sb.append(" end;"); sb.append(" if (stock >= num) then"); sb.append(" return redis.call('incrby', KEYS[1], 0 - num);"); sb.append(" end;"); sb.append(" return -2;"); sb.append("end;"); sb.append("return -3;"); STOCK_LUA = sb.toString(); } /** * @param key 库存key * @param expire 库存有效时间,单位秒 * @param num 扣减数量 * @param stockCallback 初始化库存回调函数 * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存以后的剩余库存 */ public long stock(String key, long expire, int num, IStockCallback stockCallback) { long stock = stock(key, num); // 初始化库存 if (stock == UNINITIALIZED_STOCK) { RedisStockLock redisLock = new RedisStockLock(redisTemplate, key); try { // 获取锁 if (redisLock.tryLock()) { // 双重验证,避免并发时重复回源到数据库 stock = stock(key, num); if (stock == UNINITIALIZED_STOCK) { // 获取初始化库存 final String initStock = stockCallback.getStock(); // 将库存设置到redis redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS); // 调一次扣库存的操做 stock = stock(key, num); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } } return stock; } /** * 加库存(还原库存) * * @param key 库存key * @param num 库存数量 * @return */ public long addStock(String key, int num) { return addStock(key, null, num); } /** * 加库存 * * @param key 库存key * @param expire 过时时间(秒) * @param num 库存数量 * @return */ public long addStock(String key, Long expire, int num) { boolean hasKey = redisTemplate.hasKey(key); // 判断key是否存在,存在就直接更新 if (hasKey) { return redisTemplate.opsForValue().increment(key, num); } Assert.notNull(expire,"初始化库存失败,库存过时时间不能为null"); RedisStockLock redisLock = new RedisStockLock(redisTemplate, key); try { if (redisLock.tryLock()) { // 获取到锁后再次判断一下是否有key hasKey = redisTemplate.hasKey(key); if (!hasKey) { // 初始化库存 redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS); } } } catch (Exception e) { logger.error(e.getMessage(), e); } finally { redisLock.unlock(); } return num; } /** * 获取库存 * * @param key 库存key * @return -1:不限库存; 大于等于0:剩余库存 */ public int getStock(String key) { Integer stock = (Integer) redisTemplate.opsForValue().get(key); return stock == null ? -1 : stock; } /** * 扣库存 * * @param key 库存key * @param num 扣减库存数量 * @return 扣减以后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存以后的剩余库存】 */ private Long stock(String key, int num) { // 脚本里的KEYS参数 List<String> keys = new ArrayList<>(); keys.add(key); // 脚本里的ARGV参数 List<String> args = new ArrayList<>(); args.add(Integer.toString(num)); long result = redisTemplate.execute(new RedisCallback<Long>() { @Override public Long doInRedis(RedisConnection connection) throws DataAccessException { Object nativeConnection = connection.getNativeConnection(); // 集群模式和单机模式虽然执行脚本的方法同样,可是没有共同的接口,因此只能分开执行 // 集群模式 if (nativeConnection instanceof JedisCluster) { return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args); } // 单机模式 else if (nativeConnection instanceof Jedis) { return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args); } return UNINITIALIZED_STOCK; } }); return result; } }
/** * 库存操做对外接口 * * create by liuliang * on 2019-11-13 11:00 */ @Slf4j @Service public class StockService { @Autowired private StockComponent stockComponent; @Autowired private ProductSkuMapper skuMapper; @Autowired private ProductMapper productMapper; @Autowired private RocketMQConfig rocketMQConfig; @Autowired private PresentRocketProducer presentRocketProducer; private static final String REDIS_STOCK_KEY="redis_key:stock:"; /** * 扣减库存 * @param skuId * @param num * @return */ public Boolean stock(String skuId,Integer num) { // 库存ID String redisKey = REDIS_STOCK_KEY + skuId; long stock = stockComponent.stock(redisKey, 60 * 60, num, () -> initStock(skuId)); if(stock < 0){//异常,库存不足 log.info("库存不足........"); ProductSku productSku = skuMapper.selectById(skuId); throw new MallException(MsRespCode.STOCK_NUMBER_ERROR,new Object[]{productMapper.selectById(productSku.getProductId()).getTitle()}); } return stock >= 0 ; } /** * 添加redis - sku库存数量 * @param skuId * @param num * @return */ public Long addStock(String skuId ,Integer num) { // 库存ID String redisKey = REDIS_STOCK_KEY + skuId; long l = stockComponent.addStock(redisKey, num); return l; } /** * 获取初始的库存 * * @return */ private String initStock(String skuId) { //初始化库存 ProductSku productSku = skuMapper.selectById(skuId); return productSku.getStockNumber()+""; } /** * 获取sku库存 * @param skuId * @return */ public Integer getStock(String skuId) { // 库存ID String redisKey = REDIS_STOCK_KEY + skuId; return stockComponent.getStock(redisKey); } }
redis序列化app
/** * create by liuliang * on 2019-11-13 11:29 */ @Configuration public class RedisConfig { /** * 重写Redis序列化方式,使用Json方式: * 当咱们的数据存储到Redis的时候,咱们的键(key)和值(value)都是经过Spring提供的Serializer序列化到数据库的。RedisTemplate默认使用的是JdkSerializationRedisSerializer,StringRedisTemplate默认使用的是StringRedisSerializer。 * Spring Data JPA为咱们提供了下面的Serializer: * GenericToStringSerializer、Jackson2JsonRedisSerializer、JacksonJsonRedisSerializer、JdkSerializationRedisSerializer、OxmSerializer、StringRedisSerializer。 * 在此咱们将本身配置RedisTemplate并定义Serializer。 * * @param redisConnectionFactory * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 设置值(value)的序列化采用Jackson2JsonRedisSerializer。 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); // 设置键(key)的序列化采用StringRedisSerializer。 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.afterPropertiesSet(); return redisTemplate; } }