单台机器所能承载的量是有限的,用户的量级上万,基本上服务都会作分布式集群部署。不少时候,会遇到对同一资源的方法。这时候就须要锁,若是是单机版的,能够利用java等语言自带的并发同步处理。若是是多台机器部署就得要有个中间代理人来作分布式锁了。html
经常使用的分布式锁的实现有三种方式。java
目前,我已是用了redis和mysql实现了锁,而且根据应用场景应用在不一样的线上环境中。zk实现比较复杂,又无应用场景,有兴趣的能够参考他山之石中的《Zookeeper实现分布式锁》。mysql
说说心得和体会。redis
没有什么完美的技术、没有万能钥匙、不一样方式不一样应用场景 CAP原理:一致性(consistency)、可用性(availability)、分区可容忍性(partition-tolerance)三者取其二。算法
基于redis的锁实现比较简单,因为redis的执行是单线程执行,自然的具有原子性操做,咱们能够利用命令setnx和expire来实现,java版代码参考以下:sql
package com.fenqile.creditcard.appgatewaysale.provider.util;
import com.fenqile.redis.JedisProxy;
import java.util.Date;
/** * User: Rudy Tan * Date: 2017/11/20 * * redis 相关操做 */
public class RedisUtil {
/** * 获取分布式锁 * * @param key string 缓存key * @param expireTime int 过时时间,单位秒 * @return boolean true-抢到锁,false-没有抢到锁 */
public static boolean getDistributedLockSetTime(String key, Integer expireTime) {
try {
// 移除已经失效的锁
String temp = JedisProxy.getMasterInstance().get(key);
Long currentTime = (new Date()).getTime();
if (null != temp && Long.valueOf(temp) < currentTime) {
JedisProxy.getMasterInstance().del(key);
}
// 锁竞争
Long nextTime = currentTime + Long.valueOf(expireTime) * 1000;
Long result = JedisProxy.getMasterInstance().setnx(key, String.valueOf(nextTime));
if (result == 1) {
JedisProxy.getMasterInstance().expire(key, expireTime);
return true;
}
} catch (Exception ignored) {
}
return false;
}
}
复制代码
包名和获取redis操做对象换成本身的就行了。数据库
基本步骤是缓存
步骤2为最核心的东西, 为啥设置步骤3?可能应为获取到锁的线程出现什么移除请求,而没法释放锁,所以设置一个最长锁时间,避免死锁。 为啥设置步骤1?redis可能在设置expire的时候挂掉。设置过时时间不成功,而出现锁永久生效。并发
线上环境,步骤一、3的问题都出现过。因此要作保底拦截。app
一般redis都是以master-slave解决单点问题,多个master-slave组成大集群,而后经过一致性哈希算法将不一样的key路由到不一样master-slave节点上。
优势:redis自己是内存操做、而且一般是多片部署,所以有这较高的并发控制,能够抗住大量的请求。 缺点:redis自己是缓存,有必定几率出现数据不一致请求。
在线上,以前,利用redis作库存计数器,奖品发放理论上只发放10个的,最后发放了14个。出现了数据的一致性问题。
所以在这以后,引入了mysql数据库分布式锁。
在此以前,在网上搜索了大量的文章,基本上都是 插入、删除发的方式或是直接经过"select for update"这种形式获取锁、计数器。具体能够参考他山之石中的《分布式锁的几种实现方式~》关于数据库锁章节。
一开始,个人实现方式伪代码以下:
public boolean getLock(String key){
select for update if (记录存在){
update
}else {
insert
}
}
复制代码
这样实现出现了很严重的死锁问题,具体缘由能够能够参考他山之石中的《select for update引起死锁分析》 这个版本中存在以下几个比较严重的问题:
1.一般线上数据是不容许作物理删除的 2.经过惟一键重复报错,处理错误形式是不太合理的。 3.若是appclient在处理中还没释放锁以前就挂掉了,会出现锁一直存在,出现死锁。 4.若是以这种方式,实现redis中的计数器(incr decr),当记录不存在的时候,会出现大量死锁的状况。
所以考虑引入,记录状态字段、中央锁概念。
在第二版中完善了数据库表设计,参考以下:
-- 锁表,单库单表
CREATE TABLE IF NOT EXISTS credit_card_user_tag_db.t_tag_lock (
-- 记录index
Findex INT NOT NULL AUTO_INCREMENT COMMENT '自增索引id',
-- 锁信息(key、计数器、过时时间、记录描述)
Flock_name VARCHAR(128) DEFAULT '' NOT NULL COMMENT '锁名key值',
Fcount INT NOT NULL DEFAULT 0 COMMENT '计数器',
Fdeadline DATETIME NOT NULL DEFAULT '1970-01-01 00:00:00' COMMENT '锁过时时间',
Fdesc VARCHAR(255) DEFAULT '' NOT NULL COMMENT '值/描述',
-- 记录状态及相关事件
Fcreate_time DATETIME NOT NULL DEFAULT '1970-01-01 00:00:00' COMMENT '建立时间',
Fmodify_time DATETIME NOT NULL DEFAULT '1970-01-01 00:00:00' COMMENT '修改时间',
Fstatus TINYINT NOT NULL DEFAULT 1 COMMENT '记录状态,0:无效,1:有效',
-- 主键(PS:总索引数不能超过5)
PRIMARY KEY (Findex),
-- 惟一约束
UNIQUE KEY uniq_Flock_name(Flock_name),
-- 普通索引
KEY idx_Fmodify_time(Fmodify_time)
)ENGINE=INNODB DEFAULT CHARSET=UTF8 COMMENT '信用卡|锁与计数器表|rudytan|20180412';
复制代码
在这个版本中,考虑到再条锁并发插入存在死锁(间隙锁争抢)状况,引入中央锁概念。
基本方式是:
考虑到不一样公司引入的数据库操做包不一样,所以提供伪代码,以便于理解 伪代码
// 开启事务
@Transactional
public boolean getLock(String key){
// 获取中央锁
select * from tbl where Flock_name="center_lock"
// 查询key相关记录
select for update if (记录存在){
update
}else {
insert
}
}
复制代码
/** * 初始化记录,若是有记录update,若是没有记录insert */
private LockRecord initLockRecord(String key){
// 查询记录是否存在
LockRecord lockRecord = lockMapper.queryRecord(key);
if (null == lockRecord) {
// 记录不存在,建立
lockRecord = new LockRecord();
lockRecord.setLockName(key);
lockRecord.setCount(0);
lockRecord.setDesc("");
lockRecord.setDeadline(new Date(0));
lockRecord.setStatus(1);
lockMapper.insertRecord(lockRecord);
}
return lockRecord;
}
/** * 获取锁,代码片断 */
@Override
@Transactional
public GetLockResponse getLock(GetLockRequest request) {
// 检测参数
if(StringUtils.isEmpty(request.lockName)) {
ResultUtil.throwBusinessException(CreditCardErrorCode.PARAM_INVALID);
}
// 兼容参数初始化
request.expireTime = null==request.expireTime? 31536000: request.expireTime;
request.desc = Strings.isNullOrEmpty(request.desc)?"":request.desc;
Long nowTime = new Date().getTime();
GetLockResponse response = new GetLockResponse();
response.lock = 0;
// 获取中央锁,初始化记录
lockMapper.queryRecordForUpdate("center_lock");
LockRecord lockRecord = initLockRecord(request.lockName);
// 未释放锁或未过时,获取失败
if (lockRecord.getStatus() == 1
&& lockRecord.getDeadline().getTime() > nowTime){
return response;
}
// 获取锁
Date deadline = new Date(nowTime + request.expireTime*1000);
int num = lockMapper.updateRecord(request.lockName, deadline, 0, request.desc, 1);
response.lock = 1;
return response;
}
复制代码
到此,该方案,可以知足个人分布式锁的需求。
可是该方案,有一个比较致命的问题,就是全部记录共享一个锁,并发并不高。
通过测试,开启50*100个线程并发修改,5次耗时平均为8秒。
因为方案二,存在共享同一把中央锁,并发不高的请求。参考concurrentHashMap实现原理,引入分段锁概念,下降锁粒度。
基本方式是:
伪代码以下:
// 开启事务
@Transactional
public boolean getLock(String key){
// 获取中央锁
select * from tbl where Flock_name="center_lock"
// 查询key相关记录
select for update if (记录存在){
update
}else {
insert
}
}
复制代码
/** * 获取中央锁Key */
private boolean getCenterLock(String key){
String prefix = "center_lock_";
Long hash = SecurityUtil.crc32(key);
if (null == hash){
return false;
}
//取crc32中的最后两位值
Integer len = hash.toString().length();
String slot = hash.toString().substring(len-2);
String centerLockKey = prefix + slot;
lockMapper.queryRecordForUpdate(centerLockKey);
return true;
}
/** * 获取锁 */
@Override
@Transactional
public GetLockResponse getLock(GetLockRequest request) {
// 检测参数
if(StringUtils.isEmpty(request.lockName)) {
ResultUtil.throwBusinessException(CreditCardErrorCode.PARAM_INVALID);
}
// 兼容参数初始化
request.expireTime = null==request.expireTime? 31536000: request.expireTime;
request.desc = Strings.isNullOrEmpty(request.desc)?"":request.desc;
Long nowTime = new Date().getTime();
GetLockResponse response = new GetLockResponse();
response.lock = 0;
// 获取中央锁,初始化记录
getCenterLock(request.lockName);
LockRecord lockRecord = initLockRecord(request.lockName);
// 未释放锁或未过时,获取失败
if (lockRecord.getStatus() == 1
&& lockRecord.getDeadline().getTime() > nowTime){
return response;
}
// 获取锁
Date deadline = new Date(nowTime + request.expireTime*1000);
int num = lockMapper.updateRecord(request.lockName, deadline, 0, request.desc, 1);
response.lock = 1;
return response;
}
复制代码
通过测试,开启50*100个线程并发修改,5次耗时平均为5秒。相较于版本二几乎有一倍的提高。
至此,完成redis/mysql分布式锁、计数器的实现与应用。
根据不一样应用场景,作出以下选择:
表数据和记录:
欢迎关注个人简书博客,一块儿成长,一块儿进步。