分布式锁优化方案

先来段提神醒脑的问题场景描述:java

在业务的某一环节,员工获取某张表的数据进行处理,要求不一样的员工获取不一样(id)的数据。(好比张三获取了id=1的这条数据,其它员工就不能获取该数据,转而获取其它)redis

STOP!!!数据库

读者能够先思考下,若是是你,会怎么处理这个问题。以后,再和笔者的优化方案做比较。相信我,这样更有收获。多线程

 

 

 

 

 

 

 

 

##############             我是给读者思考空间的帅气的分割线         ################################app

 

由于是集群环境,需经过分布式锁(基于redis)进行处理。分布式

 

原代码的逻辑以下:
获取锁setnx(lock_key,overtime),成功后获取list,而后get(0)。finally中释放锁。性能

 

原逻辑中,先获取锁,成功后再拿列表数据,取第1条。比较容易能想到,若是对列表list中的每一个对象单独加锁,多个线程间会有更少的资源竞争,性能也所以提高。
因而构思出方案一:单独对象锁方案优化

步骤1:
链接redis,若是此处抛异常,进行重试操做3次;重试3次依然不成,中断。
链接成功,从redis存储的指定set集合(payment_handler_set)上获取数据。spa

步骤2:
若是步骤1的payment_handler_set为空,或者payment_handler_set不存在,则从数据库获取符合条件的数据。线程

    若是无符合条件数据,return "NoData";
    不然,将这些数据的id组成set,放入redis。
    【注:set元素最大能够包含(2的32次方-1)个元素,目测不会有溢出问题;但考虑到步骤3的乱序操做,这里从数据库获取符合条件的前300条数据】
       
步骤3:
前两部结束后,或得到java代码中的paymentSet,做为数据副本。对paymentSet做乱序操做。

【注:加上“乱序操做”,不一样的线程获取的list中元素的对象次序随机,减小资源竞争】

步骤4:
Payment p = null;
循环paymentSet,依次获取对象锁,setnx(对象id,超时时间)。
A.返回1,获取成功;
    首先检查该对象在数据库中的状态,是否还符合条件;
    【注:这里的检查是有必要的。
    线程t1在步骤2从数据库中获取了java版副本paymentSet1,同时t2获取了paymentSet2。而后paymentSet1中的id1处理完数据,锁已释放;paymentSet2就不该该再处理id1数据了】
        
    若是符合条件,根据id获取payment给p赋值,break。
    不然,表示该对象已处理完,从redis中的payment_handler_set中移除当前对象,释放该对象锁,continue;

B.返回0,获取失败,表示该对象正在被其它线程处理,continue。

循环结束,判断p是否为null:
    若是是,表示paymentSet中无可用对象,return "NoData";
    不然,进行业务处理,finally中释放锁。

 

方案 一 over!!

其实,方案一的核心思路,就像前文说的,对list中的每一个对象加锁。

 

 

那是一个阳光明媚的周六,本觉得搞定方案一后,能够宣告收工,去吃个火锅唱个歌了……可是!!!

笔者在检查方案一,查阅redis相关资料的时候想到:彷佛还有更好的方案。

方案二:进阶的操做锁(推荐)

payment_handler_dataset:存放待处理的数据
payment_handler_operset:存放正在处理的数据

步骤1:
链接redis,若是此处抛异常,进行重试操做3次;重试3次依然不成,中断。
链接成功,从redis存储的指定set集合(payment_handler_dataset)上获取数据。

步骤2:
若是步骤1的payment_handler_dataset为空,或者payment_handler_dataset不存在,则从数据库获取符合条件,而且不在payment_handler_operset中的数据。

    若是无符合条件数据,return "NoData";
    不然,以setnx(update_handler_set,超时时间)方式,获取更新数据操做锁:
        返回1,获取成功:将这些数据的id组成set,放入redis中的payment_handler_dataset(考虑到,可能有线程卡死的状况,数据以watch方式更新)。finally中释放锁。

【注:set元素最大能够包含(2的32次方-1)个元素,目测不会有溢出问题。但数据过多的话,这一步能够限制set的大小,好比:只取前500条数据(具体的限制到多少,根据实际状况调整)】
        返回0,获取失败,表示redis中的该集合数据,正由其它线程更新。能够sleep(1000),return 步骤1。

步骤3:

payment_handler_dataset以SPOP命令(随机移除并返回一个元素)获取元素String randId

       若是randId为nil,表示集合中已经无元素,return 步骤1。

       不然,先将randId放入payment_handler_operset,表示该数据正在被操做。
             而后根据randId获取payment,进行相应的业务处理。
             finally中将randId移出payment_handler_operset,表示该数据操做完成。
       【注:这里能够用多线程来写,设置超时时间,做线程中断】

 

方案有了,只差代码。笔者懒,读者先自行脑补吧(或许我之后会补上具体代码实现(⊙﹏⊙)b)……

#########################################

现把当年承诺的代码补上:

/**
 * getPayment:(获取数据). <br/>
 *
 * @author liuzijian
 * @since JDK 1.8
 */
public Payment getPayment(Long tenantId,Long userId,Integer status,boolean isQingDan){
    final String dataSetKey = getByTenant(wait_oper, tenantId,status, isQingDan);   //待操做
    final String operingSetKey = getByTenant(doing_oper, tenantId,status, isQingDan);   //操做中
    Payment res = null;
    try {
	long id = redisClient.spop(dataSetKey);
	if(id==0){  
	    /** 无数据 **/
	    logger.info("redis中“{}”中已无数据,尝试从数据库中获取",dataSetKey);
	    
	    /** 查询数据库中待处理数据,最多获取500条 **/    //TODO 这里能够优化成单独线程写,其它线程等待
	    List<Long> idList = getMapper().getWaitOperData(tenantId, status, isQingDan);
	    /** 过滤掉在redis已操做集合中的 **/
	    Iterator<Long> itea = idList.iterator();
	    while(itea.hasNext()){
		Long tempId = itea.next();
		if(redisClient.sIsMember(operingSetKey, tempId)){
		    logger.info("过滤掉redis已操做集合中的数据:id={}",tempId);
		    itea.remove();
		}
	    }
	    
	    if(CollectionUtils.isEmpty(idList)){   //数据库中无符合条件的数据
		logger.info("数据库中一样无status={}的数据,返回null",status);
		return res;
	    }else{
		try {
		    final String updateLock = getByTenant(update, tenantId, status, isQingDan);
		    if(distributedLock.tryLock(updateLock, 3000)){
			logger.info("成功获取了分分布式锁lock={},对redis数据set={}进行更新,更新的内容ids={}:",updateLock,dataSetKey,idList);
			redisClient.sAdd(dataSetKey, idList.toArray());
		    }else{
			TimeUnit.MILLISECONDS.sleep(500L);
		    }
		    
		    return getPayment(tenantId,userId,status,isQingDan);  //数据更新后,再次调用本方法,从新获取
		} catch (DistributedLockException e) {
		    logger.error("获取分布式锁error:"+Utils.getFullErrorMessage(e));
		} catch (InterruptedException e) {
		    logger.error("线程沉睡error:"+Utils.getFullErrorMessage(e));
		}
	    }
	    
	}else{
	    /** 有数据 **/
	    res = findOne(id);
	    logger.info("获取了id={}的数据",id);
				
	    redisClient.sAdd(operingSetKey, id);    //记录正在操做的payment
	    
	    /** 业务逻辑部分,记录操做人员等 start **/
	    /** 状态修改,操做人员记录 **/
	    if (PaymentStatus.INPUT_WAIT.getValue().equals(status)){
		res = checkDataId(res);
		res.setEntryBy(userId);
		res.setStatus(PaymentStatus.INPUT_ING.getValue());
	    } else if (PaymentStatus.CHECK_WAIT.getValue().equals(status)) {
		res.setVerifyBy(userId);
		res.setStatus(PaymentStatus.CHECK_ING.getValue());
	    }
	    res.setUserId(userId);
	    updateSelective(res);
	    /** 业务逻辑部分,记录操做人员等 end **/

	    redisClient.sRem(operingSetKey, id);    //数据库记录id后,redis中可清掉id
	    
	}
    } catch (CacheException e) {
	logger.error("redis随机弹出元素时error:"+Utils.getFullErrorMessage(e));
    }
    return res;
}
相关文章
相关标签/搜索