随着业务愈来愈复杂,应用服务都会朝着分布式、集群方向部署,而分布式CAP原则告诉咱们,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。html
不少场景中,须要使用分布式事务、分布式锁等技术来保证数据最终一致性。有的时候,咱们须要保证某一方法同一时刻只能被一个线程执行。
在单机(单进程)环境中,JAVA提供了不少并发相关API,但在多机(多进程)环境中就无能为力了。java
对于分布式锁,最好可以知足如下几点git
能够保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行
这把锁要是一把可重入锁(避免死锁)
这把锁最好是一把阻塞锁
有高可用的获取锁和释放锁功能
获取锁和释放锁的性能要好
针对分布式锁,目前有如下几种实现方案(From: http://www.hollischuang.com/a...)github
基于数据库实现分布式锁
基于缓存实现分布式锁
基于zookeeper实现分布式锁
对于第一种(基于数据库)及第三种(基于zookeeper)的实现方式能够参考博文http://www.hollischuang.com/a...,本篇文章介绍如何基于redis实现分布式锁redis
首先奉上源码 https://github.com/manerfan/m...spring
锁的实现主要基于redis的SETNX
命令(SETNX详细解释参考这里),咱们来看SETNX
的解释数据库
SETNX key value
将 key 的值设为 value ,当且仅当 key 不存在。
若给定的 key 已经存在,则 SETNX 不作任何动做。
SETNX 是『SET if Not eXists』(若是不存在,则 SET)的简写。返回值:
设置成功,返回 1 。
设置失败,返回 0 。api
使用SETNX
完成同步锁的流程及事项以下:缓存
SETNX
命令获取锁,若返回0(key已存在,锁已存在)则获取失败,反之获取成功SETNX
命令老是返回0而进入死锁状态,须要为该key设置一个“合理”的过时时间DEL
命令将锁数据删除/** * 同步锁 * * @property key Redis key * @property stringRedisTemplate RedisTemplate * @property expire Redis TTL/秒 * @property safetyTime 安全时间/秒 */ class SyncLock( private val key: String, private val stringRedisTemplate: StringRedisTemplate, private val expire: Long, private val safetyTime: Long )
key
reids中的key,对应java api synchronized的对象expire
reids中key的过时时间safetyTime
下文介绍其做用安全
private val value: String get() = Thread.currentThread().name /** * 尝试获取锁(当即返回) * * @return 是否获取成功 */ fun tryLock(): Boolean { val locked = stringRedisTemplate.opsForValue().setIfAbsent(key, value) ?: false if (locked) { stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS) } return locked }
这里使用setIfAbsent
函数(对应SETNX
命令)尝试设置key的值为value(当前线程id+线程名),若成功则同时设置key的过时时间并返回true,不然返回false
private val waitMillisPer: Long = 10 /** * 尝试获取锁,并至多等待timeout时长 * * @param timeout 超时时长 * @param unit 时间单位 * * @return 是否获取成功 */ fun tryLock(timeout: Long, unit: TimeUnit): Boolean { val waitMax = unit.toMillis(timeout) var waitAlready: Long = 0 while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) { Thread.sleep(waitMillisPer) waitAlready += waitMillisPer } if (waitAlready < waitMax) { stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS) return true } return false }
这里使用while循环不断尝试锁的获取,并至多尝试timeout时长,在timeout时间内若成功则同时设置key的过时时间并返回true,不然返回false
其实以上两种tryLock
函数仍是有一种可能即是,在调用setIfAbsent
后、调用expire
以前若服务出现异常,也将致使该锁(key)没法释放(过时或删除),使得其余线程/进程再没法获取锁而进入死循环,为了不此问题的产生,咱们引入了safetyTime
该参数的做用为,从获取锁开始直到safetyTime时长,若仍未获取成功则认为某一线程/进程出现异常致使数据不正确,此时强制获取,其实现以下
/** * 获取锁 */ fun lock() { val waitMax = TimeUnit.SECONDS.toMillis(safetyTime) var waitAlready: Long = 0 while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) { Thread.sleep(waitMillisPer) waitAlready += waitMillisPer } // stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS) stringRedisTemplate.opsForValue().set(key, value, expire, TimeUnit.SECONDS) }
这里一样使用while循环不断尝试锁的获取,但至多等待safetyTime时长,最终不管是否成功,均使用SETEX
命令将key设置为当前先线程对应的value,并同时设置该key的过时时间
/** * 释放锁 */ fun unLock() { stringRedisTemplate.opsForValue()[key]?.let { if (it == value) { stringRedisTemplate.delete(key) } } }
锁的释放使用DEL
命令删除key,但须要注意的是,释放锁时只能释放本线程持有的锁
若expire设置不合理,如expire设置为10秒,结果在获取锁后线程运行了20秒,该锁有可能已经被其余线程强制获取,即该key表明的锁已经不是当前线程所持有的锁,此时便不能冒然删除该key,而只能释放本线程持有的锁。
为了更好的与spring集成,咱们建立一个工厂类来辅助建立同步锁实例
/** * SyncLock同步锁工厂类 */ @Component class SyncLockFactory { @Autowired private lateinit var stringRedisTemplate: StringRedisTemplate private val syncLockMap = mutableMapOf<String, SyncLock>() /** * 建立SyncLock * * @param key Redis key * @param expire Redis TTL/秒,默认10秒 * @param safetyTime 安全时间/秒,为了防止程序异常致使死锁,在此时间后强制拿锁,默认 expire * 5 秒 */ @Synchronized fun build(key: String, expire: Long = 10 /* seconds */, safetyTime: Long = expire * 5/* seconds */): SyncLock { if (!syncLockMap.containsKey(key)) { syncLockMap[key] = SyncLock(key, stringRedisTemplate, expire, safetyTime) } return syncLockMap[key]!! } }
在spring框架下能够更方便的使用
@Component class SomeLogic: InitializingBean { @Autowired lateinit var syncLockFactory: SyncLockFactory lateinit var syncLock override fun afterPropertiesSet() { syncLock = syncLockFactory.build("lock:some:name", 10) } fun someFun() { syncLock.lock() try { // some logic } finally { syncLock.unlock() } } }
借助spring aop框架,咱们能够将SyncLock的使用进一步简化
/** * 同步锁注解 * * @property key Redis key * @property expire Redis TTL/秒,默认10秒 */ @Target(AnnotationTarget.FUNCTION) @Retention(AnnotationRetention.RUNTIME) annotation class SyncLockable( val key: String, val expire: Long = 10 )
/** * 同步锁注解处理 */ @Aspect @Component class SyncLockHandle { @Autowired private lateinit var syncLockFactory: SyncLockFactory /** * 在方法上执行同步锁 */ @Around("@annotation(syncLockable)") fun syncLock(jp: ProceedingJoinPoint, syncLockable: SyncLockable): Any? { val lock = syncLockFactory.build(syncLockable.key, syncLockable.expire) try { lock.lock() return jp.proceed() } finally { lock.unLock() } } }
如此一来,咱们即可以按照以下方式使用SyncLock
@Component class SomeLogic { @SyncLockable("lock:some:name", 10) fun someFun() { // some logic } }
是否是显得更加方便!