国庆长假结束后,笔者一直在于假期综合症缠斗,特别是周六上班。。。java
相信你们对Hystrix都很熟悉,它的源码大量使用RxJava,正好笔者的老本行是Android开发工程师,之前也略微接触过,想分享下本身看完Hystix的请求合并与请求缓存部分源码的一些收获。git
Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.github
一、隔离:经过隔离,避免服务之间相互影响,一个服务不可用,不会影响别的服务,避免了服务雪崩。 二、降级:分布式环境中,服务不可用的状况没法避免,降级机制能够给出更加友好的交互(默认值、异常返回)。 三、熔断:熔断机制能够避免在服务不可用时,服务调用方还在调用不可用的服务,致使资源消耗、耗时增长。 四、提供可视化的监控,Hystrix Dashboard。 四、固然,还有笔者今天要讲的请求合并与请求缓存。缓存
Parallel execution. Concurrency aware request caching. Automated batching through request collapsing.数据结构
一、请求缓存:CommandUsingRequestCache 二、请求合并:CommandCollapserGetValueForKeyapp
CommandUsingRequestCache
,继承自HystrixCommand
,和通常的Command
一致。一、初始化
HystrixRequestContext
二、重写getCacheKey
分布式
HystrixRequestContext.initializeContext
代码在HystrixRequestContext
中,从类名能够看出这是个请求上下文,保存一些请求的信息。测试
从源码能够看出,new出一个HystrixRequestContext
,塞入ThreadLocal
变量中。ui
private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();
/** * Call this at the beginning of each request (from parent thread) * to initialize the underlying context so that {@link HystrixRequestVariableDefault} can be used on any children threads and be accessible from * the parent thread. * <p> * <b>NOTE: If this method is called then <code>shutdown()</code> must also be called or a memory leak will occur.</b> * <p> * See class header JavaDoc for example Servlet Filter implementation that initializes and shuts down the context. */
public static HystrixRequestContext initializeContext() {
HystrixRequestContext state = new HystrixRequestContext();
requestVariables.set(state);
return state;
}
复制代码
HystrixRequestContext
存储上下文的数据结构是怎样的呢?// 每一个HystrixRequestContext实例,都会有一个ConcurrentMap
ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> state = new ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>>();
/** 删除ConcurrentMap中存储的全部键值对,若是初始化了HystrixRequestContext对象,没有调用shutdown方法,确实会致使内存泄漏,由于state还在。 */
public void shutdown() {
if (state != null) {
for (HystrixRequestVariableDefault<?> v : state.keySet()) {
// for each RequestVariable we call 'remove' which performs the shutdown logic
try {
HystrixRequestVariableDefault.remove(this, v);
} catch (Throwable t) {
HystrixRequestVariableDefault.logger.error("Error in shutdown, will continue with shutdown of other variables", t);
}
}
// null out so it can be garbage collected even if the containing object is still
// being held in ThreadLocals on threads that weren't cleaned up
state = null;
}
}
复制代码
ConcurrentHashMap
里存的HystrixRequestVariableDefault
及静态内部类HystrixRequestVariableDefault.LazyInitializer
又是什么呢?HystrixRequestVariableDefault
其实就是存储了泛型T
的value
,而且封装了initialValue
、get
、set
方法。LazyInitializer
顾名思义就是为了懒汉式初始化value
,而设计的内部类。// 做用一:做为内部类调用HystrixRequestVariableDefault.initialValue方法,经过维护initialized布尔值,使HystrixRequestVariableDefault.initialValue方法只调用一次。
// 做用二:new一个LazyInitializer对象或LazyInitializer被垃圾回收时不会调用HystrixRequestVariableDefault.initialValue方法,也就是说对于业务初始化逻辑的影响被排除。
// 做用三:调用get方法时,能够经过CAS乐观锁的方式实现value的获取,具体请参照get方法。
static final class LazyInitializer<T> {
// @GuardedBy("synchronization on get() or construction")
private T value;
/* * Boolean to ensure only-once initialValue() execution instead of using * a null check in case initialValue() returns null */
// @GuardedBy("synchronization on get() or construction")
private boolean initialized = false;
private final HystrixRequestVariableDefault<T> rv;
// 不会调用HystrixRequestVariableDefault.initialValue,不会更新initialized值
private LazyInitializer(HystrixRequestVariableDefault<T> rv) {
this.rv = rv;
}
// 不会调用HystrixRequestVariableDefault.initialValue,只能经过set方式调用
private LazyInitializer(HystrixRequestVariableDefault<T> rv, T value) {
this.rv = rv;
this.value = value;
this.initialized = true;
}
// 若是未初始化(没有调用过set方法)过,则返回HystrixRequestVariableDefault.initialValue的值,初始化过则返回初始化的值
public synchronized T get() {
if (!initialized) {
value = rv.initialValue();
initialized = true;
}
return value;
}
}
复制代码
ConcurrentHashMap
中取出对应的LazyInitializer
,若是为空则使用CAS乐观锁的方式,new一个LazyInitializer
并存入ConcurrentHashMap
,最后返回调用LazyInitializer.get()
并返回public T get() {
// 当前线程的HystrixRequestContext为null 或 ConcurrentHashMap<HystrixRequestVariableDefault<?>, HystrixRequestVariableDefault.LazyInitializer<?>> 为null
if (HystrixRequestContext.getContextForCurrentThread() == null) {
throw new IllegalStateException(HystrixRequestContext.class.getSimpleName() + ".initializeContext() must be called at the beginning of each request before RequestVariable functionality can be used.");
}
ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;
// short-circuit the synchronized path below if we already have the value in the ConcurrentHashMap
LazyInitializer<?> v = variableMap.get(this);
if (v != null) {
return (T) v.get();
}
/* * 乐观锁方式(CAS)new一个LazyInitializer,放进ConcurrentHashMap * 这里值得注意的是,不调用LazyInitializer.get方法是不会执行HystrixRequestVariableDefault.initialValue,故当putIfAbsent失败时,能够乐观地放弃该实例,使该实例被GC。 * 无论哪一个LazyInitializer实例的get方法被调用,HystrixRequestVariableDefault.initialValue也只会被调用一次。 */
LazyInitializer<T> l = new LazyInitializer<T>(this);
LazyInitializer<?> existing = variableMap.putIfAbsent(this, l);
if (existing == null) {
/* * We won the thread-race so can use 'l' that we just created. */
return l.get();
} else {
/* * We lost the thread-race so let 'l' be garbage collected and instead return 'existing' */
return (T) existing.get();
}
}
复制代码
getCacheKey
重写了AbstractCommand.getCacheKey
方法,AbstractCommand
为HystrixCommand
的基类。
execute
方法,最终调用toObservable
方法,而toObservable
方法在AbstractCommand
中,所以咱们能够初步判定在AbstractCommand.toObservable
方法中,会与HystrixRequestVariableDefault
或者其实现的接口产生关联,进行缓存的读取和写入。*AbstractCommand.toObservable
的关键代码以下:this
final String cacheKey = getCacheKey();
/* 若是开启了缓存功能,从缓存读取 */
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
// 缓存对象
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// 放进缓存
if (requestCacheEnabled && cacheKey != null) {
// 包装成缓存Observable对象
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
复制代码
HystrixRequestCache
与 HystrixRequestVariableDefault
之间的关联了,AbstractCommand
构造器中经过HystrixRequestCache.getInstance
构造了HystrixRequestCache
对象。// 又是CAS,putIfAbsent。。。
private static HystrixRequestCache getInstance(RequestCacheKey rcKey, HystrixConcurrencyStrategy concurrencyStrategy) {
HystrixRequestCache c = caches.get(rcKey);
if (c == null) {
HystrixRequestCache newRequestCache = new HystrixRequestCache(rcKey, concurrencyStrategy);
HystrixRequestCache existing = caches.putIfAbsent(rcKey, newRequestCache);
if (existing == null) {
// we won so use the new one
c = newRequestCache;
} else {
// we lost so use the existing
c = existing;
}
}
return c;
}
复制代码
HystrixRequestCache
的值是怎么存储的,看HystrixRequestCache.putIfAbsent
。HystrixCachedObservable<T> putIfAbsent(String cacheKey, HystrixCachedObservable<T> f) {
// 使用HystrixRequestCache.prefix + concurrencyStrategy + HystrixCommand.getCacheKey包装成缓存key
ValueCacheKey key = getRequestCacheKey(cacheKey);
if (key != null) {
// 寻找缓存,关键代码
ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> cacheInstance = requestVariableForCache.get(concurrencyStrategy);
if (cacheInstance == null) {
throw new IllegalStateException("Request caching is not available. Maybe you need to initialize the HystrixRequestContext?");
}
HystrixCachedObservable<T> alreadySet = (HystrixCachedObservable<T>) cacheInstance.putIfAbsent(key, f);
if (alreadySet != null) {
// someone beat us so we didn't cache this
return alreadySet;
}
}
// we either set it in the cache or do not have a cache key
return null;
}
复制代码
requestVariableInstance.get(key)
为HystrixRequestVariableHolder
中的方法。// 找到了关联。。。这里有HystrixRequestVariable
private static ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>> requestVariableInstance = new ConcurrentHashMap<RVCacheKey, HystrixRequestVariable<?>>();
//
public T get(HystrixConcurrencyStrategy concurrencyStrategy) {
RVCacheKey key = new RVCacheKey(this, concurrencyStrategy);
HystrixRequestVariable<?> rvInstance = requestVariableInstance.get(key);
if (rvInstance == null) {
requestVariableInstance.putIfAbsent(key, concurrencyStrategy.getRequestVariable(lifeCycleMethods));
/* * 内存泄漏检测, */
if (requestVariableInstance.size() > 100) {
logger.warn("Over 100 instances of HystrixRequestVariable are being stored. This is likely the sign of a memory leak caused by using unique instances of HystrixConcurrencyStrategy instead of a single instance.");
}
}
// HystrixRequestVariable.get取出ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>的map,再从ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>中根据重写的getCacheKey构造出ValueCacheKey,拿出缓存值。
return (T) requestVariableInstance.get(key).get();
}
复制代码
最后,再总结下请求缓存机制,一个request对应一个HystrixRequestContext
、HystrixRequestVariable
中存储缓存值,经过重写getCacheKey
构造对应RVCacheKey
,经过HystrixRequestCache
的HystrixRequestVariableHolder
拿到HystrixRequestVariable
的值。
看了源码才发现,做者有以下感觉:
一、各类ConcurrentHashMap 二、终于RxJava第一次看到在非Android领域运用 三、懒加载+CAS伴随整个流程,后续也会考虑这种非锁实现