在分布式系统的不少场景中,咱们为了保证数据的最终一致性,须要不少的技术方案来支持,好比分布式事务、分布式锁等。redis
有的时候,咱们须要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,Java中其实提供了不少并发处理相关的API,可是这些API在分布式场景中就无能为力了。也就是说单纯的Java Api并不能提供分布式锁的能力。数据库
目前针对分布式锁的实现目前有多种方案:api
在分析这几种实现方案以前咱们先来想一下,咱们须要的分布式锁应该是怎么样的?(这里以方法锁为例,资源锁同理)缓存
能够保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。bash
要实现分布式锁,最简单的方式可能就是直接建立一张锁表,而后经过操做该表中的数据来实现了。服务器
当咱们要锁住某个方法或资源时,咱们就在该表中增长一条记录,想要释放锁的时候就删除这条记录。并发
建立这样一张数据库表:分布式
CREATE TABLE `methodLock` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
`desc` varchar(1024) NOT NULL DEFAULT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON
UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
复制代码
当咱们想要锁住某个方法时,执行如下SQL:memcached
insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
复制代码
由于咱们对method_name作了惟一性约束,这里若是有多个请求同时提交到数据库的话,数据库会保证只有一个操做能够成功,那么咱们就能够认为操做成功的那个线程得到了该方法的锁,能够执行方法体内容。工具
当方法执行完毕以后,想要释放锁的话,须要执行如下Sql:
delete from methodLock where method_name ='method_name'
复制代码
上面这种简单的实现有如下几个问题:
这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会致使业务系统不可用。
这把锁没有失效时间,一旦解锁操做失败,就会致使锁记录一直在数据库中,其余线程没法再得到到锁。
这把锁只能是非阻塞的,由于数据的insert操做,一旦插入失败就会直接报错。没有得到锁的线程并不会进入排队队列,要想再次得到锁就要再次触发得到锁操做。
这把锁是非重入的,同一个线程在没有释放锁以前没法再次得到该锁。由于数据中数据已经存在了。
固然,咱们也能够有其余方式解决上面的问题。
针对 数据库是单点问题搞两个数据库,数据以前双向同步。一旦挂掉快速切换到备库上。
针对 没有失效时间?只要作一个定时任务,每隔必定时间把数据库中的超时数据清理一遍。
针对 非阻塞的?搞一个while循环,直到insert成功再返回成功。
针对 非重入的?在数据库表中加个字段,记录当前得到锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,若是当前机器的主机信息和线程信息在数据库能够查到的话,直接把锁分配给他就能够了。
除了能够经过增删操做数据表中的记录之外,其实还能够借助数据中自带的锁来实现分布式的锁。
咱们还用刚刚建立的那张数据库表。能够经过数据库的排他锁来实现分布式锁。 基于MySql的InnoDB引擎,可使用如下方法来实现加锁操做:
public boolean lock(){
connection.setAutoCommit(false)
while(true){
try{
result = select * from methodLock where method_name=xxx
for update;
if(result==null){
return true;
}
}catch(Exception e){
}
sleep(1000);
}
return false;
}
复制代码
在查询语句后面增长for update,数据库会在查询过程当中给数据库表增长排他锁。当某条记录被加上排他锁以后,其余线程没法再在该行记录上增长排他锁。
咱们能够认为得到排它锁的线程便可得到分布式锁,当获取到锁以后,能够执行方法的业务逻辑,执行完方法以后,再经过如下方法解锁:
public void unlock(){
connection.commit();
}
复制代码
经过connection.commit()操做来释放锁。
这种方法能够有效的解决上面提到的没法释放锁和阻塞锁的问题。
阻塞锁? for update语句会在执行成功后当即返回,在执行失败时一直处于阻塞状态,直到成功。
锁定以后 服务宕机,没法释放?使用这种方式,服务宕机以后数据库会本身把锁释放掉。 可是仍是没法直接解决数据库单点和可重入问题。
总结一下使用数据库来实现分布式锁的方式,这两种方式都是依赖数据库的一张表,一种是经过表中的记录的存在状况肯定当前是否有锁存在,另一种是经过数据库的排他锁来实现分布式锁。
数据库实现分布式锁的 优势: 直接借助数据库,容易理解。
数据库实现分布式锁的 缺点: 会有各类各样的问题,在解决问题的过程当中会使整个方案变得愈来愈复杂。
操做数据库须要必定的开销,性能问题须要考虑。
相比较于基于数据库实现分布式锁的方案来讲,基于缓存来实如今性能方面会表现的更好一点。并且不少缓存是能够集群部署的,能够解决单点问题。
目前有不少成熟的缓存产品,包括Redis,memcached等。
在实现的时候要注意的几个关键点:
锁信息必须是会过时超时的,不能让一个线程长期占有一个锁而致使死锁;
同一时刻只能有一个线程获取到锁。
几个要用到的redis命令:
setnx(key, value):“set if not exits”,若该key-value不存在,则成功加入缓存而且返回1,不然返回0。
get(key):得到key对应的value值,若不存在则返回nil。
getset(key, value):先获取key对应的value值,若不存在则返回nil,而后将旧的value更新为新的value。
expire(key, seconds):设置key-value的有效期为seconds秒。
看一下流程图:
在这个流程下,不会致使死锁。
我采用Jedis做为Redis客户端的api,下面来看一下具体实现的代码。
public class RedisPool {
private static JedisPool pool;//jedis链接池
private static int maxTotal = 20;//最大链接数
private static int maxIdle = 10;//最大空闲链接数
private static int minIdle = 5;//最小空闲链接数
private static boolean testOnBorrow = true;//在取链接时测试链接的可用性
private static boolean testOnReturn = false;//再还链接时不测试链接的可用性
static {
initPool();//初始化链接池
}
public static Jedis getJedis(){
return pool.getResource();
}
public static void close(Jedis jedis){
jedis.close();
}
private static void initPool(){
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(maxTotal);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setTestOnBorrow(testOnBorrow);
config.setTestOnReturn(testOnReturn);
config.setBlockWhenExhausted(true);
pool = new JedisPool(config, "127.0.0.1", 6379, 5000, "liqiyao");
}
}
复制代码
public class RedisPoolUtil {
private RedisPoolUtil(){}
private static RedisPool redisPool;
public static String get(String key){
Jedis jedis = null;
String result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.get(key);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static Long setnx(String key, String value){
Jedis jedis = null;
Long result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.setnx(key, value);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static String getSet(String key, String value){
Jedis jedis = null;
String result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.getSet(key, value);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static Long expire(String key, int seconds){
Jedis jedis = null;
Long result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.expire(key, seconds);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
public static Long del(String key){
Jedis jedis = null;
Long result = null;
try {
jedis = RedisPool.getJedis();
result = jedis.del(key);
} catch (Exception e){
e.printStackTrace();
} finally {
if (jedis != null) {
jedis.close();
}
return result;
}
}
}
复制代码
public class DistributedLockUtil {
private DistributedLockUtil(){
}
public static boolean lock(String lockName){//lockName能够为共享变量
名,也能够为方法名,主要是用于模拟锁信息
System.out.println(Thread.currentThread() + "开始尝试加锁!");
Long result = RedisPoolUtil.setnx
(lockName, String.valueOf(System.currentTimeMillis() + 5000));
if (result != null && result.intValue() == 1){
System.out.println(Thread.currentThread() + "加锁成功!");
RedisPoolUtil.expire(lockName, 5);
System.out.println(Thread.currentThread() + "执行业务逻辑!");
RedisPoolUtil.del(lockName);
return true;
} else {
String lockValueA = RedisPoolUtil.get(lockName);
if (lockValueA != null && Long.parseLong(lockValueA) >=
System.currentTimeMillis()){
String lockValueB = RedisPoolUtil.getSet(lockName,
String.valueOf(System.currentTimeMillis() + 5000));
if (lockValueB == null || lockValueB.equals(lockValueA)){
System.out.println(Thread.currentThread() + "加锁成功!");
RedisPoolUtil.expire(lockName, 5);
System.out.println(Thread.currentThread() + "执行业务逻辑!");
RedisPoolUtil.del(lockName);
return true;
} else {
return false;
}
} else {
return false;
}
}
}
}
复制代码
基于zookeeper临时有序节点能够实现的分布式锁。大体思想即为:每一个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个惟一的
瞬时有序节点。 判断是否获取锁的方式很简单,只须要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个瞬时节点删除便可。同时,其能够避免服务宕机致使的锁没法释放,而产生的死锁问题。
锁没法释放? 使用Zookeeper能够有效的解决锁没法释放的问题,由于在建立锁的时候,客户端会在ZK中建立一个临时节点,一旦客户端获取到锁以后忽然挂掉(Session链接断开),那么这个临时节点就会自动删除掉。其余客户端就能够再次得到锁。
非阻塞锁? 使用Zookeeper能够实现阻塞的锁,客户端能够经过在ZK中建立顺序节点,而且在节点上绑定监听器,一旦节点有变化,Zookeeper会通知客户端,客户端能够检查本身建立的节点是否是当前全部节点中序号最小的,若是是,那么本身就获取到锁,即可以执行业务逻辑了。
不可重入? 使用Zookeeper也能够有效的解决不可重入的问题,客户端在建立节点的时候,把当前客户端的主机信息和线程信息直接写入到节点中,下次想要获取锁的时候和当前最小的节点中的数据比对一下就能够了。若是和本身的信息同样,那么本身直接获取到锁,若是不同就再建立一个临时的顺序节点,参与排队。
单点问题? 使用Zookeeper能够有效的解决单点问题,ZK是集群部署的,只要集群中有半数以上的机器存活,就能够对外提供服务。 能够直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。
public boolean tryLock(long timeout, TimeUnit unit) throws
InterruptedException {
try {
return interProcessMutex.acquire(timeout, unit);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
public boolean unlock() {
try {
interProcessMutex.release();
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
executorService.schedule(new Cleaner(client, path),
delayTimeForClean, TimeUnit.MILLISECONDS);
}
return true;
}
复制代码
Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。
使用ZK实现的分布式锁好像彻底符合了本文开头咱们对一个分布式锁的全部指望。可是,其实并非,Zookeeper实现的分布式锁其实存在一个缺点,那就是性能上可能并无缓存服务 那么高。由于每次在建立锁和释放锁的过程当中,都要动态建立、销毁瞬时节点来实现锁功能。ZK中建立和删除节点只能经过Leader服务器来执行,而后将数据同不到全部的Follower机器上。
使用Zookeeper实现分布式锁的优势: 有效的解决单点问题,不可重入问题,非阻塞问题以及锁没法释放的问题。实现起来较为简单。
使用Zookeeper实现分布式锁的缺点 : 性能上不如使用缓存实现分布式锁。 须要对ZK的原理有所了解。
从理解的难易程度角度(从低到高): 数据库 > 缓存 > Zookeeper
从实现的复杂性角度(从低到高): Zookeeper >= 缓存 > 数据库
从性能角度(从高到低): 缓存 > Zookeeper >= 数据库
从可靠性角度(从高到低): Zookeeper > 缓存 > 数据库
所以我我的更加倾向于使用缓存来实现,后续的文章中会基于Redis封装一个咱们本身的分布式锁实现。