先来段提神醒脑的问题场景描述:java
在业务的某一环节,员工获取某张表的数据进行处理,要求不一样的员工获取不一样(id)的数据。(好比张三获取了id=1的这条数据,其它员工就不能获取该数据,转而获取其它)redis
STOP!!!数据库
读者能够先思考下,若是是你,会怎么处理这个问题。以后,再和笔者的优化方案做比较。相信我,这样更有收获。多线程
############## 我是给读者思考空间的帅气的分割线 ################################app
由于是集群环境,需经过分布式锁(基于redis)进行处理。分布式
原代码的逻辑以下:
获取锁setnx(lock_key,overtime),成功后获取list,而后get(0)。finally中释放锁。性能
原逻辑中,先获取锁,成功后再拿列表数据,取第1条。比较容易能想到,若是对列表list中的每一个对象单独加锁,多个线程间会有更少的资源竞争,性能也所以提高。
因而构思出方案一:单独对象锁方案优化
步骤1:
链接redis,若是此处抛异常,进行重试操做3次;重试3次依然不成,中断。
链接成功,从redis存储的指定set集合(payment_handler_set)上获取数据。spa
步骤2:
若是步骤1的payment_handler_set为空,或者payment_handler_set不存在,则从数据库获取符合条件的数据。线程
若是无符合条件数据,return "NoData";
不然,将这些数据的id组成set,放入redis。
【注:set元素最大能够包含(2的32次方-1)个元素,目测不会有溢出问题;但考虑到步骤3的乱序操做,这里从数据库获取符合条件的前300条数据】
步骤3:
前两部结束后,或得到java代码中的paymentSet,做为数据副本。对paymentSet做乱序操做。
【注:加上“乱序操做”,不一样的线程获取的list中元素的对象次序随机,减小资源竞争】
步骤4:
Payment p = null;
循环paymentSet,依次获取对象锁,setnx(对象id,超时时间)。
A.返回1,获取成功;
首先检查该对象在数据库中的状态,是否还符合条件;
【注:这里的检查是有必要的。
线程t1在步骤2从数据库中获取了java版副本paymentSet1,同时t2获取了paymentSet2。而后paymentSet1中的id1处理完数据,锁已释放;paymentSet2就不该该再处理id1数据了】
若是符合条件,根据id获取payment给p赋值,break。
不然,表示该对象已处理完,从redis中的payment_handler_set中移除当前对象,释放该对象锁,continue;
B.返回0,获取失败,表示该对象正在被其它线程处理,continue。
循环结束,判断p是否为null:
若是是,表示paymentSet中无可用对象,return "NoData";
不然,进行业务处理,finally中释放锁。
方案 一 over!!
其实,方案一的核心思路,就像前文说的,对list中的每一个对象加锁。
那是一个阳光明媚的周六,本觉得搞定方案一后,能够宣告收工,去吃个火锅唱个歌了……可是!!!
笔者在检查方案一,查阅redis相关资料的时候想到:彷佛还有更好的方案。
方案二:进阶的操做锁(推荐)
payment_handler_dataset:存放待处理的数据
payment_handler_operset:存放正在处理的数据
步骤1:
链接redis,若是此处抛异常,进行重试操做3次;重试3次依然不成,中断。
链接成功,从redis存储的指定set集合(payment_handler_dataset)上获取数据。
步骤2:
若是步骤1的payment_handler_dataset为空,或者payment_handler_dataset不存在,则从数据库获取符合条件,而且不在payment_handler_operset中的数据。
若是无符合条件数据,return "NoData";
不然,以setnx(update_handler_set,超时时间)方式,获取更新数据操做锁:
返回1,获取成功:将这些数据的id组成set,放入redis中的payment_handler_dataset(考虑到,可能有线程卡死的状况,数据以watch方式更新)。finally中释放锁。
【注:set元素最大能够包含(2的32次方-1)个元素,目测不会有溢出问题。但数据过多的话,这一步能够限制set的大小,好比:只取前500条数据(具体的限制到多少,根据实际状况调整)】
返回0,获取失败,表示redis中的该集合数据,正由其它线程更新。能够sleep(1000),return 步骤1。
步骤3:
payment_handler_dataset以SPOP命令(随机移除并返回一个元素)获取元素String randId
若是randId为nil,表示集合中已经无元素,return 步骤1。
不然,先将randId放入payment_handler_operset,表示该数据正在被操做。
而后根据randId获取payment,进行相应的业务处理。
finally中将randId移出payment_handler_operset,表示该数据操做完成。
【注:这里能够用多线程来写,设置超时时间,做线程中断】
方案有了,只差代码。笔者懒,读者先自行脑补吧(或许我之后会补上具体代码实现(⊙﹏⊙)b)……
#########################################
现把当年承诺的代码补上:
/** * getPayment:(获取数据). <br/> * * @author liuzijian * @since JDK 1.8 */ public Payment getPayment(Long tenantId,Long userId,Integer status,boolean isQingDan){ final String dataSetKey = getByTenant(wait_oper, tenantId,status, isQingDan); //待操做 final String operingSetKey = getByTenant(doing_oper, tenantId,status, isQingDan); //操做中 Payment res = null; try { long id = redisClient.spop(dataSetKey); if(id==0){ /** 无数据 **/ logger.info("redis中“{}”中已无数据,尝试从数据库中获取",dataSetKey); /** 查询数据库中待处理数据,最多获取500条 **/ //TODO 这里能够优化成单独线程写,其它线程等待 List<Long> idList = getMapper().getWaitOperData(tenantId, status, isQingDan); /** 过滤掉在redis已操做集合中的 **/ Iterator<Long> itea = idList.iterator(); while(itea.hasNext()){ Long tempId = itea.next(); if(redisClient.sIsMember(operingSetKey, tempId)){ logger.info("过滤掉redis已操做集合中的数据:id={}",tempId); itea.remove(); } } if(CollectionUtils.isEmpty(idList)){ //数据库中无符合条件的数据 logger.info("数据库中一样无status={}的数据,返回null",status); return res; }else{ try { final String updateLock = getByTenant(update, tenantId, status, isQingDan); if(distributedLock.tryLock(updateLock, 3000)){ logger.info("成功获取了分分布式锁lock={},对redis数据set={}进行更新,更新的内容ids={}:",updateLock,dataSetKey,idList); redisClient.sAdd(dataSetKey, idList.toArray()); }else{ TimeUnit.MILLISECONDS.sleep(500L); } return getPayment(tenantId,userId,status,isQingDan); //数据更新后,再次调用本方法,从新获取 } catch (DistributedLockException e) { logger.error("获取分布式锁error:"+Utils.getFullErrorMessage(e)); } catch (InterruptedException e) { logger.error("线程沉睡error:"+Utils.getFullErrorMessage(e)); } } }else{ /** 有数据 **/ res = findOne(id); logger.info("获取了id={}的数据",id); redisClient.sAdd(operingSetKey, id); //记录正在操做的payment /** 业务逻辑部分,记录操做人员等 start **/ /** 状态修改,操做人员记录 **/ if (PaymentStatus.INPUT_WAIT.getValue().equals(status)){ res = checkDataId(res); res.setEntryBy(userId); res.setStatus(PaymentStatus.INPUT_ING.getValue()); } else if (PaymentStatus.CHECK_WAIT.getValue().equals(status)) { res.setVerifyBy(userId); res.setStatus(PaymentStatus.CHECK_ING.getValue()); } res.setUserId(userId); updateSelective(res); /** 业务逻辑部分,记录操做人员等 end **/ redisClient.sRem(operingSetKey, id); //数据库记录id后,redis中可清掉id } } catch (CacheException e) { logger.error("redis随机弹出元素时error:"+Utils.getFullErrorMessage(e)); } return res; }