// 熔断器的建立 class Factory { // String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly) private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>(); public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { // 先从缓存中获取,key是commandkey,因此说熔断器是否共用取决于commandkey是不是一个 HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name()); if (previouslyCached != null) { return previouslyCached; } //使用concurrentHashMap 保证线程安全 HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics)); if (cbForCommand == null) { return circuitBreakersByCommand.get(key.name()); } else { return cbForCommand; } } public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) { return circuitBreakersByCommand.get(key.name()); } /* package */static void reset() { circuitBreakersByCommand.clear(); } } //熔断器主要提供的功能 public interface HystrixCircuitBreaker { //判断某个请求是否放行,而且半开的逻辑也在这里处理(当熔断器开的时候,放行一部分请求) boolean allowRequest(); boolean isOpen(); // 熔断器半开状态时,请求成功会调用该方法 void markSuccess(); // 熔断器半开状态时,请求失败会调用该方法 void markNonSuccess(); // 尝试请求的时候,会调用该方法 boolean attemptExecution(); }
熔断器的具体实现缓存
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; // 熔断器的三种状态:关,开,半开 enum Status { CLOSED, OPEN, HALF_OPEN; } // 初始是关 private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED); private final AtomicLong circuitOpened = new AtomicLong(-1); //熔断器打开时的系统时间 // Suscription 是RxJava中用来取消订阅关系的,该接口提供两个方法:一个是取消订阅,一个是查看当前是否还在订阅中 private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null); protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur Subscription s = subscribeToStream(); activeSubscription.set(s); } //订阅请求统计的Observable // Observable.suscribe 该方法会返回一个Subscription,即传参中的Subscriber订阅了Observable private Subscription subscribeToStream() { return metrics.getHealthCountsStream() .observe() .subscribe(new Subscriber<HealthCounts>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(HealthCounts hc) { if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the stat window, // so no change to circuit status. // if it was CLOSED, it stays CLOSED // if it was half-open, we need to wait for a successful command execution // if it was open, we need to wait for sleep window to elapse } else { if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { //we are not past the minimum error threshold for the stat window, // so no change to circuit status. // if it was CLOSED, it stays CLOSED // if it was half-open, we need to wait for a successful command execution // if it was open, we need to wait for sleep window to elapse } else { // our failure rate is too high, we need to set the state to OPEN if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } }); } @Override public void markSuccess() { //当前若是是半开状态,在请求成功后改为关闭状态,关闭熔断器 if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { metrics.resetStream(); Subscription previousSubscription = activeSubscription.get(); if (previousSubscription != null) { previousSubscription.unsubscribe(); } Subscription newSubscription = subscribeToStream(); activeSubscription.set(newSubscription); circuitOpened.set(-1L); } } @Override public void markNonSuccess() { // 请求失败,且当前状态是半开,状态更新为打开 if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { //从新记录打开的时间 circuitOpened.set(System.currentTimeMillis()); } } @Override public boolean isOpen() { //强制打开熔断 if (properties.circuitBreakerForceOpen().get()) { return true; } //强制关闭熔断 if (properties.circuitBreakerForceClosed().get()) { return false; } //circuitOpened大于等于0,熔断器打开,小于0,熔断器关闭 //初始值-1,关闭熔断器 //打开的时机:1上面方法markNonSuccess(半开-开),2 初始化HystrixCircuirBreakerImpl中,上面的subcribeToStream() return circuitOpened.get() >= 0; } @Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { return true; } //初始为-1的时候,熔断器关,容许请求进来 if (circuitOpened.get() == -1) { return true; } else { //半开的时候拒绝请求? if (status.get().equals(Status.HALF_OPEN)) { return false; } else { // 熔断器当前是开,且若是当前时间已经到了能够再次尝试的时间,容许请求过来 return isAfterSleepWindow(); } } } private boolean isAfterSleepWindow() { final long circuitOpenTime = circuitOpened.get(); final long currentTime = System.currentTimeMillis(); // 熔断器打开后,拒绝请求多长时间后再开始尝试执行,可配置,默认5000ms final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get(); //判断当前时间是否已通过了窗口睡眠时间:是否能够开始尝试执行请求 return currentTime > circuitOpenTime + sleepWindowTime; } @Override public boolean attemptExecution() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { return true; } if (circuitOpened.get() == -1) { return true; } else { if (isAfterSleepWindow()) { if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { return true; } else { return false; } } else { return false; } } } }