https://mp.weixin.qq.com/s/GydyNbaKkC301X1qjVGREghtml
咱们在构建分布式系统的时候,常常须要控制对共享资源的互斥访问。这个时候咱们就涉及到分布式锁(也称为全局锁)的实现,基于目前的各类工具,咱们已经有了大量的实现方式,好比:基于Redis的实现、基于Zookeeper的实现。本文将介绍一种基于Consul 的Key/Value存储来实现分布式锁以及信号量的方法。
基于Consul的分布式锁主要利用Key/Value存储API中的acquire和release操做来实现。acquire和release操做是相似Check-And-Set的操做:api
acquire操做只有当锁不存在持有者时才会返回true,而且set设置的Value值,同时执行操做的session会持有对该Key的锁,不然就返回false微信
具体实现中主要使用了这几个Key/Value的API:session
public class Lock { private static final String prefix = "lock/"; // 同步锁参数前缀 private ConsulClient consulClient; private String sessionName; private String sessionId = null; private String lockKey; /** * * @param consulClient * @param sessionName 同步锁的session名称 * @param lockKey 同步锁在consul的KV存储中的Key路径,会自动增长prefix前缀,方便归类查询 */ public Lock(ConsulClient consulClient, String sessionName, String lockKey) { this.consulClient = consulClient; this.sessionName = sessionName; this.lockKey = prefix + lockKey; } /** * 获取同步锁 * * @param block 是否阻塞,直到获取到锁为止 * @return */ public Boolean lock(boolean block) { if (sessionId != null) { throw new RuntimeException(sessionId + " - Already locked!"); } sessionId = createSession(sessionName); while(true) { PutParams putParams = new PutParams(); putParams.setAcquireSession(sessionId); if(consulClient.setKVValue(lockKey, "lock:" + LocalDateTime.now(), putParams).getValue()) { return true; } else if(block) { continue; } else { return false; } } } /** * 释放同步锁 * * @return */ public Boolean unlock() { PutParams putParams = new PutParams(); putParams.setReleaseSession(sessionId); boolean result = consulClient.setKVValue(lockKey, "unlock:" + LocalDateTime.now(), putParams).getValue(); consulClient.sessionDestroy(sessionId, null); return result; } /** * 建立session * @param sessionName * @return */ private String createSession(String sessionName) { NewSession newSession = new NewSession(); newSession.setName(sessionName); return consulClient.sessionCreate(newSession, null).getValue(); } }
下面单元测试的逻辑:经过线程的方式来模拟不一样的分布式服务来竞争锁。多个处理线程同时以阻塞方式来申请分布式锁,当处理线程得到锁以后,Sleep一段随机事件,以模拟处理业务逻辑,处理完毕以后释放锁。并发
public class TestLock { private Logger logger = Logger.getLogger(getClass()); @Test public void testLock() throws Exception { new Thread(new LockRunner(1)).start(); new Thread(new LockRunner(2)).start(); new Thread(new LockRunner(3)).start(); new Thread(new LockRunner(4)).start(); new Thread(new LockRunner(5)).start(); Thread.sleep(200000L); } class LockRunner implements Runnable { private Logger logger = Logger.getLogger(getClass()); private int flag; public LockRunner(int flag) { this.flag = flag; } @Override public void run() { Lock lock = new Lock(new ConsulClient(), "lock-session", "lock-key"); try { if (lock.lock(true)) { logger.info("Thread " + flag + " start!"); Thread.sleep(new Random().nextInt(3000L)); logger.info("Thread " + flag + " end!"); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } }
单元测试执行结果以下:dom
2017-04-12 21:28:09,698 INFO [Thread-0] LockRunner - Thread 1 start! 2017-04-12 21:28:12,717 INFO [Thread-0] LockRunner - Thread 1 end! 2017-04-12 21:28:13,219 INFO [Thread-2] LockRunner - Thread 3 start! 2017-04-12 21:28:15,672 INFO [Thread-2] LockRunner - Thread 3 end! 2017-04-12 21:28:15,735 INFO [Thread-1] LockRunner - Thread 2 start! 2017-04-12 21:28:17,788 INFO [Thread-1] LockRunner - Thread 2 end! 2017-04-12 21:28:18,249 INFO [Thread-4] LockRunner - Thread 5 start! 2017-04-12 21:28:19,573 INFO [Thread-4] LockRunner - Thread 5 end! 2017-04-12 21:28:19,757 INFO [Thread-3] LockRunner - Thread 4 start! 2017-04-12 21:28:21,353 INFO [Thread-3] LockRunner - Thread 4 end!
从测试结果咱们能够看到,经过分布式锁的形式来控制并发时,多个同步操做只会有一个操做可以被执行,其余操做只有在等锁释放以后才有机会去执行,因此经过这样的分布式锁,咱们能够控制共享资源同时只能被一个操做进行执行,以保障数据处理时的分布式并发问题。分布式
本文咱们实现了基于Consul的简单分布式锁,可是在实际运行时,可能会由于各类各样的意外状况致使unlock操做没有获得正确地执行,从而使得分布式锁没法释放。因此为了更完善的使用分布式锁,咱们还必须实现对锁的超时清理等控制,保证即便出现了未正常解锁的状况下也能自动修复,以提高系统的健壮性。那么如何实现呢?请持续关注个人后续分解!ide
Key/Value的API:https://www.consul.io/api/kv.html工具
版权声明单元测试
本文采用 CC BY 3.0 CN协议 进行许可。 可自由转载、引用,但需署名做者且注明文章出处。如转载至微信公众号,请在文末添加做者公众号二维码。
长按指纹
一键关注