继上回基于线程池的多线程售票demo,具体连接: http://www.cnblogs.com/xifenglou/p/8807323.html
以上算是单机版的实现,
至于分布式的项目就不能知足了,因此特别研究了一翻,采用redis 实现分布式锁机制, 实现了2.0版本。
使用redis setNx getSet方法 实现分布式锁,获取到锁的线程 将进行售票核心业务操做,具体见代码,欢迎讨论!
一.redis命令讲解:
setnx()命令:
setnx的含义就是SET if Not Exists,其主要有两个参数 setnx(key, value)。html
该方法是原子的,若是key不存在,则设置当前key成功,返回1;若是当前key已经存在,则设置当前key失败,返回0。java
get()命令:
get(key) 获取key的值,若是存在,则返回;若是不存在,则返回nil;
getset()命令:
这个命令主要有两个参数 getset(key, newValue)。该方法是原子的,对key设置newValue这个值,而且返回key原来的旧值。
假设key原来是不存在的,那么屡次执行这个命令,会出现下边的效果:
1. getset(key, "value1") 返回nil 此时key的值会被设置为value1
2. getset(key, "value2") 返回value1 此时key的值会被设置为value2
3. 依次类推!
二.具体的使用步骤以下:
1. setnx(lockkey, 当前时间+过时超时时间) ,若是返回1,则获取锁成功;若是返回0则没有获取到锁,转向2。
2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,若是小于当前系统时间,则认为这个锁已经超时,能够容许别的请求从新获取,转向3。
3. 计算newExpireTime=当前时间+过时超时时间,而后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。
4. 判断currentExpireTime与oldExpireTime 是否相等,若是相等,说明当前getset设置成功,获取到了锁。若是不相等,说明这个锁又被别的请求获取走了,那么当前请求能够直接返回失败,或者继续重试。
5. 在获取到锁以后,当前线程能够开始本身的业务处理,当处理完毕后,比较本身的处理时间和对于锁设置的超时时间,若是小于锁设置的超时时间,则直接执行delete释放锁;若是大于锁设置的超时时间,则不须要再锁进行处理。
import org.springframework.util.StopWatch;
import redis.clients.jedis.Jedis;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 使用redis
* setnx getset 方式 实现 分布式锁
*
*/
public class TicketRunnable implements Runnable {
private CountDownLatch count;
private CyclicBarrier barrier;
private static final Integer Lock_Timeout = 10000;
private static final String lockKey = "LockKey";
private volatile boolean working = true;
public TicketRunnable(CountDownLatch count,CyclicBarrier barrier) {
this.count = count;
this.barrier = barrier;
}
private int num = 20; // 总票数
public void sellTicket(Jedis jedis) {
try{
boolean getLock = tryLock(jedis,lockKey, Long.valueOf(10));
if(getLock){
// Do your job
if (num > 0) {
System.out.print("=============="+Thread.currentThread().getName()+"=============== 售出票号" + num);
num--;
if(num!=0)
System.out.println(",还剩" + num + "张票--" );
else {
System.out.println(",票已经票完!--");
working = false;
}
}
}
}catch(Exception e){
System.out.println(e);
}finally {
try {
realseLock(jedis, lockKey);
Thread.sleep(600);
}catch (Exception e ) {
e.printStackTrace();
}
}
}
/**
* 获取锁
* @param jedis
* @param lockKey
* @param timeout
* @return
*/
public boolean tryLock(Jedis jedis,String lockKey, Long timeout) {
try {
Long currentTime = System.currentTimeMillis();//开始加锁的时间
boolean result = false;
while (true && working) {
if ((System.currentTimeMillis() - currentTime) / 1000 > timeout) {//当前时间超过了设定的超时时间
System.out.println("---------------- try lock time out.");
break;
} else {
result = innerTryLock(jedis,lockKey);
if (result) {
System.out.println("=============="+Thread.currentThread().getName()+"=============== 获取到锁,开始工做!");
break;
} else {
System.out.println(Thread.currentThread().getName()+" Try to get the Lock,and wait 200 millisecond....");
Thread.sleep(200);
}
}
}
return result;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 释放锁
* @param jedis
* @param lockKey
*/
public void realseLock(Jedis jedis,String lockKey) {
if (!checkIfLockTimeout(jedis,System.currentTimeMillis(), lockKey)) {
jedis.del(lockKey);
System.out.println("=============="+Thread.currentThread().getName()+"=============== 释放锁!");
}
}
/**
* 获取锁具体实现
* @param jedis
* @param lockKey
* @return
*/
private boolean innerTryLock(Jedis jedis,String lockKey) {
long currentTime = System.currentTimeMillis();//当前时间
String lockTimeDuration = String.valueOf(currentTime + Lock_Timeout + 1);//锁的持续时间
Long result = jedis.setnx(lockKey, lockTimeDuration);
if (result == 1) { //返回1 表明第1次设置
return true;
} else {
if (checkIfLockTimeout(jedis,currentTime, lockKey)) {
String preLockTimeDuration = jedis.getSet(lockKey, lockTimeDuration); //此处须要再判断一次
if(preLockTimeDuration == null){ //若是 返回值 为空, 表明获取到锁 不然 锁被其余线程捷足先登
return true;
}else{
if (currentTime > Long.parseLong(preLockTimeDuration)) {
return true;
}
}
}
return false;
}
}
/**
*
* @param jedis
* @param currentTime
* @param lockKey
* @return
*/
private boolean checkIfLockTimeout(Jedis jedis,Long currentTime, String lockKey) {
String value = jedis.get(lockKey);
if (value == null) {
return true;
}else{
if (currentTime > Long.parseLong(value)) {//当前时间超过锁的持续时间
return true;
} else {
return false;
}
}
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"到达,等待中...");
Jedis jedis = new Jedis("localhost", 6379);
try{
barrier.await(); // 此处阻塞 等全部线程都到位后一块儿进行抢票
if(Thread.currentThread().getName().equals("pool-1-thread-1")){
System.out.println("---------------所有线程准备就绪,开始抢票----------------");
}else {
Thread.sleep(5);
}
while (num > 0) {
sellTicket(jedis);
}
count.countDown(); //当前线程结束后,计数器-1
}catch (Exception e){e.printStackTrace();}
}
/**
*
* @param args
*/
public static void main(String[] args) {
int threadNum = 5; //模拟多个窗口 进行售票
final CyclicBarrier barrier = new CyclicBarrier(threadNum);
final CountDownLatch count = new CountDownLatch(threadNum); // 用于统计 执行时长
StopWatch watch = new StopWatch();
watch.start();
TicketRunnable tickets = new TicketRunnable(count,barrier);
ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
//ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < threadNum; i++) { //此处 设置数值 受限于 线程池中的数量
executorService.submit(tickets);
}
try {
count.await();
executorService.shutdown();
watch.stop();
System.out.println("耗 时:" + watch.getTotalTimeSeconds() + "秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果以下:
火车票余量 能够在redis中获取, redis
这样就能够模拟多进程 多线程方式 共同访问redis中的变量。
有问题欢迎留言 探讨!spring