电子商务平台源码请加企鹅求求:三伍三六贰四柒二伍九。基于Consul的分布式锁主要利用Key/Value存储API中的acquire和release操做来实现。acquire和release操做是相似Check-And-Set的操做:bash
acquire操做只有当锁不存在持有者时才会返回true,而且set设置的Value值,同时执行操做的session会持有对该Key的锁,不然就返回falsesession
release操做则是使用指定的session来释放某个Key的锁,若是指定的session无效,那么会返回false,不然就会set设置Value值,并返回true并发
具体实现dom
public class Lock {
private static final String prefix = "lock/"; // 同步锁参数前缀
private ConsulClient consulClient;
private String sessionName;
private String sessionId = null;
private String lockKey;
/**
*
* @param consulClient
* @param sessionName 同步锁的session名称
* @param lockKey 同步锁在consul的KV存储中的Key路径,会自动增长prefix前缀,方便归类查询
*/
public Lock(ConsulClient consulClient, String sessionName, String lockKey) {
this.consulClient = consulClient;
this.sessionName = sessionName;
this.lockKey = prefix + lockKey;
}
/**
* 获取同步锁
*
* @param block 是否阻塞,直到获取到锁为止
* @return
*/
public Boolean lock(boolean block) {
if (sessionId != null) {
throw new RuntimeException(sessionId + " - Already locked!");
}
sessionId = createSession(sessionName);
while(true) {
PutParams putParams = new PutParams();
putParams.setAcquireSession(sessionId);
if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) {
return true;
} else if(block) {
continue;
} else {
return false;
}
}
}
/**
* 释放同步锁
*
* @return
*/
public Boolean unlock() {
PutParams putParams = new PutParams();
putParams.setReleaseSession(sessionId);
boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue();
consulClient.sessionDestroy(sessionId, null);
return result;
}
/**
* 建立session
* @param sessionName
* @return
*/
private String createSession(String sessionName) {
NewSession newSession = new NewSession();
newSession.setName(sessionName);
return consulClient.sessionCreate(newSession, null).getValue();
}
}
复制代码
单元测试分布式
下面单元测试的逻辑:经过线程的方式来模拟不一样的分布式服务来竞争锁。多个处理线程同时以阻塞方式来申请分布式锁,当处理线程得到锁以后,Sleep一段随机事件,以模拟处理业务逻辑,处理完毕以后释放锁。ide
public class TestLock {
private Logger logger = Logger.getLogger(getClass());
@Test
public void testLock() throws Exception {
new Thread(new LockRunner(1)).start();
new Thread(new LockRunner(2)).start();
new Thread(new LockRunner(3)).start();
new Thread(new LockRunner(4)).start();
new Thread(new LockRunner(5)).start();
Thread.sleep(200000L);
}
class LockRunner implements Runnable {
private Logger logger = Logger.getLogger(getClass());
private int flag;
public LockRunner(int flag) {
this.flag = flag;
}
@Override
public void run() {
Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key");
try {
if (lock.lock(true)) {
logger.info("Thread " + flag + " start!");
Thread.sleep(new Random().nextInt(3000L));
logger.info("Thread " + flag + " end!");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
复制代码
单元测试执行结果以下:单元测试
2017-04-12 21:28:09,698 INFO [Thread-0] LockRunner - Thread 1 start!
2017-04-12 21:28:12,717 INFO [Thread-0] LockRunner - Thread 1 end!
2017-04-12 21:28:13,219 INFO [Thread-2] LockRunner - Thread 3 start!
2017-04-12 21:28:15,672 INFO [Thread-2] LockRunner - Thread 3 end!
2017-04-12 21:28:15,735 INFO [Thread-1] LockRunner - Thread 2 start!
2017-04-12 21:28:17,788 INFO [Thread-1] LockRunner - Thread 2 end!
2017-04-12 21:28:18,249 INFO [Thread-4] LockRunner - Thread 5 start!
2017-04-12 21:28:19,573 INFO [Thread-4] LockRunner - Thread 5 end!
2017-04-12 21:28:19,757 INFO [Thread-3] LockRunner - Thread 4 start!
2017-04-12 21:28:21,353 INFO [Thread-3] LockRunner - Thread 4 end!
复制代码
从测试结果咱们能够看到,经过分布式锁的形式来控制并发时,多个同步操做只会有一个操做可以被执行,其余操做只有在等锁释放以后才有机会去执行,因此经过这样的分布式锁,咱们能够控制共享资源同时只能被一个操做进行执行,以保障数据处理时的分布式并发问题。测试