在工做中接到一个需求:对于访问页面的前x名用户分发A奖品,x+1名及之后的用户分发另一种奖品。在J2EE的开发中,咱们知道servlet是单实例多线程的,Spring的Controller类也同样,因此这里须要考虑多线程并发时如何判断该用户是否为前x名。一种办法是在代码中用内存控制,例如添加一个成员变量,建立一个方法,并在内部使用synchronized
块对该变量加锁,每次调用这个方法时,来一个用户就先判断变量是否大于x,小于的话就对该变量+1,直到该变量超过x为止。可是由于咱们的代码是部署在多台服务器上的,而在多台服务器上同步内存比较麻烦,因此这种方法只适用于一台服务器的状况。另外一种方法就是在数据库级别加锁,由于咱们的数据库只有一个节点,因此只要在这一个节点上加了锁就能够控制来访的用户了。html
mysql提供了locking read机制,能够参考官方文档,一共有两种方式:SELECT ... FOR UPDATE
和SELECT ... LOCK IN SHARE MODE
。介绍它们以前,这里首先说一下X锁和S锁:java
若事务 T 对数据对象 A 加了 X 锁,则 T 就能够对 A 进行读取以及更新。在 T 释放 A 上的 X 锁之前,其它事务不能对 A 加任何类型的锁,但可使用普通select语句获取值,而这个值不能保证是最新的,由于事务 T 可能修改了 A 的值,而它尚未提交;mysql
若事务 T 对数据对象 A 加了 S 锁,则 T 就能够对 A 进行读取,但不能进行更新。在 T 释放 A 上的 S 锁之前,其余事务能够再对 A 加 S 锁,但不能加 X 锁,从而能够读取 A ,但不能更新 A;redis
SELECT ... FOR UPDATE
是:sql
为选择的行添加排它锁(X锁),保证查询到的数据是最新的数据,容许其它事务对该数据加上共享锁(S锁),但不能修改,只有当前事务能够修改,其它事务须要等当前事务commit或rollback以后才能够修改加锁的行;数据库
SELECT ... LOCK IN SHARE MODE
是缓存
为选择的行添加共享锁(S锁),其它事务也能够对该行数据添加S锁,它保证了读取到的是最新的数据,而且不容许别人修改,可是本身也** 不必定 **可以修改,由于可能别的事务也对这个数据加了S锁;安全
从上面对mysql锁的介绍能够看到,个人业务须要不只读的时候要阻止别人读最新值,并且还可能要修改读取后的结果,所以这里使用SELECT ... FOR UPDATE
语句来控制用户访问的排名最合适。
这里要注意一下,在mysql中用SELECT ... FOR UPDATE
加锁,后面的WHERE条件是主键和非主键时有不一样的加锁状况的,当WHERE后面是主键时,仅对行加锁,其它事务中能够对表的其余行进行增删改查,容许插入新的行;当WHERE后面的条件不是主键时,会锁全表,则其它事务不能对表的任意行进增删改的操做,插入新的行也不能够,只能查询。
首先在数据库建立一个简单的表,结构以下:服务器
列名 | 类型 | 备注 |
---|---|---|
LOCK_KEY | int | 主键,每一个锁是一行 |
LOCK_NUM | int | 当前排名,即代码中须要判断的变量x,初始值为0 |
LOCK_DESC | varchar | 锁的描述 |
这个表中的每一行表明一个锁,也就是说下一次搞其它的活动,若是也须要对前x名进行控制,则插入一行记录用于表明一个锁。在java代码中,建立一个跟表映射的实体类LockBean,而后在DAO中添加两个方法,分别对应于查询和修改:网络
@Select(" select LOCK_KEY, LOCK_NUM, LOCK_DESC FROM LOCK_TABLE WHERE LOCK_KEY=#{lockKey} FOR UPDATE") public LockBean findCurrentLock(int lockKey); @Update(" update LOCK_TABLE set LOCK_NUM = #{lockNum} where LOCK_KEY = #{lockKey} ") public void updateCurrentLock(LockBean lockBean);
最后,在service层中添加事务控制,保证这两个DAO的方法在一个事务里面执行。须要注意的是,SELECT ... FOR UPDATE
语句必需要关闭自动提交,例如使用普通的JDBC来调用,则须要先调用 connection.setAutocommit(flase)
关闭自动commit操做,而后在select
和update
以后,再调用connection.commit()
提交事务。若是想要在Navicat或mysql workbench中测试locking read功能,则须要先执行set autocommit=0
语句关闭自动提交,而后再进行操做。
上面的方法对于每一次用户请求,都须要经过数据库级别的SELECT ... FOR UPDATE
语句来加锁,但是每每前x名用户在总用户中所占的比例都是比较小的,毕竟大奖老是掌握在少数人手中嘛!若是每次都访问数据库,这样IO次数多了(一样也会致使网络请求次数增多,由于数据库只有一个节点)就会影响性能,因此咱们在内存中再添加一个控制。在某个类中建立一个变量,用于判断前x名的奖品是否已经分发完毕:
public static volatile boolean isQueryNecessary = true;
顺便复习一下,要使得volatile
变量提供理想的线程安全,必须同时知足如下两个条件:
- 对变量的写操做不依赖于当前值
- 该变量没有bao含在具备其余变量的不变式中
当变量声明为volatile
后,全部线程对该对象的读取都会直接从主内存中获取,不会使用缓存的值,而在CPU缓存的一些值都会被标识为过时,从而完成线程对该对象的同步操做。具体介绍可见 Java 理论与实践: 正确使用 Volatile 变量.
回归正题,在service层的处理方法giveAward()
中,伪代码以下:
if(true == isQueryNecessary) { // 若是isQueryNecessary为真,则查询数据库,注意这里可能须要等待有X锁的线程释放锁 LockBean bean = dao.findCurrentLock(lockKey); /** 判断bean中的lockNum是否>=x * true :此时可能恰好等于x,也多是在查询数据库时被别的线程抢先并更新了锁, * 即奖品别别人先抢完了,总之须要更新isQueryNecessary的值为false * isQueryNecessary = false; * false:lockNum++, * dao.updateCurrentLock(bean); */ } if(false == isQueryNecessary) { // 再次判断是由于以前在查询数据库的时候有可能结果是lockNum >= x, // 致使isQueryNecessary的值被更新为false了 // 总之这里处理x+1名之后的用户的逻辑 logicForUserAfterX(); }
这里对isQueryNecessary
判断了两次,主要是由于在多线程抢资源的状况下,变量的值可能会在等待过程当中改变,因此采用单例模式中DCL的思想,双重判断,从而确保对每一个用户请求正确分流。
经过这种优化后,对于单台服务器,顶多在第x个用户以后的部分请求(由于这些请求可能在抢第x个席位的过程当中等待)会发生多于的数据库查询操做;而对于多台服务器,也只有部分的请求会执行多于的数据库查询,只要有一个请求在查询数据库以后发现已经不知足条件了就会把isQueryNecessary
设为false,这台服务器后续的请求就不会再去查询数据库了,当所有的服务器上的isQueryNecessary
都设为false以后,集群中后续的全部请求就都再也不会查询数据库了,这样能够节省不少IO和网络操做。
redis的 setnx
命令能够用来实现分布式锁的功能,所以能够把奖品数量放到redis中,例如系统加载时从DB获取到奖品总数为80,则SET AWARDNUM 80
,接下来每一个请求线程中用setnx命令加分布式锁(具体实现能够参考网上的方案,思路是给一个常量设置值,即setnx constant value,value为随机值,设置能够的过时时间,这样只有当前线程能释放该分布式锁,若没有及时释放也能够等待锁过时后从新尝试获取),获取到分布式锁后,先判断奖品库存是否<=0,如是则同步更新内存变量,避免下次再查询redis;若是>0则表示秒杀成功,而后对该奖品数量减一,并释放分布式锁便可。
该方案参考了这篇博文。redis有多种数据结构,例如链表,它能够做为一个MQ来使用,例如每一个秒杀请求都放到队列中,再启动其它的线程去处理队列中前n个请求做为秒杀成功的处理。可是还有更简单的实现方案,例如系统初始化时从DB获取奖品数量为80,则初始化一个长度为80的list做为奖池,每一个秒杀请求进来时使用LPOP
或RPOP
命令从list中抽取一个奖品,若是返回值为空,则说明奖池已经空了,不然表示秒杀成功。由于redis命令执行的时候都是单线程的原子操做,因此该方案的好处是实现简单且不须要用分布式锁,感受分布式锁可能会更耗时间,由于即要加锁又要更新奖品数量,而这个方案只要读一次redis就能够了。