高并发(一)

背景:当前系统A需调用第三方服务接口接口,因查询并发量太大,致使第三方接口不稳定,java

为了减小查询请求量, 建议优化如下两点:node

  • 一样参数的请求十分钟内只请求一次
  • 一样参数返回的结果保鲜十分钟

A系统线上服务是集群的,有16台服务器,因此为了知足以上两点需求,则可转化为如下两点:redis

  • 相同入参的请求十分钟内16台服务器请求第三方服务共一次
  • 相同入参请求返回的结果十分钟内需在16台服务器中共享;

解决方案:缓存

public static ConcurrentMap<String, CacheResult> cacheMap = new ConcurrentHashMap<>()

if (cacheMap.containsKey(cacheKey)) {
    return getResultByRam(cacheKey, query);
} else {
    // 建立CacheResult,放入本地缓存cacheMap
    CacheResult cacheResult = new CacheResult(cacheKey);
    boolean lockFlag = getZkLock(cacheKey, cacheResult, query);
    cacheMap.put(cacheKey, cacheResult);

    String minChildPath = zkUtil.getMinChildPath(cacheKey);
    cacheResult.setZkPath(minChildPath);
    if (lockFlag)
        return doWorkAfterLockSuccess(query, cacheKey, minChildPath);

    return doWorkAfterLockFailed(cacheKey, minChildPath, query);
}

==================
private FlightQueryResult getResultByRam(String cacheKey, FlightQueryParam query) throws Exception {
        FlightQueryResult result;
        String msg = "";
        CacheResult cacheResult = cacheMap.get(cacheKey);
        log.info("cacheResult.getStatus():{}", cacheResult.getStatus());
        switch (cacheResult.getStatus()) {
            case INIT:
                LockUtil.await(cacheKey);
                cacheResult = cacheMap.get(cacheKey);
                msg = "来自内存数据 - 有等待";
                break;
            case VALID:
                msg = "来自内存数据 - 无等待";
            default:
                break;
        }

        result.setMessage(cacheResult.getMsg() + msg + result.getTransId());
        return result;
    }

==================
@Data
public class CacheResult{
    private String cacheKey;
    private String zkPath; // 该key对应的zk node path
    private CacheStatus status = CacheStatus.INIT;
    private FlightQueryResult result;
    private String msg = "";
    public long CacheTimeStamp = System.currentTimeMillis();//当前时间戳

    public CacheResult(String cacheKey) {
        this.cacheKey = cacheKey;
    }

    public void setMsg(String msg) {
        if (StringUtils.isNotBlank(msg)) {
            this.msg = msg + " || ";
        }
    }
}

==================
public enum CacheStatus {
    INIT("INIT", "初始化状态", 1),
    VALID("VALID", "有效状态", 2),
    EXPIRE("EXPIRE", "过时", 3);
}

1.将请求入参转化为惟一cacheKey,判断本地缓存中是否有该cacheKey;服务器

  • 当存在时,获取相对应的CacheResult对象,CacheResult.status是初始化状态时,则当前线程进入waiting状态;有效状态时,则直接返回CacheResult.result;
  • 当不存在时,建立CacheResult对象,CacheResult.status默认是初始化状态

2.当本地缓存不存在时,经过zookeeper获取分布式锁,利用序列化节点特性实现乐观锁的方式保证只有一个请求获取分布式锁成功,相同参数建立相同目录,但由于是序列化节点,zookeeper会保证时序性,xxx_node0001,xxx_node0002;当是最小当值时表示获取到了分布式锁;并发

  • 获取到分布式锁:
1.监听本身建立的节点
2.调用第三方服务接口,返回结果
3.启动十分钟定时任务,保证数据十分钟保鲜期
4.将返回的结果放入redis中
5.将zookeeper节点状态值改成有效状态(VALID);
  • 没有获取到分布式锁:
1.删除本身建立的节点,并监听序列化值最小的节点
2.获取最小节点目录的值进行判断
   init:调用await方法,让当前线程等待
   valid:从cacheMap中获取result结果,并返回

当获取到分布式锁以后,zookeeper节点状态值改成VALID时,全部监听最小节点的服务器都会获取事件,从redis中获取数据,并将数据放入当前服务器的本地内存中,最后调用lockUtil.signAll方法;分布式

当十分钟后定时任务触发,会将zk中的状态改成Expired值,这时监听最小节点的服务器都会收获expired事件;这时只须要删除本地缓存便可;优化

这种设计方式有如下几个好处:this

1.防止大量的请求访问redis,减轻redis压力线程

2.redis虽然有expire功能,可是没有事件触发,每一个服务器仍是须要去查询redis,形成压力;同时责任单一,redis只负责数据存储

当获取乐观锁的服务器down机:
   全部监听最小节点当服务器会接收节点删除事件,一样会删除本地内存;不会影响业务;

以上解决方案遵循两点:

1.经过concurrentHashMap实现本地服务器只有一个请求有机会调用第三方服务接口

2.经过zk乐观锁机制,保证多台服务器只有一台服务器的一个请求调用第三方服务接口 redis只负责存储数据,zk保证分布式锁的惟一获取

相关文章
相关标签/搜索