在微服务架构中,咱们将系统拆分为不少个服务,各个服务之间经过注册与订阅的方式相互依赖,因为各个服务都是在各自的进程中运行,就有可能因为网络缘由或者服务自身的问题致使调用故障或延迟,随着服务的积压,可能会致使服务崩溃。为了解决这一系列的问题,断路器等一系列服务保护机制出现了。java
断路器自己是一种开关保护机制,用于在电路上保护线路过载,当线路中有电器发生短路时,断路器可以及时切断故障电路,防止发生过载、发热甚至起火等严重后果。web
在分布式架构中,断路器模式的做用也是相似的。spring
针对上述问题,Spring Cloud Hystrix 实现了断路器、线路隔离等一系列服务保护功能。它也是基于 Netflix 的开源框架 Hystrix 实现的,该框架的目标在于经过控制那些访问远程系统、服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力。Hystrix 具有服务降级、服务熔断、线程和信号隔离、请求缓存、请求合并以及服务监控等强大功能。apache
在开始实现断路器以前,先用以前实现的一些内容做为基础,构建一个以下图所示的服务调用关系。编程
须要启动的工程有以下一些:缓存
在未加入断路器以前,关闭8081的实例,发送 GET 请求到 http://localhost:3333/ribbon-consumer ,能够获取下面的输入。网络
下面引入 Spring Cloud Hystrix。架构
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency>
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.context.annotation.Bean; import org.springframework.web.client.RestTemplate; @EnableCircuitBreaker @EnableDiscoveryClient @SpringBootApplication public class DemoApplication { @Bean @LoadBalanced RestTemplate restTemplate(){ return new RestTemplate(); } public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
注:此处还可使用 Spring Cloud 应用中的 @SpringCloudApplication 注解来修饰主类,该注解的具体定义以下。能够看到,该注解中包含了上述所引用的三个注解,这意味着一个 Spring Cloud 标准应用应包含服务发现以及断路器。app
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.springframework.cloud.client; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @SpringBootApplication @EnableDiscoveryClient @EnableCircuitBreaker public @interface SpringCloudApplication { }
package com.example.demo.web; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.client.RestTemplate; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ @Service public class HelloService { @Autowired RestTemplate restTemplate; @HystrixCommand(fallbackMethod = "helloFallback") public String helloService(){ return restTemplate.getForEntity("http://hello-service/index", String.class).getBody(); } public String helloFallback(){ return "error"; } }
package com.example.demo.web; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; /** * @author lxx * @version V1.0.0 * @date 2017-8-9 */ @RestController public class ConsumerController { @Autowired HelloService helloService; @RequestMapping(value = "ribbon-consumer", method = RequestMethod.GET) public String helloConsumer(){ return helloService.helloService(); } }
下面,对断路器实现的服务回调逻辑进行验证,从新启动以前关闭的 2221 端口的 hello-service,确保此时服务注册中心、两个 hello-service 和 ribbon-consumer 均已启动,再次访问 http://localhost:3333/ribbon-consumer 能够轮询两个 hello-serive 并返回一些文字信息。此时断开其中任意一个端口的 hello-service,再次访问,当轮询到关闭的端口服务时,输出内容为 error ,再也不是以前的提示信息。框架
除了经过断开具体的服务实例来模拟某个节点没法访问的状况以外,还能够模拟一下服务阻塞(长时间未响应)的状况。下面对hello-serive 的 /index 接口作一些修改,具体以下:
package com.example.demo.web; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Random; /** * @author lxx * @version V1.0.0 * @date 2017-8-9 */ @RestController public class HelloController { private final Logger logger = Logger.getLogger(getClass()); @Autowired private DiscoveryClient client; @RequestMapping(value = "/index") public String index(){ ServiceInstance instance = client.getLocalServiceInstance(); // 让处理线程等待几秒钟 int sleepTime = new Random().nextInt(3000); logger.info("sleepTime:"+sleepTime); try { Thread.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("/hello:host:"+instance.getHost()+" port:"+instance.getPort() +" service_id:"+instance.getServiceId()); return "hello world!"; } }
经过Thread.sleep 函数可以让 /index 接口的处理线程不是立刻返回内容,而是在阻塞几秒后才返回内容。因为 Hystrix 默认超时时间为 2000 毫秒,因此这里采用了 0 至 3000 的随机数以让处理过程有必定几率发生超时来触发断路器。为了更精确的观察断路器的触发,在消费者调用函数中作一些时间记录,具体以下:
package com.example.demo.web; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.client.RestTemplate; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ @Service public class HelloService { @Autowired RestTemplate restTemplate; @HystrixCommand(fallbackMethod = "helloFallback") public String helloService(){ long beginTime = System.currentTimeMillis(); String body = restTemplate.getForEntity("http://hello-service/index", String.class).getBody(); long endTime = System.currentTimeMillis(); System.out.println("Spend Time : "+ (endTime - beginTime)); return body; } public String helloFallback(){ return "error"; } }
首先,建立一个 HystrixCommand 或 HystrixObservableCommand 对象,用来表示对依赖服务的操做请求,同时传递全部须要的参数。从其命名中咱们就能知道它采用了“命令模式” 来实现服务调用操做的封装。而这两个 Command 对象分别针对不一样的应用场景。
命令模式,未来自客户端的请求封装成一个对象,从而让你可使用不一样的请求对客户端进行参数化。它能够被用于实现“行为请求者” 与 “行为实现者” 的解耦,以便使二者能够适应变化。下面的示例是对命令模式的简单实现:
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ // 接收者 public class Receiver { public void active(){ //真正的业务逻辑 System.out.println("测试命令模式"); } }
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ //抽象命令 public interface Command { void excute(); }
package com.example.demo.command; import org.springframework.beans.factory.annotation.Autowired; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ //具体命令实现 public class CommandImpl implements Command { private Receiver receiver; public CommandImpl(Receiver receiver) { this.receiver = receiver; } @Override public void excute() { this.receiver.active(); } }
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ //客户端调用 public class Invoker { private Command command; public void setCommand(Command command) { this.command = command; } public void active (){ command.excute(); } }
package com.example.demo.command; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ public class Client { public static void main(String[] args) { Receiver receiver = new Receiver(); Command command = new CommandImpl(receiver); Invoker invoker = new Invoker(); invoker.setCommand(command); invoker.active(); //客户端经过调用者来执行命令 } }
从代码中,能够看到这样几个对象。
从上面的示例中,咱们能够看到,调用者 Invoker 与操做者 Receiver 经过 Command 命令接口实现了解耦。对于调用者来讲,咱们能够为其注入多个命令操做,调用者只需在须要的时候直接调用便可,而不须要知道这些操做命令实际是如何实现的。而在这里所提到的 HystrixCommand 和 HystrixObservableCommand 则是在 Hystrix 中对 Command 的进一步抽象定义。
2. 命令执行
命令执行方式一共有4种,而 Hystrix 在执行时会根据建立的Command对象以及具体的状况来选择一种执行。其中 HystrixCommand 实现了下面两个执行方式。
R execute();
Future<R> queue();
而 HystrixObservableCommand 实现了另外两种执行方式。
Observable<R> observe();
Observable<R> toObservable();
在 Hystrix 的底层实现中大量使用了 RxJava ,为了更容易的理解后续内容,在这里对 RxJava 的观察者-订阅者模式作一个简单的入门介绍。
上面提到的 Observable 对象就是 RxJava 中的核心内容之一,能够理解为 “事件源” 或者 “被观察者”,与其对应的 Subscriber 对象,能够理解为 “订阅者” 或者 “观察者”。这两个对象是 RxJava 响应式编程的重要组成部分。
下面经过一个简单的例子来直观理解一下 Observable 与 Subscribers:
package com.example.demo.Observable_Subsciber; import rx.Observable; import rx.Subscriber; /** * @author lxx * @version V1.0.0 * @date 2017-8-16 */ public class Obs_Subs { public static void main(String[] args) { //建立事件源 Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava "); subscriber.onNext("I'm XX"); subscriber.onCompleted(); } }); //建立订阅者 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(String s) { } }; observable.subscribe(subscriber); } }
在该示例中,建立了一个简单的事件源 observable,一个对事件传递内容输出的订阅者 subscriber ,经过 observable.subscribe(subscriber) 来触发事件的发布。
在这里咱们对于事件源 observable 提到了两个不一样的概念:Hot Observable 和 Cold Observable ,分别对应了上面的 command.observe() 和 command.toObservable() 的返回对象。其中 HotObservable,不论 “事件源” 是否有 “订阅者” ,都会在建立后对事件进行发布,因此对于 Hot Observable 的每个 “订阅者” 都有多是从 “事件源” 的中途开始的,并可能只是看到了整个操做的局部过程。而 Cold Observable 在没有 “订阅者” 的时候并不会发布事件,而是进行等待,直到有 “订阅者” 以后才发布事件,因此对于 Cold Observable 的订阅者,它能够保证从一开始看到整个操做的所有过程。
3. 结果是否被缓存
若当前命令的请求缓存功能是被启用的,而且该命令缓存命中,那么缓存的结果会当即以 Observable 对象的形式返回。
4. 断路器是否打开
在命令结果没有缓存命中的时候,Hystrix 在执行命令前须要检查断路器是否为打开状态:
5. 线程池 / 请求队列 / 信息量是否占满
若是与命令相关的线程池 / 请求队列 / 信息量已经占满,那么 Hystrix 不会执行命令,跳转到 fallback 处理逻辑(对应下面第8步)。
注意:此处的线程池并不是容器的线程池,而是每一个依赖服务的专有线程池。Hystrix 为了保证不会由于某个依赖服务的问题影响到其余依赖服务而采用了 “舱壁模式” 来隔离每一个依赖的服务。
6. HystrixObservableCommand.construct() 或 HystrixCommand.run()
Hystrix 会根据编写的方法来决定采起什么样的方式去请求依赖服务。
若是 run() 或 construct() 方法的执行时间超过了命令设置的超时阈值,当前处理线程会抛出 TimeoutException。这种状况下,也会跳转到 fallback 处理逻辑(第8步)。
7. 计算断路器的健康度
Hystrix 会将 “成功”、“失败”、“拒绝”、“超时” 等信息报告给断路器,而断路器会维护一组计数器来统计这些数据。
断路器会使用这些统计数据来决定是否要将断路器打开,来对某个依赖服务的请求进行 “熔断 / 短路”,直到恢复期结束。
8. fallback 处理
当命令执行失败的时候,Hystrix 会进入 fallback 尝试回退处理,咱们一般也称为 “服务降级”。下面就是可以引起服务降级处理的几种状况:
九、返回成功的响应
HystrixCircuitBreaker 的定义:
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package com.netflix.hystrix; import com.netflix.hystrix.HystrixCommandGroupKey; import com.netflix.hystrix.HystrixCommandKey; import com.netflix.hystrix.HystrixCommandMetrics; import com.netflix.hystrix.HystrixCommandProperties; import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import rx.Subscriber; import rx.Subscription; public interface HystrixCircuitBreaker { boolean allowRequest(); boolean isOpen(); void markSuccess(); void markNonSuccess(); boolean attemptExecution(); public static class NoOpCircuitBreaker implements HystrixCircuitBreaker { public NoOpCircuitBreaker() { } public boolean allowRequest() { return true; } public boolean isOpen() { return false; } public void markSuccess() { } public void markNonSuccess() { } public boolean attemptExecution() { return true; } } public static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; private final AtomicReference<HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status> status; private final AtomicLong circuitOpened; private final AtomicReference<Subscription> activeSubscription; protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.status = new AtomicReference(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED); this.circuitOpened = new AtomicLong(-1L); this.activeSubscription = new AtomicReference((Object)null); this.properties = properties; this.metrics = metrics; Subscription s = this.subscribeToStream(); this.activeSubscription.set(s); } private Subscription subscribeToStream() { return this.metrics.getHealthCountsStream().observe().subscribe(new Subscriber() { public void onCompleted() { } public void onError(Throwable e) { } public void onNext(HealthCounts hc) { if(hc.getTotalRequests() >= (long)((Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerRequestVolumeThreshold().get()).intValue() && hc.getErrorPercentage() >= ((Integer)HystrixCircuitBreakerImpl.this.properties.circuitBreakerErrorThresholdPercentage().get()).intValue() && HystrixCircuitBreakerImpl.this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) { HystrixCircuitBreakerImpl.this.circuitOpened.set(System.currentTimeMillis()); } } }); } public void markSuccess() { if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.CLOSED)) { this.metrics.resetStream(); Subscription previousSubscription = (Subscription)this.activeSubscription.get(); if(previousSubscription != null) { previousSubscription.unsubscribe(); } Subscription newSubscription = this.subscribeToStream(); this.activeSubscription.set(newSubscription); this.circuitOpened.set(-1L); } } public void markNonSuccess() { if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) { this.circuitOpened.set(System.currentTimeMillis()); } } public boolean isOpen() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?true:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?false:this.circuitOpened.get() >= 0L); } public boolean allowRequest() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN)?false:this.isAfterSleepWindow()))); } private boolean isAfterSleepWindow() { long circuitOpenTime = this.circuitOpened.get(); long currentTime = System.currentTimeMillis(); long sleepWindowTime = (long)((Integer)this.properties.circuitBreakerSleepWindowInMilliseconds().get()).intValue(); return currentTime > circuitOpenTime + sleepWindowTime; } public boolean attemptExecution() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(this.isAfterSleepWindow()?this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN):false))); } static enum Status { CLOSED, OPEN, HALF_OPEN; private Status() { } } } public static class Factory { private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap(); public Factory() { } public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { HystrixCircuitBreaker previouslyCached = (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()); if(previouslyCached != null) { return previouslyCached; } else { HystrixCircuitBreaker cbForCommand = (HystrixCircuitBreaker)circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreaker.HystrixCircuitBreakerImpl(key, group, properties, metrics)); return cbForCommand == null?(HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()):cbForCommand; } } public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) { return (HystrixCircuitBreaker)circuitBreakersByCommand.get(key.name()); } static void reset() { circuitBreakersByCommand.clear(); } } }
主要定义了三个断路器的抽象方法。
另外还有三个静态类。
HystrixCircuitBreakerImpl 的各个实现方法以下:
public boolean isOpen() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?true:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?false:this.circuitOpened.get() >= 0L); }
public boolean allowRequest() { return ((Boolean)this.properties.circuitBreakerForceOpen().get()).booleanValue()?false:(((Boolean)this.properties.circuitBreakerForceClosed().get()).booleanValue()?true:(this.circuitOpened.get() == -1L?true:(((HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status)this.status.get()).equals(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN)?false:this.isAfterSleepWindow()))); }
public void markNonSuccess() { if(this.status.compareAndSet(HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.HALF_OPEN, HystrixCircuitBreaker.HystrixCircuitBreakerImpl.Status.OPEN)) { this.circuitOpened.set(System.currentTimeMillis()); } }
Hystrix 使用 “舱壁模式” 实现线程池的隔离,它为每个依赖服务建立一个独立的线程池,就算某个依赖服务出现延迟太高的状况,也不会拖慢其余的依赖服务。
Hystrix 命令就是咱们以前所说的 HystrixCommand,它用来封装具体的依赖服务调用逻辑。
能够经过继承的方式来实现,好比: