你所不知道的库存超限作法

在互联网企业中,限购的作法,多种多样,有的别出心裁,有的因循守旧,可是种种作法皆想达到的目的,无外乎几种,商品卖的完,系统抗的住,库存不超限。虽然短短数语,却有着说不完,道不尽,轻者如释重负,重者涕泪横流的架构体验。 可是,在实际开发过程当中,库存超限,做为其中最核心的一员,到底该怎么作,如何作才会是最合适的呢?python

今天这篇文章,我将会展现给你们库存限购的五种常见的作法,并对其利弊一一探讨,因为这五种作法,有的在设计之初当作提案被否认掉的,有的在线上跑着,可是在没有任何单元测试和压测状况下,这几种超限控制的作法也许是不符合你的业务的,因此不建议直接用于生产环境。我这里权当是作抛砖引玉,期待你们更好的作法。mysql

工欲善其事必先利其器,在这里,咱们将利用一台测试环境的redis服务器当作库存超限控制的主战场,先设置库存量为10进去,而后根据此库存量,一一展开,设置库存代码以下:程序员

1:  def set_storage():
   2:      conn = redis_conn()
   3:      key = "storage_seckill"
   4:      current_storage = conn.get(key)
   5:      if current_storage == None:
   6:          conn.set(key, 10)
复制代码

为了方便性,我这里使用了python语言来书写逻辑,可是今天咱们只是讲解思想,语言这类的,你们能够本身尝试转一下。redis

上面就是咱们的设置库存到redis中的作法,很简单,就是在redis中设置一个storage_seckill的库存key,而后里面放上库存量10.sql

超限限制作法一:先获取当前库存值进行比对,而后进行扣减操做bash

1:  def storage_scenario_one():
   2:      conn = redis_conn()
   3:      key = "storage_seckill"
   4:      current_storage = conn.get(key)
   5:      current_storage_int = int(current_storage)
   6:      if current_storage_int<=0 :
   7:          return 0
   8:      result = conn.decr(key)
   9:      return result
复制代码

首先,咱们拿到当前的库存值,而后看看是否已经扣减到了零,若是扣减到了零,则不继续扣减,直接返回;若是库存还有,则利用decr原子操做进行扣减,同时返回扣减后的库存值。服务器

此种作法在小并发量下访问,问题不大;在对库存量控制不严格的业务中,问题也不大。可是若是并发量比较大一些,同时业务要求严格控制库存,那么此种作法是很是不合适的,缘由在于,在高并发状况下,get命令,decr命令,都是分开发给redis的,这样会致使比对的时候,很容易出现限制不住的状况,也就是会形成第六行的比对失效。网络

设想以下一个场景,AB两个请求进来,A获取的库存值为1,B获取的库存值为1,而后两个请求都被发到redis中进行扣减操做,而后这种场景下,A最后获得的库存值为0;可是B最后获得的库存值为-1,超限。架构

因此此种场景,因为在高并发下,get和decr操做不是一组原子性操做,会引起超限问题,被直接pass。并发

超限限制作法二:先扣减库存,而后比对,最后根据状况回滚

1:  def storage_scenario_two():
   2:      conn = redis_conn()
   3:      key = "storage_seckill"
   4:      current = conn.decr(key)
   5:      if current>=0:
   6:          return current
   7:      else:
   8:          #回滚库存
   9:          conn.incr(key)
  10:          return 0
复制代码

首先,请求进来,直接对库存值进行扣减,而后获得当前的库存值;而后,对此库存值进行校验,若是库存还有,则返回库存值,若是库存没有了,则回滚库存,以便于防止负库存量的存在。

此作法,相比作法一,要稍微可靠一些,因为redis的decr操做直接返回真实的库存值,因此每一个请求进来,只要执行了decr操做,拿到的确定是当前最准确的库存值。而后进行比对,若是库存值大于等于零,返回当前库存值,若是小于零,则将库存进行回滚。

此种作法,最大的一个问题就是,若是大批量的并发请求过来,redis承受的写操做的量,相对于方法一来讲,是加倍的,由于回滚库存的存在致使的。因此这种状况下,高并发量进来,极有可能将redis的写操做打出极限值,而后会出现不少redis写失败的错误警告。 另外一个问题和作法一是同样的,就是第五行的比对在高并发下,也是限不住的,具体的压测结果请看个人这篇stackoverflow的提问:Will redis incr command can be limitation to specific number?

因此此种场景,虽然在高并发状况下避免了redis命令的分开操做,可是却大大增长了redis的写并发量,被pass。

超限限制作法三:先递减库存,而后经过整数溢出控制,最后根据状况回滚

1:  def storage_scenario_three():
   2:      conn = redis_conn()
   3:      key = "storage_seckill"
   4:      current = conn.decr(key)
   5:      #经过整数控制溢出的作法
   6:      if storage_overflow_checker(current):
   7:          return current
   8:      else:
   9:          #回滚库存
  10:          conn.incr(key)
  11:          return 0
  12:   
  13:  def storage_overflow_checker(current_storage):
  14:      #若是当前库存未被递减到0,则check_number为int类型,isinstance方法检测结果为true
  15:      #若是当前库存已被递减到负数,则check_number为long类型,isinstance方法检测结果为false
  16:      check_number = sys.maxint - current_storage
  17:      check_result = isinstance(check_number,int)
  18:      return check_result
复制代码

说明一下,当前库存,若是为负数,则利用python的isinstance(check_number,int)检测的时候,check_result返回是false;若是为非负数,则检测的时候,check_result返回的是true,上面的storage_overflow_checker的作法,和下面的C#语言的作法是同样的,利用C#语言描述,你们可能对上面的代码更清晰一些:

1:      /**
   2:       * 经过让Integer溢出的方式来控制数量超卖(递减致使溢出)
   3:       * @param current
   4:       * @return
   5:       */
   6:      public boolean StorageOverFillChecker(long current) {
   7:          try {
   8:              //当前数值的结果计算
   9:              Long value = Integer.MAX_VALUE - current;
  10:              //尝试转变为Inter类型,若是超卖,则转换会出错;若是未超卖,则转换不会出错
  11:              Integer.parseInt(value.toString());
  12:          } catch (Exception ex) {
  13:              //值溢出
  14:              return true;
  15:          }
  16:   
  17:          return false;
  18:      }
复制代码

能够看出,此种作法和方法二很类似,只是比对部分由,直接和零比对,变成了经过检测integer是否溢出的方式来进行。这样就完全解决了高并发状况下,直接和零比对,限制不住的问题了。

虽然此种作法,相对于作法二说来,要靠谱不少,可是仍然解决不了在高并发状况下,redis写并发量加倍的问题,极有可能某个促销活动,在开始的那一刻,直接将redis的写操做打出问题来。

超限限制作法四:共享锁

1:  def storage_scenario_four():
   2:      conn = redis_conn()
   3:      key = "storage_seckill"
   4:      key_lock = key + "_lock"
   5:      if conn.setnx(key_lock, "1"):
   6:          #客户端挂掉,设置过时时间,防止其不释放锁
   7:          conn.pexpire(key_lock, 5)
   8:          current_storage = conn.get(key)
   9:          if int(current_storage)<=0 :
  10:              return 0
  11:          result = conn.decr(key)
  12:          #客户端正常,删除共享锁,提升性能
  13:          conn.delete(key_lock)
  14:          return result
  15:      else :
  16:          return "someone in it"
复制代码

前面三种,因为在高并发下都有问题,因此本作法,主要是经过setnx设置共享锁,而后请求到锁的用户请求,正常进行库存扣减操做;请求不到锁的用户请求,则直接提示有其余人在操做库存。

因为setnx的特殊性,当客户端挂掉的时候,是不会释放这个锁的,因此当请求进来的时候,首先经过pexpire命令,为锁设置过时时间,防止死锁不释放。而后执行正常的库存扣减操做,当操做完毕,删掉共享锁,能够极大的提升程序性能,不然只能等待锁慢慢过时了。

此种作法相对于上面的三种操做,经过采用共享锁,牺牲了部分性能,来规避了高并发的问题,比较推荐,可是因为redis操做命令仍是不少,而且每条都要发送到redis端执行,因此在网络传输上,耗费的时间开销是不小的。这是后面须要着力优化的方向。

看了上面四种作法,都不是很完美,其中最大的问题在于,高并发状况下,多条redis命令分开操做库存,极容易发生库存限不住的问题;同时,因为加了rollback库存操做,极容易因为redis写命令的操做数加倍致使压垮redis的风险。加了锁,虽然牺牲了部分性能,规避了高并发问题,可是redis命令操做量过多。

其实我上面一直在强调高并发,高并发。上面的四个场景,只有在高并发的状况下,才会出现问题,若是你的用户请求量没有那么多,那么采用上面四种方式之一,也不是不能够。可是如何才能知道采用起来没问题呢?其实最简单的一个方式,就是在大家本身的集群机器上,模拟活动的真实用户量,进行压测,看看会不会超限就好了,不超限的话,上面四种作法彻底知足需求。

那么,就没有比较好一些的解决方案了吗?

也不是,虽然解决这个问题,没有绝对好用的银弹,可是有相对好用的大蒜和圣水。下面的讲解,会涉及到Redisson的Redlock的源码实现,固然也会涉及一点lua方面的知识,还请提早预备一下。

偶然在研究分布式锁的时候,尝试翻阅过Redisson的Redlock的实现,并对其底层的实现方式有所记录,咱们先来看看其加锁过程的源码实现:


从上面的方法中,咱们能够看到,分布式锁的上锁过程,是首先判断一个key存不存在,若是不存在,则设置这个key,而后pexpire设置一个过时时间,来防止客户端访问的时候,挂掉了后,不释放锁的问题。为何这段lua代码就能实现分布式锁的核心呢? 缘由就是,这段代码放到一个lua脚本中,那么这段lua脚本就是一个原子性的操做。redis在执行这段lua脚本的过程当中,不会掺杂任何其余的命令。因此从根本上避免了并发操做命令的问题。

咱们都知道,一个key若是设置了过时时间,key过时后,redis是不会删掉这个key的,只有用户访问才会删除掉这个key,因此,当使用分布式锁的时候,若是设置的pexpire过时时间为5ms,那么一秒钟只能处理200个并发,性能很是低。如何解决这种性能问题呢?来看来解锁的操做:


从上面解锁的方法中,咱们能够看到,若是这个锁用完了以后,Redisson的作法是是直接删除掉的。这样能够提升很多的性能。(源码参阅,属于我本身的理解,若有谬误,还请指教)

那么按照上面这种设计思路,新的超限作法就出来了。

超限作法五:基于lua的共享锁

1:  def storage_scenario_five():
   2:      conn = redis_conn()
   3:      key = "storage_seckill"
   4:      key_lock = key + "_lock"
   5:      key_val = "get_the_key"
   6:      lua = """ 7: local key = KEYS[1] 8: local expire = KEYS[2] 9: local value = KEYS[3] 10: 11: local result = redis.call('setnx',key,value) 12: if result == 1 then 13: redis.call('pexpire', key, expire) 14: end 15: return result 16: """
  17:      locked = conn.eval(lua, 3, key_lock, 5, key_val)
  18:      print (locked == 1)
  19:      if locked == 1:
  20:          val = storage_scenario_one()
  21:          print("val:"+str(val))
  22:          #删掉共享key,用以提升性能, 不然只能默默的等其过时
  23:          conn.delete(key_lock)
  24:          return val
  25:      else:
  26:          return "someone in it"
复制代码

这种作法,实际上是作法四的衍生优化版本,优化的地方在于,将多条redis操做命令屡次发送,改为了将多条redis操做命令放在了一个原子性操做事务中一次性执行完毕,省去了不少的网络请求。若是能够,其实你也能够将业务逻辑糅合到上面的lua代码中,这样一来,性能固然会更好。

上面这种作法,若是 storage_scenario_one()这种操做是直接操做的mysql库存,则很是推荐这种作法,可是若是storage_scenario_one()这种操做直接操做的redis中的虚拟库存,则不是很推荐这种作法,不如直接用限流操做。

超限作法六: All In Lua

1:  def storage_scenario_six():
   2:      conn = redis_conn()
   3:      lua = """ 4: local storage = redis.call('get','storage_seckill') 5: if storage ~= false then 6: if tonumber(storage) > 0 then 7: return redis.call('decr','storage_seckill') 8: else 9: return 'storage is zero now, can't perform decr action' 10: end 11: else 12: return redis.call('set','storage_seckill',10) 13: end 14: """
  15:      result = conn.eval(lua,0)
  16:      print(result)
复制代码

此种作法是当前最好的作法,全部的库存扣减操做都放在lua脚本中进行,造成一个原子性操做,redis在执行上面的lua脚本的时候,是不会掺杂任何其余的执行命令的。因此这样从根本上避免了高并发下,多条命令执行带来的问题。并且上面的redis命令执行,都直接在redis服务器上,省去了网络传输时间,也没有共享锁的限制,从性能上而言,是最好的。可是,业务逻辑的lua化,相对而言是比较麻烦的,因此对于追求极限库存控制的业务,能够考虑这种作法。

好了,这就是我今天为你们带来的六种库存超限的作法,每种作法都有本身的优缺点,好使的限不住,限的住的性能不行,性能好的又须要引入lua,真心不知道如何选择了。

声明:上面六种库存超限作法,有些属于本人的推理,线上并未实际用过,若是你贸然使用而未通过压测,由此形成的损失,找老板去讨论吧。

欢迎工做一到五年的Java工程师朋友们加入Java程序员开发: 854393687 群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交代! 

相关文章
相关标签/搜索