This article looks at how Hystrix handles timeouts.
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java
    private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }

        @Override
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            // if the child unsubscribes we unsubscribe our parent as well
            child.add(s);

            //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
            final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();

            TimerListener listener = new TimerListener() {

                @Override
                public void tick() {
                    // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
                    // otherwise it means we lost a race and the run() execution completed or did not start
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                        // shut down the original request
                        s.unsubscribe();

                        final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

                            @Override
                            public void run() {
                                child.onError(new HystrixTimeoutException());
                            }
                        });

                        timeoutRunnable.run();
                        //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
                    }
                }

                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };

            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

            // set externally so execute/queue can see this
            originalCommand.timeoutTimer.set(tl);

            /**
             * If this subscriber receives values it means the parent succeeded/completed
             */
            Subscriber<R> parent = new Subscriber<R>() {

                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onError(e);
                    }
                }

                @Override
                public void onNext(R v) {
                    if (isNotTimedOut()) {
                        child.onNext(v);
                    }
                }

                private boolean isNotTimedOut() {
                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                }
            };

            // if s is unsubscribed we want to unsubscribe the parent
            s.add(parent);

            return parent;
        }
    }
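Here a TimerListener tries, via compareAndSet, to change the isCommandTimedOut field from TimedOutStatus.NOT_EXECUTED to TimedOutStatus.TIMED_OUT. The parent subscriber races against it in isNotTimedOut(), which tries to move the same field from NOT_EXECUTED to COMPLETED, so only one side can win.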
    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {

        @Override
        public void run() {
            child.onError(new HystrixTimeoutException());
        }
    });

    timeoutRunnable.run();
If setting the timed-out status succeeds, the listener unsubscribes the original request and calls child.onError with a HystrixTimeoutException.
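The race between tick() and isNotTimedOut() can be reproduced without Hystrix. The following is a minimal sketch using only the JDK; the class name TimedOutStatusDemo and the method tryTimeout are hypothetical names chosen for illustration, and only the enum values and the two compareAndSet transitions are taken from the source above.

```java
import java.util.concurrent.atomic.AtomicReference;

class TimedOutStatusDemo {
    // Same three states as the TimedOutStatus used by AbstractCommand.isCommandTimedOut.
    enum TimedOutStatus { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    final AtomicReference<TimedOutStatus> status =
            new AtomicReference<>(TimedOutStatus.NOT_EXECUTED);

    // Hypothetical helper: the transition that tick() attempts.
    boolean tryTimeout() {
        return status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT);
    }

    // Same condition as isNotTimedOut() in the parent subscriber.
    boolean isNotTimedOut() {
        return status.get() == TimedOutStatus.COMPLETED
                || status.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
    }

    public static void main(String[] args) {
        // Case 1: the timer fires first, so later notifications are dropped.
        TimedOutStatusDemo timerFirst = new TimedOutStatusDemo();
        System.out.println(timerFirst.tryTimeout());    // true
        System.out.println(timerFirst.isNotTimedOut()); // false

        // Case 2: the command emits first, so a later tick is a no-op.
        TimedOutStatusDemo workFirst = new TimedOutStatusDemo();
        System.out.println(workFirst.isNotTimedOut());  // true
        System.out.println(workFirst.tryTimeout());     // false
        System.out.println(workFirst.isNotTimedOut());  // true (already COMPLETED)
    }
}
```

Because both sides start from NOT_EXECUTED, whichever compareAndSet runs first decides the outcome and the loser sees false.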
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/util/HystrixTimer.java
    /**
     * Add a {@link TimerListener} that will be executed until it is garbage collected or removed by clearing the returned {@link Reference}.
     * <p>
     * NOTE: It is the responsibility of code that adds a listener via this method to clear this listener when completed.
     * <p>
     * <blockquote>
     *
     * <pre> {@code
     * // add a TimerListener
     * Reference<TimerListener> listener = HystrixTimer.getInstance().addTimerListener(listenerImpl);
     *
     * // sometime later, often in a thread shutdown, request cleanup, servlet filter or something similar the listener must be shutdown via the clear() method
     * listener.clear();
     * }</pre>
     * </blockquote>
     *
     *
     * @param listener
     *            TimerListener implementation that will be triggered according to its <code>getIntervalTimeInMilliseconds()</code> method implementation.
     * @return reference to the TimerListener that allows cleanup via the <code>clear()</code> method
     */
    public Reference<TimerListener> addTimerListener(final TimerListener listener) {
        startThreadIfNeeded();
        // add the listener

        Runnable r = new Runnable() {

            @Override
            public void run() {
                try {
                    listener.tick();
                } catch (Exception e) {
                    logger.error("Failed while ticking TimerListener", e);
                }
            }
        };

        ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
        return new TimerReference(listener, f);
    }
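The TimerListener is scheduled through ScheduledThreadPoolExecutor's scheduleAtFixedRate, with getIntervalTimeInMilliseconds() used as both the initial delay and the period. The task keeps firing at that interval until it is cancelled, but for the timeout listener only the first successful compareAndSet has any effect.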
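To make the scheduling behaviour concrete, here is a small JDK-only sketch of the same pattern. The TimerListener interface below is a hypothetical stand-in with the two methods seen in the source, and FixedRateTickDemo, addTimerListener and runUntilTicks are illustrative names, not Hystrix API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateTickDemo {
    // Hypothetical stand-in for Hystrix's TimerListener.
    interface TimerListener {
        void tick();
        int getIntervalTimeInMilliseconds();
    }

    // Same shape as HystrixTimer.addTimerListener: the interval is used
    // both as the initial delay and as the period.
    static ScheduledFuture<?> addTimerListener(ScheduledExecutorService executor,
                                               TimerListener listener) {
        Runnable r = () -> {
            try {
                listener.tick();
            } catch (Exception e) {
                // An exception escaping a fixed-rate task suppresses all
                // later runs, so it is caught and reported here.
                System.err.println("Failed while ticking TimerListener: " + e);
            }
        };
        int interval = listener.getIntervalTimeInMilliseconds();
        return executor.scheduleAtFixedRate(r, interval, interval, TimeUnit.MILLISECONDS);
    }

    // Waits for at least `wanted` ticks, then cancels; returns the tick count seen.
    static int runUntilTicks(int wanted, int intervalMs) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        AtomicInteger ticks = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(wanted);
        ScheduledFuture<?> f = addTimerListener(executor, new TimerListener() {
            @Override
            public void tick() {
                ticks.incrementAndGet();
                latch.countDown();
            }

            @Override
            public int getIntervalTimeInMilliseconds() {
                return intervalMs;
            }
        });
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            f.cancel(false);
            executor.shutdownNow();
        }
        return ticks.get();
    }

    public static void main(String[] args) {
        System.out.println("ticks seen: " + runUntilTicks(3, 20));
    }
}
```

The listener ticks repeatedly until the future is cancelled, which is why the caller is responsible for clearing the returned reference once the command finishes.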
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/AbstractCommand.java
    private void cleanUpAfterResponseFromCache(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        final long latency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult
                .addEvent(-1, HystrixEventType.RESPONSE_FROM_CACHE)
                .markUserThreadCompletion(latency)
                .setNotExecutedInThread();
        ExecutionResult cacheOnlyForMetrics = ExecutionResult.from(HystrixEventType.RESPONSE_FROM_CACHE)
                .markUserThreadCompletion(latency);
        metrics.markCommandDone(cacheOnlyForMetrics, commandKey, threadPoolKey, commandExecutionStarted);
        eventNotifier.markEvent(HystrixEventType.RESPONSE_FROM_CACHE, commandKey);
    }

    private void handleCommandEnd(boolean commandExecutionStarted) {
        Reference<TimerListener> tl = timeoutTimer.get();
        if (tl != null) {
            tl.clear();
        }

        long userThreadLatency = System.currentTimeMillis() - commandStartTimestamp;
        executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
        if (executionResultAtTimeOfCancellation == null) {
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }

        if (endCurrentThreadExecutingCommand != null) {
            endCurrentThreadExecutingCommand.call();
        }
    }
Both cleanUpAfterResponseFromCache and handleCommandEnd clear this timeoutTimer by calling clear() on the stored reference, if one is set.
hystrix-core-1.5.12-sources.jar!/com/netflix/hystrix/util/HystrixTimer.java
    private static class TimerReference extends SoftReference<TimerListener> {

        private final ScheduledFuture<?> f;

        TimerReference(TimerListener referent, ScheduledFuture<?> f) {
            super(referent);
            this.f = f;
        }

        @Override
        public void clear() {
            super.clear();
            // stop this ScheduledFuture from any further executions
            f.cancel(false);
        }
    }
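Besides calling the parent class's clear(), TimerReference.clear() also calls cancel(false) on the ScheduledFuture. This cancels any further scheduled executions of the task; because the argument is false, a tick that is already running is not interrupted.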
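The effect of clear() can be checked in isolation. The sketch below uses only the JDK; CancellingReference is a hypothetical generic version of TimerReference written for this illustration, and clearCancelsSchedule is an illustrative helper.

```java
import java.lang.ref.SoftReference;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class TimerReferenceDemo {
    // Hypothetical generic counterpart of HystrixTimer.TimerReference.
    static class CancellingReference<T> extends SoftReference<T> {
        private final ScheduledFuture<?> f;

        CancellingReference(T referent, ScheduledFuture<?> f) {
            super(referent);
            this.f = f;
        }

        @Override
        public void clear() {
            super.clear();
            // Stop further executions without interrupting a running one.
            f.cancel(false);
        }
    }

    // Returns true if clear() both drops the referent and cancels the schedule.
    static boolean clearCancelsSchedule() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        try {
            Runnable task = () -> { };
            // A long period keeps the task from running during the check.
            ScheduledFuture<?> f = executor.scheduleAtFixedRate(task, 1, 1, TimeUnit.HOURS);
            CancellingReference<Runnable> ref = new CancellingReference<>(task, f);
            ref.clear();
            return ref.get() == null && f.isCancelled();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(clearCancelsSchedule()); // true
    }
}
```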
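Hystrix handles timeouts by registering a TimerListener with HystrixTimer. The listener is scheduled with the thread pool's scheduleAtFixedRate (first run after executionTimeoutInMilliseconds), and what gets executed is the listener's tick method. That method tries to set isCommandTimedOut from TimedOutStatus.NOT_EXECUTED to TimedOutStatus.TIMED_OUT; if it succeeds, it runs timeoutRunnable, which passes a HystrixTimeoutException to child.onError.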
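Putting the pieces together, the whole flow can be approximated in a few lines of plain JDK code. This is only a rough sketch under simplifying assumptions: there is no RxJava, no request context and no metrics, a CompletableFuture stands in for the child subscriber, and MiniTimeout, call, workMs and timeoutMs are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class MiniTimeout {
    enum Status { NOT_EXECUTED, COMPLETED, TIMED_OUT }

    // Simulates a command that finishes after workMs, guarded by a timer of timeoutMs.
    static String call(long workMs, long timeoutMs) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        AtomicReference<Status> status = new AtomicReference<>(Status.NOT_EXECUTED);
        CompletableFuture<String> outcome = new CompletableFuture<>();
        try {
            // Timer side: like tick(), only a successful CAS triggers the timeout path.
            ScheduledFuture<?> timer = pool.scheduleAtFixedRate(() -> {
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.TIMED_OUT)) {
                    outcome.complete("TIMED_OUT");
                }
            }, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);

            // Execution side: like isNotTimedOut(), then stop the timer.
            pool.schedule(() -> {
                if (status.compareAndSet(Status.NOT_EXECUTED, Status.COMPLETED)) {
                    timer.cancel(false);
                    outcome.complete("COMPLETED");
                }
            }, workMs, TimeUnit.MILLISECONDS);

            return outcome.join();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(call(0, 2000));  // work finishes long before the timer
        System.out.println(call(2000, 20)); // timer fires long before the work
    }
}
```

With these margins the first call should report COMPLETED and the second TIMED_OUT; in both cases exactly one side wins the compareAndSet, which is the property the Hystrix code relies on.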