服务容错保护断路器Hystrix之二:Hystrix工做流程解析

1、总运行流程

当你发出请求后,hystrix是这么运行的html

 

红圈 :Hystrix 命令执行失败,执行回退逻辑。也就是你们常常在文章中看到的“服务降级”。
绿圈 :四种状况会触发失败回退逻辑( fallback )。
第一种 :short-circuit ,处理链路处于熔断的回退逻辑,在 「3. #handleShortCircuitViaFallback()」 详细解析。
第二种 :semaphore-rejection ,处理信号量得到失败的回退逻辑,在 「4. #handleShortCircuitViaFallback()」 详细解析。
第三种 :thread-pool-rejection ,处理线程池提交任务拒绝的回退逻辑,在 「5. #handleThreadPoolRejectionViaFallback()」 详细解析。
第四种 :execution-timeout ,处理命令执行超时的回退逻辑,在 「6. #handleTimeoutViaFallback()」 详细解析。
第五种 :execution-failure ,处理命令执行异常的回退逻辑,在 「7. #handleFailureViaFallback()」 详细解析。
第六种 :bad-request ,TODO 【2014】【HystrixBadRequestException】,和 hystrix-javanica 子项目相关。java

另外,#handleXXXX() 方法,总体代码比较相似,最终都是调用 #getFallbackOrThrowException() 方法,得到【回退逻辑 Observable】或者【异常 Observable】,在 「8. #getFallbackOrThrowException(…)」 详细解析。

git

 

 

 详细解释个步骤github

1.建立  HystrixCommand or HystrixObservableCommand Object

  HystrixCommandweb

用于返回单一的响应
HystrixObservableCommand
用于返回多个可自定义的响应

命令模式,未来自客户端的请求封装成一个对象,从而让你可使用不一样的请求对客户端进行参数化。它能够被用于实现“行为请求者"与”行为实现者“的解耦,以便使二者能够适应变化。
这一过程也包含了策略、资源的初始化,参看AbstractCommand的构造函数:
protected AbstractCommand(...) {
    // 初始化group,group主要是用来对不一样的command key进行统一管理,好比统一监控、告警等
    this.commandGroup = initGroupKey(...);
    // 初始化command key,用来标识降级逻辑,能够理解成command的id
    this.commandKey = initCommandKey(...);
    // 初始化自定义的降级策略
    this.properties = initCommandProperties(...);
    // 初始化线程池key,相同的线程池key将共用线程池
    this.threadPoolKey = initThreadPoolKey(...);
    // 初始化监控器
    this.metrics = initMetrics(...);
    // 初始化断路器
    this.circuitBreaker = initCircuitBreaker(...);
    // 初始化线程池
    this.threadPool = initThreadPool(...);
 
    // Hystrix经过SPI实现了插件机制,容许用户对事件通知、处理和策略进行自定义
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
    this.executionHook = initExecutionHook(executionHook);
 
    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);
 
    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;
 
    /* execution semaphore override if applicable */
    this.executionSemaphoreOverride = executionSemaphore;
}

 

其实构造函数中的不少初始化工做只会集中在建立第一个Command时来作,后续建立的Command对象主要是从静态Map中取对应的实例来赋值,好比监控器、断路器和线程池的初始化,由于相同的Command的command key和线程池key都是一致的,在HystrixCommandMetricsHystrixCircuitBreaker.FactoryHystrixThreadPool中会分别有以下静态属性:spring

private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();
 
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
 
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

可见全部Command对象均可以在这里找到本身对应的资源实例编程

2. Execute the Command(命令执行)

对于HystrixCommand有4个执行方法
对于HystrixObservableCommand只有后两个
//同步阻塞方法,其实就是调用了queue().get()
execute() — blocks, then returns the single response received from the dependency (or throws an exception in case of an error)
 
//异步非阻塞方法,直接返回Future,能够先作本身的事情,作完再.get()
queue() — returns a Future with which you can obtain the single response from the dependency
 
//热观察,能够被当即执行,若是订阅了那么会从新通知,其实就是调用了toObservable()并内置ReplaySubject,详细能够参考RxJava
observe() — subscribes to the Observable that represents the response(s) from the dependency and returns an Observable that replicates that source Observable
 
//冷观察,返回一个Observable对象,当调用此接口,还须要本身加入订阅者,才能接受到信息,详细能够参考RxJava
toObservable() — returns an Observable that, when you subscribe to it, will execute the Hystrix command and emit its responses
 
注:因为Hystrix底层采用了RxJava框架开发,因此没接触过的可能会一脸懵逼,须要再去对RxJava有所了解。

工做流程的源码说明:数组

工做流程图中的第1,2步:HystrixCommand.java的execute()是入口,调用的是queue():
    public R execute() {
        try {
            return queue().get();    
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }

在queue()中调用了toObservable()的方法,接着看源码:缓存

3. Is the Response Cached?(结果是否被缓存)

判断是否使用缓存:是否实现了getCacheKey() 的方法tomcat

若是使用缓存,再判断若是请求缓存可用fromCache != null,而且对于该请求的响应也在缓存中,那么命中的响应会以Observable直接返回。

工做流程的源码说明:

工做流程图中的第3步:AbstractCommand.java的toObservable()方法中的片断:
//....
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                /* try from cache first */
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
//.... 

              protected boolean isRequestCachingEnabled() {
                 return properties.requestCacheEnabled().get() && getCacheKey() != null;
              }

 
下图关因而请求缓存的整个生命周期

4. Is the Circuit Open?(断路器是否打开)

在命令结果没有缓存命中的时候,Hystrix在执行命令前需检查断路器是否为打开状态:

  • 若是是打开的,那么Hystrix不会执行命令,而是转接到fallback处理逻辑(对应下面的第8步)
  • 若是断路器是关闭的,那么Hystrix调到第5步,检查是否有可用资源来执行命令。

5. Is the Thread Pool/Queue/Semaphore Full?(线程池/请求队列/信号量是否已经占满)

线程池或者信号量是否已经满负荷,若是已经满负荷那么快速失败

6. HystrixObservableCommand.construct() or HystrixCommand.run())

两个断路器的入口,若是是继承HystrixObservableCommand,那么就调用construct()函数,若是是继承HystrixCommand,那么就调用run()函数。

7. Calculate Circuit Health(计算断路器的健康度)

Hystrix记录了成功,失败,拒绝,超时四种报告

这些报告用于决定哪些用于断路,被断路的点在恢复周期内没法被后来的请求访问到。

8. Get the Fallback

快速失败会在如下几个场景触发

1.由construct() or run()抛出了一个异常

2.断路器已经打开的时候

3.没有空闲的线程池和队列或者信号量

4.一次命令执行超时

 

能够重写快速失败函数来自定义,

HystrixObservableCommand.resumeWithFallback()

HystrixCommand.getFallback()

9. 成功返回

总体的函数调用流程以下,其实这就是源码的调用流程

 

 

源码:

1、AbstractCommand 主要功能点

实现run、getFallback等方法,你就拥有了一个具备基本熔断功能的类。从使用来看,全部的核心逻辑都由AbstractCommand(即HystrixCommand的父类,HystrixCommand只是对AbstractCommand进行了简单包装)抽象类串起来,从功能上来讲,AbstractCommand必须将以下功能联系起来:

策略配置:Hystrix有两种降级模型,即信号量(同步)模型和线程池(异步)模型,这两种模型全部可定制的部分都体如今了HystrixCommandProperties和HystrixThreadPoolProperties两个类中。然而仍是那句老话,Hystrix只提供了配置修改的入口,没有将配置界面化,若是想在页面上动态调整配置,还须要本身实现。

数据统计:Hystrix以命令模式的方式来控制业务逻辑以及熔断逻辑的调用时机,因此说数据统计对它来讲不算难事,但如何高效、精准的在内存中统计数据,还须要必定的技巧。

断路器:断路器能够说是Hystrix内部最重要的状态机,是它决定着每一个Command的执行过程。

监控露出:能经过某种可配置方式将统计数据展示在仪表盘上。

 二. Hystrix的断路器设计

断路器是Hystrix最核心的状态机,只有了解它的变动条件,咱们才能准确掌握Hystrix的内部行为。上面的内部流程图中【断路器状态判断】这个环节直接决定着此次请求(或者说这个Command对象)是尝试去执行正常业务逻辑(即run())仍是走降级后的逻辑(即getFallback()),断路器HystrixCircuitBreaker有三个状态,

为了能作到状态能按照指定的顺序来流转,而且是线程安全的,断路器的实现类HystrixCircuitBreakerImpl使用了AtomicReference:

class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        private final HystrixCommandProperties properties;
        private final HystrixCommandMetrics metrics;

        enum Status {
            CLOSED, OPEN, HALF_OPEN;
        }

// 断路器初始状态确定是关闭状态
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);

 

断路器在状态变化时,使用了AtomicReference#compareAndSet来确保当条件知足时,只有一笔请求能成功改变状态。

那么,什么条件下断路器会改变状态?

1. CLOSED -> OPEN :

时间窗口内(默认10秒请求量大于请求量阈值(即circuitBreakerRequestVolumeThreshold,默认值是20),而且该时间窗口内错误率大于错误率阈值(即circuitBreakerErrorThresholdPercentage,默认值为50,表示50%),那么断路器的状态将由默认的CLOSED状态变为OPEN状态。看代码可能更直接

// 检查是否超过了咱们设置的断路器请求量阈值
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
    // 若是没有超过统计窗口的请求量阈值,则不改变断路器状态,
    // 若是它是CLOSED状态,那么仍然是CLOSED.
    // 若是它是HALF-OPEN状态,咱们须要等待请求被成功执行,
    // 若是它是OPEN状态, 咱们须要等待睡眠窗口过去。
} else {
    if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        //若是没有超过统计窗口的错误率阈值,则不改变断路器状态,,
        // 若是它是CLOSED状态,那么仍然是CLOSED.
        // 若是它是HALF-OPEN状态,咱们须要等待请求被成功执行,
        // 若是它是OPEN状态, 咱们须要等待【睡眠窗口】过去。
    } else {
        // 若是错误率过高,那么将变为OPEN状态
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            // 由于断路器处于打开状态会有一个时间范围,因此这里记录了变成OPEN的时间
            circuitOpened.set(System.currentTimeMillis());
        }
    }
}

 

这里的错误率是个整数,即errorPercentage= (int) ((doubleerrorCount totalCount 100);,至于睡眠窗口,下面会提到。

2. OPEN ->HALF_OPEN: 

前面说过,当进入OPEN状态后,会进入一段睡眠窗口,即只会OPEN一段时间,因此这个睡眠窗口过去,就会“自动”从OPEN状态变成HALF_OPEN状态,这种设计是为了能作到弹性恢复,这种状态的变动,并非由调度线程来作,而是由请求来触发,每次请求都会进行以下检查:

@Override
public boolean attemptExecution() {
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    // circuitOpened值等于1说明断路器状态为CLOSED
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        if (isAfterSleepWindow()) {
            // 睡眠窗口过去后只有第一个请求能被执行
            // 若是执行成功,那么状态将会变成CLOSED
            // 若是执行失败,状态仍变成OPEN
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }
}
 
// 睡眠窗口是否过去
private boolean isAfterSleepWindow() {
    // 还记得上面CLOSED->OPEN时记录的时间吗?
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}

 

 

3. HALF_OPEN ->CLOSED :

变为半开状态后,会放第一笔请求去执行,并跟踪它的执行结果,若是是成功,那么将由HALF_OPEN状态变成CLOSED状态

@Override
public void markSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
        //This thread wins the race to close the circuit - it resets the stream to start it over from 0
        metrics.resetStream();
        Subscription previousSubscription = activeSubscription.get();
        if (previousSubscription != null) {
            previousSubscription.unsubscribe();
        }
        Subscription newSubscription = subscribeToStream();
        activeSubscription.set(newSubscription);
        // 已经进入了CLOSED阶段,因此将OPEN的修改时间设置成-1
        circuitOpened.set(-1L);
    }
}

4. HALF_OPEN ->OPEN :

 变为半开状态时若是第一笔被放去执行的请求执行失败(资源获取失败、异常、超时等),就会由HALP_OPEN状态再变为OPEN状态

@Override
public void markNonSuccess() {
    if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
        // This thread wins the race to re-open the circuit - it resets the start time for the sleep window
        circuitOpened.set(System.currentTimeMillis());
    }
}

三. 滑动窗口(滚动窗口)

上面提到的断路器须要的时间窗口请求量和错误率这两个统计数据,都是指固定时间长度内的统计数据,断路器的目标,就是根据这些统计数据来预判并决定系统下一步的行为,Hystrix经过滑动窗口来对数据进行“平滑”统计,默认状况下,一个滑动窗口包含10个桶Bucket),每一个桶时间宽度是1秒,负责1秒的数据统计。滑动窗口包含的总时间以及其中的桶数量都是能够配置的,来张官方的截图认识下滑动窗口:

上图的每一个小矩形表明一个桶,能够看到,每一个桶都记录着1秒内的四个指标数据:成功量、失败量、超时量和拒绝量,这里的拒绝量指的就是上面流程图中【信号量/线程池资源检查】中被拒绝的流量。10个桶合起来是一个完整的滑动窗口,因此计算一个滑动窗口的总数据须要将10个桶的数据加起来

 

咱们如今来具体看看滑动窗口和桶的设计,若是将滑动窗口设计成对一个长度为10的整形数组的操做,第一个想到的应该是AtomicLongArray,AtomicLongArray中每一个位置的数据都能线程安全的操做,提供了譬如incrementAndGet、getAndSet、compareAndSet等经常使用方法。但因为一个桶须要维护四个指标,若是用四个AtomicLongArray来实现,作法不够高级,因而咱们想到了AtomicReferenceArray<Bucket>Bucket对象内部能够用AtomicLong来维护着这四个指标。滑动窗口和桶的设计特别讲究技巧,须要尽量作到性能、数据准确性两方面的极致,咱们来看Hystrix是如何作到的。

 

桶的数据统计简单来讲能够分为两类,一类是简单自增计数器,好比请求量、错误量等,另外一类是并发最大值,好比一段时间内的最大并发量(或者说线程池的最大任务数),下面是桶类Bucket的定义:

class Bucket {
    // 标识是哪一秒的桶数据
    final long windowStart;
    // 若是是简单自增统计数据,那么将使用adderForCounterType
    final LongAdder[] adderForCounterType;
    // 若是是最大并发类的统计数据,那么将使用updaterForCounterType
    final LongMaxUpdater[] updaterForCounterType;
 
    Bucket(long startTime) {
        this.windowStart = startTime;
 
        // 预分配内存,提升效率,不一样事件对应不一样的数组index
        adderForCounterType = new LongAdder[HystrixRollingNumberEvent.values().length];
        for (HystrixRollingNumberEvent type : HystrixRollingNumberEvent.values()) {
            if (type.isCounter()) {
                adderForCounterType[type.ordinal()] = new LongAdder();
            }
        }
 
        // 预分配内存,提升效率,不一样事件对应不一样的数组index
        updaterForCounterType = new LongMaxUpdater[HystrixRollingNumberEvent.values().length];
        for (HystrixRollingNumberEvent type : HystrixRollingNumberEvent.values()) {
            if (type.isMaxUpdater()) {
                updaterForCounterType[type.ordinal()] = new LongMaxUpdater();
                // initialize to 0 otherwise it is Long.MIN_VALUE
                updaterForCounterType[type.ordinal()].update(0);
            }
        }
    }
    //...略...
}

咱们能够看到,并无用所谓的AtomicLong,为了方便的管理各类事件(参见com.netflix.hystrix.HystrixEventType)的数据统计,Hystrix对不一样的事件使用不一样的数组index(即枚举的顺序),这样对于某个桶(即某一秒)的指定类型的数据,总能从数组中找到对应的LongAdder(用于统计前面说的简单自增)或LongMaxUpdater(用于统计前面说的最大并发值)对象来进行自增或更新操做。对于性能有要求的中间件或库类都避不开要CPUCache优化的问题,好比cache line,以及cache line带来的false sharing问题。Bucket的内部并无使用AtomicLong,而是使用了JDK8新提供的LongAdder,在高并发的单调自增场景,LongAdder提供了比AtomicLong更好的性能,至于LongAdder的设计思想,本文不展开,感兴趣的朋友能够去拜读Doug Lea大神的代码(有意思的是Hystrix没有直接使用JDK中的LongAdder,而是copy过来改了改)。LongMaxUpdater也是相似的,它和LongAddr同样都派生于Striped64,这里再也不展开。

滑动窗口由多个桶组成,业界通常的作法是将数组作成环,Hystrix中也相似,多个桶是放在AtomicReferenceArray<Bucket>来维护的,为了将其作成环,须要保存头尾的引用,因而有了ListState类: 

class ListState {
    /*
     * 这里的data之因此用AtomicReferenceArray而不是普通数组,是由于data须要
     * 在不一样的ListState对象中跨线程来引用,须要可见性和并发性的保证。
     */
    private final AtomicReferenceArray<Bucket> data;
    private final int size;
    private final int tail;
    private final int head;
 
    private ListState(AtomicReferenceArray<Bucket> data, int head, int tail) {
        this.head = head;
        this.tail = tail;
        if (head == 0 && tail == 0) {
            size = 0;
        } else {
            this.size = (tail + dataLength - head) % dataLength;
        }
        this.data = data;
    }
    //...略...
}

  咱们能够发现,真正的数据是data,而ListState只是一个时间段的数据快照而已,因此tail和head都是final,这样作的好处是咱们不须要去为head、tail的原子操做而苦恼,转而变成对ListState的持有操做,因此滑动窗口看起来以下:

咱们能够看到,因为默认一个滑动窗口包含10个桶,因此AtomicReferenceArray<Bucket>的size得达到10+1=11才能“滑动/滚动”起来,在肯定的某一秒内,只有一个桶被更新,其余的桶数据都没有变化。既然经过ListState能够拿到全部的数据,那么咱们只须要持有最新的ListState对象便可,为了能作到可见性和原子操做,因而有了环形桶类BucketCircularArray

class BucketCircularArray implements Iterable<Bucket> {
    // 持有最新的ListState
    private final AtomicReference<ListState> state;
     //...略...
}

注意到BucketCircularArray实现了迭代器接口,这是由于咱们输出给断路器的数据须要计算滑动窗口中的全部桶,因而你能够看到真正的滑动窗口类HystrixRollingNumber有以下属性和方法:

public class HystrixRollingNumber {
    // 环形桶数组
    final BucketCircularArray buckets;
 
    // 获取该事件类型当前滑动窗口的统计值
    public long getRollingSum(HystrixRollingNumberEvent type) {
        Bucket lastBucket = getCurrentBucket();
        if (lastBucket == null)
            return 0;
    
        long sum = 0;
        // BucketCircularArray实现了迭代器接口环形桶数组
        for (Bucket b : buckets) {
            sum += b.getAdder(type).sum();
        }
        return sum;
    }
    //...略...
}

断路器就是经过监控来从HystrixRollingNumber的getRollingSum方法来获取统计值的

到这里断路器和滑动窗口的核心部分已经分析完了,固然里面还有很多细节没有提到,感兴趣的朋友能够去看一下源码。Hystrix中经过RxJava来实现了事件的发布和订阅,因此若是想深刻了解Hystrix,须要熟悉RxJava,而RxJava在服务端的应用没有像客户端那么广,一个缘由是场景的限制,还一个缘由是大多数开发者认为RxJava设计的过于复杂,加上响应式编程模型,有必定的入门门槛。

4、线程池隔离

     不一样的业务线之间选择用线程池隔离,下降互相影响的几率。设置隔离策略为线程池隔离:
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD));
在Hystrix内部,是根据 properties.executionIsolationStrategy().get()这个字段判断隔离级别。如在 getRunObservableDecoratedForMetricsAndErrorHandling这个方法中会先判断是否是线程池隔离,若是是就获取线程池,若是不是则进行信号量隔离的操做。
     若是是线程池隔离,还须要设置线程池的相关参数如:线程池名字andThreadPoolKey , coreSize(核心线程池大小) , KeepAliveTimeMinutes(线程存存活时间),MaxQueueSize(最大队列长度),QueueSizeRejectionThreshold(拒绝执行的阀值)等等。
. andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getCoreSize())
                                    .withKeepAliveTimeMinutes(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getKeepAliveSeconds())
                                    .withMaxQueueSize(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getMaxQueueSize())
                                    .withQueueSizeRejectionThreshold(resourcesManager.getThreadPoolProperties(platformProtocol.getAppId()).getQueueSizeRejectionThreshold()))
threadPoolKey 也是线程池的名字的前缀,默认前缀是 hystrix 。在Hystrix中,核心线程数和最大线程数是一致的,减小线程临时建立和销毁带来的性能开销。线程池的默认参数都在HystrixThreadPoolProperties中,重点讲解一下参数queueSizeRejectionThreshold 和maxQueueSize 。queueSizeRejectionThreshold默认值是5,容许在队列中的等待的任务数量。maxQueueSize默认值是-1,队列大小。若是是Fast Fail 应用,建议使用默认值。线程池饱满后直接拒绝后续的任务,再也不进行等待。代码以下HystrixThreadPool类中:
        @Override
        public boolean isQueueSpaceAvailable() {
            if (queueSize <= 0) {
                // we don't have a queue so we won't look for space but instead
                // let the thread-pool reject or not
                return true;
            } else {
                return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
            }
        }
线程池一旦建立完成,相关参数就不会更改,存放在静态的ConcurrentHashMap中,key是对应的commandKey 。而queueSizeRejectionThreshold是每一个命令都是设置的。
     
     线程池的相关参数都保存在HystrixThreadPool这个类文件中,线程池的建立方法getThreadPool则在HystrixConcurrencyStrategy类文件中。从getThreadPool方法能够看出线程池的名字就是hystrix-threadPoolKey-threadNumber.
@Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
                thread.setDaemon(true);
                return thread;
            }
     
     在HystrixThreadPool实现类的构造方法中,并发HystrixConcurrencyStrategy实例是经过HystrixPlugins获取的,因此能够经过HystrixPlugins设置自定义插件。具体的HystrixPlugins如何使用,会在后面章节中讲解。
 
线程池的建立     
     前面说了,在Hystrix内部大部分类都是单实例,一样ThreadPool也不例外,也是单实例。而且相同commandKey的依赖还必须是使用同一个线程池。这就须要把ThreadPool保存在一个静态的map中,key是commandKey,同时要保证线程安全,Hytstrix使用了ConcurrentHashMap。关于为何不适用HashTable保证线程安全问题的疑问请自行Google。线程池的建立在HystrixThreadPool这个类文件中的内部类Factory中的getInstance方法。
 
/* package */final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
     String key = threadPoolKey.name();

            // this should find it for all but the first time
            HystrixThreadPool previouslyCached = threadPools.get(key);
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize
            synchronized (HystrixThreadPool.class) {
                if (!threadPools.containsKey(key)) {
                    threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
                }
            }
            return threadPools.get(key);
     
线程池的使用
     HystrixCommand类的execute()内部调用了queue() ,queue又调用了父类AbstractCommand的toObservable方法,toObservable方法处理了是否可缓存问题后,交给了getRunObservableDecoratedForMetricsAndErrorHandling方法,这个方法设置了一系列的executionHook以后,交给了getExecutionObservableWithLifecycle,这个方法经过getExecutionObservable()获取了执行器。getExecutionObservable()是个抽象方法,具体实现放在了子类:HystrixCommand和HystrixObservableCommand类中。下面是HystrixCommand类中的getExecutionObservable方法实现:
final protected Observable<R> getExecutionObservable() {
        return Observable.create(new OnSubscribe<R>() {

            @Override
            public void call(Subscriber<? super R> s) {
                try {
                    s.onNext(run());
                    s.onCompleted();
                } catch (Throwable e) {
                    s.onError(e);
                }
            }

        });
    }
在这个Call方法中执行了具体的业务逻辑run() ;

 

 

 

2、模块详解

2.一、建立请求命令

2.1.一、有4种方式

一、同步阻塞方法,其实就是调用了queue().get()
二、异步非阻塞方法,直接返回Future,能够先作本身的事情,作完再.get()
三、热观察,能够被当即执行,若是订阅了那么会从新通知,其实就是调用了toObservable()并内置ReplaySubject,详细能够参考RxJava
四、冷观察,返回一个Observable对象,当调用此接口,还须要本身加入订阅者,才能接受到信息,详细能够参考RxJava

2.1.二、原生模式

基于hystrix的原生接口,也就是继承HystrixCommand或者HystirxObservableCommand。

在《服务容错保护断路器Hystrix之一:入门介绍》中的示例基础上修改以下,

同步方式/异步方式:

package com.dxz.ribbon;

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class ComputeCommand extends HystrixCommand<String> {
    RestTemplate restTemplate;
    String a;
    String b;
    
    protected ComputeCommand(RestTemplate restTemplate, String a, String b) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.restTemplate = restTemplate;
        this.a = a;
        this.b = b;
    }
    
    @Override
    protected String run() throws Exception {
        return restTemplate.getForEntity("http://compute-service/add?a="+a +"&b="+b+"&sn=1", String.class).getBody();
    }
     /**
     * 快速失败后调用函数
     * @return
     */
    @Override
    protected String getFallback(){
        return "404 :)";
    }
}

观察方式:

package com.dxz.ribbon;

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;

import rx.Observable;
import rx.Subscriber;

public class ComputeObservableCommand extends HystrixObservableCommand<String> {
    RestTemplate restTemplate;
    String a;
    String b;
    
    protected ComputeObservableCommand(RestTemplate restTemplate, String a, String b) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.restTemplate = restTemplate;
        this.a = a;
        this.b = b;
    }
    
    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {

            @Override
            public void call(Subscriber<? super String> observer) {
                if(!observer.isUnsubscribed()) {
                    String result = restTemplate.getForEntity("http://compute-service/add?a="+a +"&b="+b+"&sn=1", String.class).getBody();
                    observer.onNext(result);
                    observer.onCompleted();
                }
            }
            
        });
    }

}

调用方法:

package com.dxz.ribbon;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import rx.functions.Action1;

@RestController
public class ConsumerController2 {

    @Autowired
    RestTemplate restTemplate;

    @RequestMapping(value = "/add2", method = RequestMethod.GET)
    public String add(@RequestParam String m) throws InterruptedException, ExecutionException {
        if("s".equals(m)) {
            String result = new ComputeCommand(restTemplate, "1", "2").execute();
            System.out.println("result:="+result);
            return result;    
        } else if("a".equals(m)) {
            Future<String> result = new ComputeCommand(restTemplate, "1", "2").queue();
            System.out.println("result:="+result.get());
            return result.get();    
        } else {
            ComputeObservableCommand command1 = new ComputeObservableCommand(restTemplate, "1","2");
            rx.Observable<String> result = command1.observe();  
            result.subscribe(new Action1<String>() {  
                @Override  
                public void call(String s) {  
                    System.out.println("Command called. Result is:" + s);  
                }
            });  
            return null;
        }
        
    }

}

结果:

 2.1.三、注解模式

在《服务容错保护断路器Hystrix之一:入门介绍》已经展现过。

 

2.二、定义服务降级

有些状况不去实现降级逻辑,以下所示。
执行写操做的命令:当Hystrix命令是用来执行写操做而不是返回一些信息的时候,一般状况下这类操做的返回类型时void或是为空的Observable,实现服务降级的意义不是很大。当写入操做失败的时候,咱们一般只须要通知调用者便可。
执行批处理或离线计算的命令:当Hystrix命令是用来执行批处理程序生成一份报告或是进行任何类型的离线计算时,那么一般这些操做只须要将错误传播给调用者,而后让调用者稍后重试而不是发送给调用者一个静默的降级处理响应。

 

2.三、工做流程图

 

2.四、开关条件

关于断路器打开

·时间窗口内请求次数(限流)

若是在10s内,超过某个阈值的请求量,才会考虑断路(小于这个次数不会被断路)

配置是circuitBreaker.requestVolumeThreshold

默认10s 20次

·失败率

默认失败率超过50%就会被断路

配置是circuitBreaker.errorThresholdPercentage

 

关于断路器关闭

·从新尝试

在必定时间以后,从新尝试请求来决定是否继续打开或者选择关闭断路器

配置是circuitBreaker.sleepWindowInMilliseconds

默认5000ms

 

2.五、关于隔离

bulkhead pattern模式(舱壁模式)

Htstrix使用了bulkhead pattern模式,典型的例子就是线程隔离。

简单解释一下bulkhead pattern模式。通常状况咱们都用一个线程池来管理全部线程,容易形成一个问题,粒度太粗,没法对线程进行分类管理,会致使局部问题影响全局。bulkhead pattern模式在于,采用多个线程池来管理线程,这样使得1个线程池资源出现问题时不会形成另外一个线程池资源问题。尽可能使问题最小化。

如图所示,采用了bulkhead pattern模式的效果

 

 

说完原理说实现,如何针对不一样依赖采用不一样的线程池管理呢

Hystrix给了咱们三种key来用于隔离:

·CommandKey,针对相同的接口通常CommandKey值相同,目的是把HystrixCommand,HystrixCircuitBreaker,HytrixCommandMerics以及其余相关对象关联在一块儿,造成一个原子组。采用原生接口的话,默认值为类名;采用注解形式的话,默认值为方法名。

·CommandGroupKey,对CommandKey分组,用于真正的隔离。相同CommandGroupKey会使用同一个线程池或者信号量。通常状况相同业务功能会使用相同的CommandGroupKey。

·ThreadPoolKey,若是说CommandGroupKey只是逻辑隔离,那么ThreadPoolKey就是物理隔离,当没有设置ThreadPoolKey的时候,线程池或者信号量的划分按照CommandGroupKey,当设置了ThreadPoolKey,那么线程池和信号量的划分就按照ThreadPoolKey来处理,相同ThreadPoolKey采用同一个线程池或者信号量。

 

Coding

原生模式

能够经过HystrixCommand.Setter来自定义配置
HystrixCommandGroupKey.Factory.asKey(""))
HystrixCommandKey.Factory.asKey("")
HystrixThreadPoolKey.Factory.asKey("")

注解模式

能够直接在方法名上添加

@HystrixCommand(groupKey = "", commandKey = "", threadPoolKey = "")

2.六、关于请求缓存

工做流程图

 

优点

·复用性

  这里的复用性指的是代码复用性

·一致性

  也就是常说的幂等性,无论请求几回,获得的结果应该都是同样的

·减小重复工做

  因为请求缓存是在HystrixCommand的construct()或run()运行以前运行,全部能够有效减小线程的使用

适用场景

请求缓存的优点显而易见,可是也不是银弹。

在读少写多的场景就显得不太合适,对于读的请求,须要add缓存。对于增删改的请求,须要把缓存remove。在增长系统资源开销的同时,又很鸡肋。

因此通常适合读多写少的场景。彷佛全部缓存机制都有这个局限性吧。

Coding

原生模式

继承HystrixCommand后,重写getCacheKey()方法,该方法默认返回的是null,也就是不使用请求缓存功能。相同key的请求会使用相同的缓存。
package com.dxz.ribbon;

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class ComputeCommandCache extends HystrixCommand<String> {
    RestTemplate restTemplate;
    String a;
    String b;
    
    protected ComputeCommandCache(RestTemplate restTemplate, String a, String b) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.restTemplate = restTemplate;
        this.a = a;
        this.b = b;
    }
    
    @Override
    protected String run() throws Exception {
        return restTemplate.getForEntity("http://compute-service/add?a="+a +"&b="+b+"&sn=1", String.class).getBody();
    }
    
 @Override protected String getCacheKey() {
        System.out.println("调用getCacheKey");//打印一下何时会触发
        return a + b;
    }
    
     /**
     * 快速失败后调用函数
     * @return
     */
    @Override
    protected String getFallback(){
        return "404 :)";
    }
}

调用类,若是不加HystrixRequestContext.initializeContext();//初始化请求上下文,会报错以下:

报错了:java.util.concurrent.ExecutionException: Observable onError
Caused by: java.lang.IllegalStateException: Request caching is not available. Maybe you need to initialize the HystrixRequestContext?

 

package com.dxz.ribbon;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

import rx.functions.Action1;

@RestController
public class ConsumerControllerCache {

    @Autowired
    RestTemplate restTemplate;

    @RequestMapping(value = "/add3", method = RequestMethod.GET)
    public String add(@RequestParam String m) throws InterruptedException, ExecutionException {
        HystrixRequestContext.initializeContext();//初始化请求上下文
        if("s".equals(m)) {
            String result = new ComputeCommandCache(restTemplate, "1", "2").execute();
            System.out.println("result:="+result);
            return result;    
        } else if("a".equals(m)) {
            Future<String> result = new ComputeCommandCache(restTemplate, "1", "2").queue();
            System.out.println("result:="+result.get());
            return result.get();    
        } else {
            ComputeObservableCommand command1 = new ComputeObservableCommand(restTemplate, "1","2");
            rx.Observable<String> result = command1.observe();  
            result.subscribe(new Action1<String>() {  
                @Override  
                public void call(String s) {  
                    System.out.println("Command called. Result is:" + s);  
                }
            });  
            return null;
        }
        
    }

}

注解模式

在方法名上增长,并添加与cacheKeyMethod字符串相同的方法。二者共用入参。
复制代码
@CacheResult(cacheKeyMethod = "getCacheKey")
public String post2AnotherService(String seed){
}
public String getCacheKey(String seed){

    return seed;

}
复制代码

 

初始化HystrixRequestContext

还有关键的一步,在调用HystrixCommand以前初始化HystrixRequestContext,其实就是建立一个ThreadLocal的副本,共享请求缓存就是经过ThreadLocal来实现的。
HystrixRequestContext context=HystrixRequestContext.initializeContext();
操做完成后context.shutdown();
通常状况能够在过滤器中控制是初始化和关闭整个生命周期
复制代码
//启动HystrixRequestContext
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
    chain.doFilter(req, res);
} finally {
    //关闭HystrixRequestContext
    context.shutdown();
}
复制代码

 

2.七、关于请求合并(Requst Collapsing)

工做流程图

 

上半部分是模拟请求,下半部分是该请求的依赖设置,时间窗口默认是10ms,在这个时间窗口内,全部对于该接口的请求都会被加入队列,而后进行批处理。这样的好处在于,若是短期内对于某个接口有大量请求,那么能够只处理一次就完成全部响应。

 

优点

全局线程合并

在tomcat容器中,全部请求共用一个进程,也就是一个JVM容器,在并发场景下会派生出许多线程,collapsing能够合并整个JVM中的请求线程,这样能够解决不一样使用者同时请求的大量并发问题。

 

局部线程合并

能够合并单个tomcat请求线程,好比在10ms内有10个请求被同一线程处理(这不是像往常同样请求->处理,而是请求->加入请求队列,全部能够快速收集请求),那这些请求能够被合并。

对象建模和代码复杂度

在实际场景下,调用接口取数据的复杂度每每高于数据的复杂度,通俗来讲就是取数据能够变幻无穷的取,而数据就那么几个接口。

collapsing能够帮助你更好的实现你的业务,好比屡次请求合并结果后再广播出去。

 

适用场景

·并发量大接口

当并发量小,一个时间窗口内只有几个或没有请求,那么就白白浪费了请求合并的资源。

·请求耗时接口

时间窗口是固定的,假如一个请求实际耗时10ms,加上固定的时间窗口,最大延迟达到20ms,延迟被提升了100%。若一个请求实际耗时有1s,那么时间窗口的延迟就能够被忽略不计。

 

Coding

原生模式

复制代码
/**
 * 批量返回值类型
 * 返回值类型
 * 请求参数类型
 */
public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

    private static Logger logger = LoggerFactory.getLogger(CommandCollapserGetValueForKey.class);

    private final Integer key;

    public CommandCollapserGetValueForKey(Integer key) {
        this.key = key;
    }

    /**
     *获取请求参数
     */
    public Integer getRequestArgument() {
        return key;
    }

    /**
     *合并请求产生批量命令的具体实现
     */
    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
        return new BatchCommand(requests);
    }

    /**
     *批量命令结果返回后的处理,须要实现将批量结果拆分并传递给合并前的各原子请求命令的逻辑中
     */
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
        int count = 0;
        //请求响应一一对应
        for (CollapsedRequest<String, Integer> request : requests) {
            request.setResponse(batchResponse.get(count++));
        }
    }

    private static final class BatchCommand extends HystrixCommand<List<String>> {
        private static Logger logger = LoggerFactory.getLogger(CommandCollapserGetValueForKey.BatchCommand.class);

        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
                super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            this.requests = requests;
        }

        @Override
        protected List<String> run() {
            ArrayList<String> response = new ArrayList<String>();
            // 处理每一个请求,返回结果
            for (CollapsedRequest<String, Integer> request : requests) {
                logger.info("request.getArgument()={}",request.getArgument());
                // artificial response for each argument received in the batch
                response.add("ValueForKey: " + request.getArgument());
            }
            return response;
        }
    }
}
复制代码
调用的时候只须要new CommandCollapserGetValueForKey(1).queue()
在同一个时间窗口内,批处理的函数调用顺序为
getRequestArgument()->createCommand()->mapResponseToRequests()

//官方配置文档

https://github.com/Netflix/Hystrix/wiki/Configuration#circuitBreaker.sleepWindowInMilliseconds

相关文章
相关标签/搜索