Hystrix请求合并与请求缓存(一):请求缓存

前言

国庆长假结束后,笔者一直在于假期综合症缠斗,特别是周六上班。。。java

相信你们对Hystrix都很熟悉,它的源码大量使用RxJava,正好笔者的老本行是Android开发工程师,之前也略微接触过,想分享下本身看完Hystix的请求合并与请求缓存部分源码的一些收获。git

Hystrix简介

  • Hystrix由Netflix开源,官方定义以下:

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.github

  • 笔者理解:在分布式环境中,错误不可避免,Hystrix提供了一种隔离、降级、熔断等机制。

一、隔离:经过隔离,避免服务之间相互影响,一个服务不可用,不会影响别的服务,避免了服务雪崩。 二、降级:分布式环境中,服务不可用的状况没法避免,降级机制能够给出更加友好的交互(默认值、异常返回)。 三、熔断:熔断机制能够避免在服务不可用时,服务调用方还在调用不可用的服务,致使资源消耗、耗时增长。 四、提供可视化的监控,Hystrix Dashboard。 四、固然,还有笔者今天要讲的请求合并与请求缓存缓存

  • 请求合并与请求缓存,对应于官方给出的**What does it do?**的第3项:

Parallel execution. Concurrency aware request caching. Automated batching through request collapsing.数据结构

  • 如下都是经过官方给的测试用例做为入口,查找源码并进行分析。

一、请求缓存:CommandUsingRequestCache 二、请求合并:CommandCollapserGetValueForKeyapp

请求缓存

  • 请求缓存的例子在CommandUsingRequestCache,继承自HystrixCommand,和通常的Command一致。
  • 那么,使用缓存和不使用缓存代码层面有何不一样呢?

一、初始化HystrixRequestContext 二、重写getCacheKey分布式

HystrixRequestContext

  • HystrixRequestContext.initializeContext代码在HystrixRequestContext中,从类名能够看出这是个请求上下文,保存一些请求的信息。测试

  • 从源码能够看出,new出一个HystrixRequestContext,塞入ThreadLocal变量中。ui

private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();

/** * Call this at the beginning of each request (from parent thread) * to initialize the underlying context so that {@link HystrixRequestVariableDefault} can be used on any children threads and be accessible from * the parent thread. * <p> * <b>NOTE: If this method is called then <code>shutdown()</code> must also be called or a memory leak will occur.</b> * <p> * See class header JavaDoc for example Servlet Filter implementation that initializes and shuts down the context. */
    public static HystrixRequestContext initializeContext() {
        HystrixRequestContext state = new HystrixRequestContext();
        requestVariables.set(state);
        return state;
    }
复制代码
  • 那么,HystrixRequestContext存储上下文的数据结构是怎样的呢?
// 每一个HystrixRequestContext实例,都会有一个ConcurrentMap
ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>>();

/** 删除ConcurrentMap中存储的全部键值对,若是初始化了HystrixRequestContext对象,没有调用shutdown方法,确实会致使内存泄漏,由于state还在。 */
    public void shutdown() {
        if (state != null) {
            for (HystrixRequestVariableDefault<?> v : state.keySet()) {
                // for each RequestVariable we call 'remove' which performs the shutdown logic
                try {
                    HystrixRequestVariableDefault.remove(this, v);
                } catch (Throwable t) {
                    HystrixRequestVariableDefault.logger.error("Error in shutdown, will continue with shutdown of other variables", t);
                }
            }
            // null out so it can be garbage collected even if the containing object is still
            // being held in ThreadLocals on threads that weren't cleaned up
            state = null;
        }
    }
复制代码
  • 这个ConcurrentHashMap里存的HystrixRequestVariableDefault及静态内部类HystrixRequestVariableDefault.LazyInitializer又是什么呢?

HystrixRequestVariableDefault

  • HystrixRequestVariableDefault其实就是存储了泛型Tvalue,而且封装了initialValuegetset方法。
  • LazyInitializer顾名思义就是为了懒汉式初始化value,而设计的内部类。
// 做用一:做为内部类调用HystrixRequestVariableDefault.initialValue方法,经过维护initialized布尔值,使HystrixRequestVariableDefault.initialValue方法只调用一次。
// 做用二:new一个LazyInitializer对象或LazyInitializer被垃圾回收时不会调用HystrixRequestVariableDefault.initialValue方法,也就是说对于业务初始化逻辑的影响被排除。
// 做用三:调用get方法时,能够经过CAS乐观锁的方式实现value的获取,具体请参照get方法。
static final class LazyInitializer<T> {
        // @GuardedBy("synchronization on get() or construction")
        private T value;

        /* * Boolean to ensure only-once initialValue() execution instead of using * a null check in case initialValue() returns null */
        // @GuardedBy("synchronization on get() or construction")
        private boolean initialized = false;

        private final HystrixRequestVariableDefault<T> rv;

		// 不会调用HystrixRequestVariableDefault.initialValue,不会更新initialized值
        private LazyInitializer(HystrixRequestVariableDefault<T> rv) {
            this.rv = rv;
        }

		// 不会调用HystrixRequestVariableDefault.initialValue,只能经过set方式调用
        private LazyInitializer(HystrixRequestVariableDefault<T> rv, T value) {
            this.rv = rv;
            this.value = value;
            this.initialized = true;
        }
		// 若是未初始化(没有调用过set方法)过,则返回HystrixRequestVariableDefault.initialValue的值,初始化过则返回初始化的值
        public synchronized T get() {
            if (!initialized) {
                value = rv.initialValue();
                initialized = true;
            }
            return value;
        }
    }
复制代码
  • get方法,先从ConcurrentHashMap中取出对应的LazyInitializer,若是为空则使用CAS乐观锁的方式,new一个LazyInitializer并存入ConcurrentHashMap,最后返回调用LazyInitializer.get()并返回
public T get() {
		// 当前线程的HystrixRequestContext为null 或 ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> 为null
        if (HystrixRequestContext.getContextForCurrentThread() == null) {
            throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
        }
        ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;

        // short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
        LazyInitializer<?> v = variableMap.get(this);
        if (v != null) {
            return (T) v.get();
        }

        /* * 乐观锁方式(CAS)new一个LazyInitializer,放进ConcurrentHashMap * 这里值得注意的是,不调用LazyInitializer.get方法是不会执行HystrixRequestVariableDefault.initialValue,故当putIfAbsent失败时,能够乐观地放弃该实例,使该实例被GC。 * 无论哪一个LazyInitializer实例的get方法被调用,HystrixRequestVariableDefault.initialValue也只会被调用一次。 */
        LazyInitializer<T> l = new LazyInitializer<T>(this);
        LazyInitializer<?> existing = variableMap.putIfAbsent(this, l);
        if (existing == null) {
            /* * We won the thread-race so can use 'l' that we just created. */
            return l.get();
        } else {
            /* * We lost the thread-race so let 'l' be garbage collected and instead return 'existing' */
            return (T) existing.get();
        }
    }
复制代码

各种之间的关系

  • 一个request(不局限于一个线程) -> HystrixRequestContext -> ConcurrentHashMap<HystrixRequestVariableDefault, HystrixRequestVariableDefault.LazyInitializer>
  • 也就是说每一个request都有一个ConcurrentHashMap<HystrixRequestVariableDefault, HystrixRequestVariableDefault.LazyInitializer> map。

获取缓存

  • getCacheKey重写了AbstractCommand.getCacheKey方法,AbstractCommandHystrixCommand的基类。
    enter image description here
  • 根据上图,咱们能够看出execute方法,最终调用toObservable方法,而toObservable方法在AbstractCommand中,所以咱们能够初步判定在AbstractCommand.toObservable方法中,会与HystrixRequestVariableDefault或者其实现的接口产生关联,进行缓存的读取和写入。

*AbstractCommand.toObservable的关键代码以下:this

final String cacheKey = getCacheKey();

                /* 若是开启了缓存功能,从缓存读取 */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

				// 缓存对象
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 放进缓存
                if (requestCacheEnabled && cacheKey != null) {
                    // 包装成缓存Observable对象
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
复制代码
  • 接下来,咱们就只要寻找HystrixRequestCacheHystrixRequestVariableDefault之间的关联了,AbstractCommand构造器中经过HystrixRequestCache.getInstance构造了HystrixRequestCache对象。
// 又是CAS,putIfAbsent。。。
 private static HystrixRequestCache getInstance(RequestCacheKey rcKey, HystrixConcurrencyStrategy concurrencyStrategy) {
        HystrixRequestCache c = caches.get(rcKey);
        if (c == null) {
            HystrixRequestCache newRequestCache = new HystrixRequestCache(rcKey, concurrencyStrategy);
            HystrixRequestCache existing = caches.putIfAbsent(rcKey, newRequestCache);
            if (existing == null) {
                // we won so use the new one
                c = newRequestCache;
            } else {
                // we lost so use the existing
                c = existing;
            }
        }
        return c;
    }
复制代码
  • 来看HystrixRequestCache的值是怎么存储的,看HystrixRequestCache.putIfAbsent
HystrixCachedObservable<T> putIfAbsent(String cacheKey, HystrixCachedObservable<T> f) {
		// 使用HystrixRequestCache.prefix + concurrencyStrategy + HystrixCommand.getCacheKey包装成缓存key
        ValueCacheKey key = getRequestCacheKey(cacheKey);
        if (key != null) {
            // 寻找缓存,关键代码
            ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
            if (cacheInstance == null) {
                throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
            }
            HystrixCachedObservable<T> alreadySet = (HystrixCachedObservable<T>) cacheInstance.putIfAbsent(key, f);
            if (alreadySet != null) {
                // someone beat us so we didn't cache this
                return alreadySet;
            }
        }
        // we either set it in the cache or do not have a cache key
        return null;
    }
复制代码
  • requestVariableInstance.get(key)HystrixRequestVariableHolder中的方法。
// 找到了关联。。。这里有HystrixRequestVariable
 private static ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>> requestVariableInstance = new ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>>();
 // 
 public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
        
        RVCacheKey key = new RVCacheKey(this, concurrencyStrategy);
        HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
        if (rvInstance == null) {
            requestVariableInstance.putIfAbsent(key, concurrencyStrategy.getRequestVariable(lifeCycleMethods));
            /* * 内存泄漏检测, */
            if (requestVariableInstance.size() > 100) {
                logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
            }
        }
        // HystrixRequestVariable.get取出ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>的map,再从ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>中根据重写的getCacheKey构造出ValueCacheKey,拿出缓存值。
        return (T) requestVariableInstance.get(key).get();
    }
复制代码

获取缓存过程当中各个对象的对应关系

  • 一个commandKey
  • 一个HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>
  • 一个ConcurrentHashMap<RVCacheKey, HystrixRequestVariable> requestVariableInstance = new ConcurrentHashMap>()

请求缓存总结

最后,再总结下请求缓存机制,一个request对应一个HystrixRequestContextHystrixRequestVariable中存储缓存值,经过重写getCacheKey构造对应RVCacheKey,经过HystrixRequestCacheHystrixRequestVariableHolder拿到HystrixRequestVariable的值。

总结

看了源码才发现,做者有以下感觉:

一、各类ConcurrentHashMap 二、终于RxJava第一次看到在非Android领域运用 三、懒加载+CAS伴随整个流程,后续也会考虑这种非锁实现

参考文献

相关文章
相关标签/搜索