Hystrix执行原理

前奏

Hystrix的常规使用姿式java

@Test
    public void test_run(){
        String s = new CommandHelloWorld("Bob").execute();
        System.out.println(s);
    }

咱们的command在new的时候发生了什么?execute()是如何执行的?execute执行失败或者超时如何fallback?segmentfault

1、PREPARE 初始化

当咱们new XXCommand()的时候,大部分的工做都是在 AbstractCommand完成缓存

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
        HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
        HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
        HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

    this.commandGroup = initGroupKey(group);
    this.commandKey = initCommandKey(key, getClass());
    this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
    this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
    this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
    this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
    this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

    //Strategies from plugins
    this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
    this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
    this.executionHook = initExecutionHook(executionHook);

    this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
    this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

    /* fallback semaphore override if applicable */
    this.fallbackSemaphoreOverride = fallbackSemaphore;

    /* execution semaphore override if applicable */
    this.executionSemaphoreOverride = executionSemaphore;
}

能够很清晰的看到,这里面在进行command配置装载、线程池配置装载及线程池的建立、指标搜集器、熔断器的初始化等等。app

//HystrixCommandMetrics
ConcurrentHashMap<String, HystrixCommandMetrics> metrics = new ConcurrentHashMap<String, HystrixCommandMetrics>();

//HystrixThreadPoolDefault
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();

//com.netflix.hystrix.HystrixCircuitBreaker.Factory
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

除HystrixCommand每次都须要从新创建,其它基本都以commandKey维护着配置,熔断器,指标的单例而线程池则以threadkey进场存储。ide

咱们能够了了解下Hystrix的线程池如何管理
建立线程调用 HystrixThreadPool.Factory.getInstanceui

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();

    // this should find it for all but the first time
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}

从缓存中以threadPoolKey获取线程池,获取不到则 调用new HystrixThreadPoolDefault新建this

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
    HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    this.queueSize = properties.maxQueueSize().get();

    this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
            concurrencyStrategy.getThreadPool(threadPoolKey, properties),
            properties);
    this.threadPool = this.metrics.getThreadPool();
    this.queue = this.threadPool.getQueue();

    /* strategy: HystrixMetricsPublisherThreadPool */
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}

注意spa

this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,concurrencyStrategy.getThreadPool(threadPoolKey, properties),properties);

其中 concurrencyStrategy.getThreadPool,HystrixConcurrencyStrategy就是hystrix的线程建立策略者.net

真正的建立线程执行
HystrixConcurrencyStrategy#getThreadPool线程

public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
    .....各类配置,此处代码省略......

    if (allowMaximumSizeToDivergeFromCoreSize) {
        final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
        if (dynamicCoreSize > dynamicMaximumSize) {
            logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                    dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        } else {
            return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
        }
    } else {
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
}

这里调用java JUC原生的 ThreadPoolExecutor建立线程

2、Observable 大串烧

Hystrix的执行利用RxJava,组合了不少的Observable,造成一个Observable,和传统的调用链相比更加简洁。

3、各色Observable显神通

3.1.command 状态位

  1. toObservable 第一个observable,在下一个chain以前,会更改HystrixCommand状态位 OBSERVABLE_CHAIN_CREATED
  2. toObservable doOnTerminate,探测到terminate时,会将HystrixCommand更改成 TERMINAL
  3. executeCommandWithSpecifiedIsolation在开始执行的时候会更改HystrixCommand更改成 USER_CODE_EXECUTED
  4. toObservable doOnUnsubscribe,探测到terminate时,会将HystrixCommand更改成 UNSUBSCRIBED
3.2.executeCommandWithSpecifiedIsolation

分配执行线程,维护线程状态

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                .....省略干扰代码.....
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        // the command timed out in the wrapping thread so we will return immediately
                        // and not increment any of the counters below or other such logic
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {

                    try {
                       .....省略干扰代码.....

                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        .....省略干扰代码.....
    }
}

具体逻辑
1.判断隔离策略,若是是Semaphore 信号量则在当前线程上执行,不然进入线程分配逻辑
2.更改HystrixCommand的状态 USER_CODE_EXECUTED
3.判断HystrixCommand超时状态,若是已经超时则抛出异常
4.更改当前command的线程执行状态为 STARTED
5.调用 getUserExecutionObservable 执行具体逻辑
6.doOnTerminate 当Observale执行完毕后(HystrixCommand可能失败也可能执行成功),此时的线程状态可能有两种分别是 STARTEDNOT_USING_THREAD , 而后更改线程状态为 TERMINAL
7.doOnUnsubscribe 当Observable被取消订阅,更改线程状态为 TERMINAL
8.subscribeOn 指定scheduler,这里Hystrix实现了本身的scheduler,在scheduler的worker指定线程池,在配置线程以前会从新加载线程池配置(这里是Rxjava的东西,暂时你们能够粗略的认为这里就是指定线程池,而后把要执行的任务扔到这个线程池里)

@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    touchConfig();
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

// allow us to change things via fast-properties by setting it each time
private void touchConfig() {
    final int dynamicCoreSize = properties.coreSize().get();
    final int configuredMaximumSize = properties.maximumSize().get();
    int dynamicMaximumSize = properties.actualMaximumSize();
    final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
    boolean maxTooLow = false;

    if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
        //if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
        dynamicMaximumSize = dynamicCoreSize;
        maxTooLow = true;
    }

    // In JDK 6, setCorePoolSize and setMaximumPoolSize will execute a lock operation. Avoid them if the pool size is not changed.
    if (threadPool.getCorePoolSize() != dynamicCoreSize || (allowSizesToDiverge && threadPool.getMaximumPoolSize() != dynamicMaximumSize)) {
        if (maxTooLow) {
            logger.error("Hystrix ThreadPool configuration for : " + metrics.getThreadPoolKey().name() + " is trying to set coreSize = " +
                    dynamicCoreSize + " and maximumSize = " + configuredMaximumSize + ".  Maximum size will be set to " +
                    dynamicMaximumSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
        }
        threadPool.setCorePoolSize(dynamicCoreSize);
        threadPool.setMaximumPoolSize(dynamicMaximumSize);
    }

    threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}

touchConfig 执行具体的线程池参数调整。

从上面的过程也能发现,该observable也是维护线程状态的地方,线程的状态变动见下图

3.3.getUserExecutionObservable

执行具体业务逻辑

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }

    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}

userObservable = getExecutionObservable(); 由HystrixCommand本身实现

//HystrixCommand
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}

这里看到 run()应该就明白了,就是咱们本身的业务代码 CommandHelloWorld去实现的。

3.4.getFallbackOrThrowException

当executeCommandWithSpecifiedIsolation探测到异常时触发该Observable。getFallbackOrThrowException里具体fallback执行看
executeCommandAndObserve。

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    .....省略干扰代码.....
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        .....省略干扰代码.....
    };

    .....省略干扰代码.....

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

doErrorResumeNext 会触发下一个 handleFallback。

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    ....省略干扰代码....

    if (isUnrecoverable(originalException)) {
        ....省略干扰代码....
    } else {
        ....省略干扰代码....

        if (properties.fallbackEnabled().get()) {
        
            ....省略干扰代码....

            Observable<R> fallbackExecutionChain;

            // acquire a permit
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    if (isFallbackUserDefined()) {
                        executionHook.onFallbackStart(this);
                        fallbackExecutionChain = getFallbackObservable();
                    } else {
                        //same logic as above without the hook invocation
                        fallbackExecutionChain = getFallbackObservable();
                    }
                } catch (Throwable ex) {
                    //If hook or user-fallback throws, then use that as the result of the fallback lookup
                    fallbackExecutionChain = Observable.error(ex);
                }

                return fallbackExecutionChain
                        .doOnEach(setRequestContext)
                        .lift(new FallbackHookApplication(_cmd))
                        .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                        .doOnNext(markFallbackEmit)
                        .doOnCompleted(markFallbackCompleted)
                        .onErrorResumeNext(handleFallbackError)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
               return handleFallbackRejectionByEmittingError();
            }
        } else {
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
}

这里优先几个步骤
1.判断异常是不是能走fallback处理,不能则抛出HystrixRuntimeException
2.判断配置是否开启容许fallback,开启,则进入 getFallbackObservable(),而该方法具体有HystrixCommand实现,调用的则是用户的Command的fallback方法,若是调用方没有覆盖该方法,则会执行HystrixCommand的fallback方法,抛出未定义fallback方法的异常

protected R getFallback() {
    throw new UnsupportedOperationException("No fallback available.");
 }

@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
               //调用方 fallback逻辑
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

后续系列文章,欢迎参阅
Hystrix熔断器执行机制
Hystrix超时实现机制

相关文章
相关标签/搜索