专栏系列文章:SpringCloud系列专栏java
系列文章:web
SpringCloud 源码系列(1)— 注册中心Eureka 之 启动初始化spring
SpringCloud 源码系列(2)— 注册中心Eureka 之 服务注册、续约安全
SpringCloud 源码系列(3)— 注册中心Eureka 之 抓取注册表markdown
SpringCloud 源码系列(4)— 注册中心Eureka 之 服务下线、故障、自我保护机制并发
SpringCloud 源码系列(5)— 注册中心Eureka 之 EurekaServer集群app
SpringCloud 源码系列(6)— 注册中心Eureka 之 总结篇负载均衡
SpringCloud 源码系列(7)— 负载均衡Ribbon 之 RestTemplateide
SpringCloud 源码系列(8)— 负载均衡Ribbon 之 核心原理微服务
SpringCloud 源码系列(9)— 负载均衡Ribbon 之 核心组件与配置
SpringCloud 源码系列(10)— 负载均衡Ribbon 之 HTTP客户端组件
SpringCloud 源码系列(11)— 负载均衡Ribbon 之 重试与总结篇
SpringCloud 源码系列(12)— 服务调用Feign 之 基础使用篇
SpringCloud 源码系列(13)— 服务调用Feign 之 扫描@FeignClient注解接口
SpringCloud 源码系列(14)— 服务调用Feign 之 构建@FeignClient接口动态代理
SpringCloud 源码系列(15)— 服务调用Feign 之 结合Ribbon进行负载均衡请求
SpringCloud 源码系列(16)— 熔断器Hystrix 之 基础入门篇
SpringCloud 源码系列(17)— 熔断器Hystrix 之 获取执行订阅对象Observable
SpringCloud 源码系列(18)— 熔断器Hystrix 之 执行核心原理
前面的文章 SpringCloud 源码系列(14)— 服务调用Feign 之 构建@FeignClient接口动态代理 已经详细分析了 @FeignClient
接口生成动态代理并进行远程调用的原理,下面把文章中的原理图贴出来,便于回顾和理解。
图种和 Hystrix 相关的就是 HystrixTargeter
,Targeter
就是用于建立 FeignClient 动态代理对象的,在这一步,feign 若是启用了 hystrix,就会使用 HystrixTargeter 来建立动态代理对象,不然就会使用 DefaultTargeter
建立代理对象。那么分析 Hystrix 与 Feign 的整合就能够从 HystrixTargeter
这个入口来分析。
在配置类 FeignAutoConfiguration
中,能够看到有以下配置来配置 Targeter 的具体实现类。在引入 feign.hystrix.HystrixFeign
的状况下,Targeter 的实现类为 HystrixTargeter
,不然就是默认的 DefaultTargeter
。
feign.hystrix.HystrixFeign
这个类属于 feign-hystrix
依赖包,也就是说 feign 要开启 hystrix 的功能,须要先加入 feign-hystrix
的组件包。不过 spring-cloud-starter-openfeign
已经帮咱们引入 feign-hystrix
的依赖了,不须要咱们单独引入。
@ConditionalOnClass(name = "feign.hystrix.HystrixFeign")
protected static class HystrixFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new HystrixTargeter();
}
}
@ConditionalOnMissingClass("feign.hystrix.HystrixFeign")
protected static class DefaultFeignTargeterConfiguration {
@Bean
@ConditionalOnMissingBean
public Targeter feignTargeter() {
return new DefaultTargeter();
}
}
复制代码
在 FeignClientsConfiguration
配置类中,能够看到有以下配置决定 Feign.Builder
的具体类型,Feign.Builder
是 Targeter 用来构建 Feign
对象的构造器。能够看到,默认状况下 Feign.Builder 是 Feign.Builder
。若是引入了 Hystrix 且 feign.hystrix.enabled=true
的状况下,Feign.Builder 的实际类型就是 HystrixFeign.Builder
,这块后面会分析。
也就是说,Feign 要启用 Hystrix,不只须要加入 feign-hystrix
的依赖,还须要配置 feign.hystrix.enabled=true
才会生效。
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
public Feign.Builder feignBuilder(Retryer retryer) {
return Feign.builder().retryer(retryer);
}
@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
protected static class HystrixFeignConfiguration {
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "feign.hystrix.enabled")
public Feign.Builder feignHystrixBuilder() {
return HystrixFeign.builder();
}
}
复制代码
来看一下 HystrixTargeter 的 target
方法,fein 启用 hystrix 后,Feign.Builder 是 HystrixFeign.Builder
,因此会走 if
以后的逻辑。
groupName
。@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context, Target.HardCodedTarget<T> target) {
if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
return feign.target(target);
}
feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
// HystrixCommand 的名称,取自 Feign 定义的服务名称
String name = StringUtils.isEmpty(factory.getContextId()) ? factory.getName()
: factory.getContextId();
SetterFactory setterFactory = getOptional(name, context, SetterFactory.class);
if (setterFactory != null) {
builder.setterFactory(setterFactory);
}
// 回调类
Class<?> fallback = factory.getFallback();
if (fallback != void.class) {
return targetWithFallback(name, context, target, builder, fallback);
}
// 回调工厂
Class<?> fallbackFactory = factory.getFallbackFactory();
if (fallbackFactory != void.class) {
return targetWithFallbackFactory(name, context, target, builder, fallbackFactory);
}
return feign.target(target);
}
复制代码
不管是回调类仍是回调工厂,最后异曲同工,都是走到 build(fallbackFactory)
。
public <T> T target(Target<T> target, T fallback) {
return build(fallback != null ? new FallbackFactory.Default<T>(fallback) : null)
.newInstance(target);
}
public <T> T target(Target<T> target, FallbackFactory<? extends T> fallbackFactory) {
return build(fallbackFactory).newInstance(target);
}
复制代码
继续看 HystrixFeign.Builder
的 build()
方法,这下就明白了,与未启用 Hystrix 时的最大区别,就在于建立的 InvocationHandler
不一样。启用 hystrix 后,设置的匿名 InvocationHandlerFactory 建立的 InvocationHandler 是 HystrixInvocationHandler
。
同时,用于处理接口注解的接口协议组件 Contract 也设置为了 HystrixDelegatingContract
,默认为 SpringMvcContract
,HystrixDelegatingContract 其实是代理了 SpringMvcContract。
Feign build(final FallbackFactory<?> nullableFallbackFactory) {
super.invocationHandlerFactory(new InvocationHandlerFactory() {
@Override
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
return new HystrixInvocationHandler(target, dispatch, setterFactory,
nullableFallbackFactory);
}
});
super.contract(new HystrixDelegatingContract(contract));
return super.build();
}
复制代码
未启用 hystrix 时,默认建立 InvocationHandler 是 ReflectiveFeign.FeignInvocationHandler
。
public interface InvocationHandlerFactory {
//...
static final class Default implements InvocationHandlerFactory {
@Override
public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
return new ReflectiveFeign.FeignInvocationHandler(target, dispatch);
}
}
}
复制代码
HystrixDelegatingContract 其实是一个装饰器,能够看到,若是 FeignClient 返回类型是 HystrixCommand、Observable、Single、Completable、CompletableFuture
,它会去设置 MethodMetadata 的返回类型为实际类型。
public final class HystrixDelegatingContract implements Contract {
private final Contract delegate;
public HystrixDelegatingContract(Contract delegate) {
this.delegate = delegate;
}
@Override
public List<MethodMetadata> parseAndValidateMetadata(Class<?> targetType) {
List<MethodMetadata> metadatas = this.delegate.parseAndValidateMetadata(targetType);
for (MethodMetadata metadata : metadatas) {
Type type = metadata.returnType();
if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(HystrixCommand.class)) {
Type actualType = resolveLastTypeParameter(type, HystrixCommand.class);
metadata.returnType(actualType);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(Observable.class)) {
Type actualType = resolveLastTypeParameter(type, Observable.class);
metadata.returnType(actualType);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(Single.class)) {
Type actualType = resolveLastTypeParameter(type, Single.class);
metadata.returnType(actualType);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(Completable.class)) {
metadata.returnType(void.class);
} else if (type instanceof ParameterizedType && ((ParameterizedType) type).getRawType().equals(CompletableFuture.class)) {
metadata.returnType(resolveLastTypeParameter(type, CompletableFuture.class));
}
}
return metadatas;
}
}
复制代码
看 HystrixInvocationHandler
的 invoke
方法,调用 FeignClient 接口时,会进入代理对象并调用这个 invoke 方法来执行。
到这里其实就弄清楚 hystrix 与 feign 的整合了,这个代理对象反射调用时,会将本来方法的反射调用封装到 HystrixCommand
的 run()
方法中,而后根据不一样的返回类型调用不一样的方法执行,默认就是调用 HystrixCommand 的 execute()
方法。
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
//...
// 构造 HystrixCommand
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
// run() 方法中就是反射调用原始方法
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
// 重写了获取回调的方法
@Override
protected Object getFallback() {
// 执行回调
}
};
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
复制代码
RestTemplate 远程调用也须要考虑到熔断、降级等,避免服务级联故障,那 RestTemplate 如何与 Hystrix 整合呢?
RestTemplate 的远程调用,咱们能够封装到一个方法中,而后用 @HystrixCommand
注解标注在调用方法上,并配置 grouopKey、回调方法等参数,通常 groupKey 配置远程调用的服务名或提供接口的第三方便可。
其原理想一想其实也很简单,hystrix 确定是增长了一个切面来拦截带有 @HystrixCommand
注解的方法的执行,而后相似 feign 整合 hystrix,将方法的调用封装到 HystrixCommand 中,而后执行命令。
@Service
public class ProducerWithHystrixService {
@Autowired
private RestTemplate restTemplate;
@HystrixCommand( groupKey = "demo-producer", fallbackMethod = "queryId_fallback" )
public String queryId() {
ResponseEntity<String> result = restTemplate.postForEntity("http://demo-producer/v1/uuid",
new LinkedMultiValueMap<String, Object>(), String.class);
return result.getBody();
}
public String queryId_fallback() {
return "error";
}
}
复制代码
Hystrix 有线程池隔离和信号量隔离两种资源隔离模式,若是是线程池隔离的状况下,有个问题须要思考下,那就是若是业务中使用了 ThreadLocal
来存储线程本地变量,而 HystrixCommand 的执行是在子线程中执行的,要知道 ThreadLocal 中的本地变量是不会传递到子线程中的,那业务逻辑在子线程中执行时,就没法获取到 ThreadLocal 中的本地变量了。那这个时候不就会影响本来业务逻辑的执行了吗?又该怎么处理呢?
最简单的解决办法,就是修改隔离策略,使用信号量隔离模式,可是 Hystrix 默认是线程池隔离模式,并且从实际的场景来讲也是使用线程池隔离,这个办法就不行了。
其次是 Hystrix 官方推荐的使用 HystrixConcurrencyStrategy
,实现 wrapCallable
方法,在里面复制线程的状态。
在前面分析 HystrixContextSchedulerWorker
的调度时,有个东西还没分析,看 schedule 方法的代码,worker.schedule
调度的 Action0 其实是上下文调度Action HystrixContexSchedulerAction
。它是对原始 Action0 的再一次封装,建立 HystrixContexSchedulerAction 时传入了 HystrixConcurrencyStrategy 对象和原始的 Action0。
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("...");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}
复制代码
接着看 HystrixContexSchedulerAction
,在构造方法中,原始的 Action0 被封装到了一个 Callable 中,主要就是将主线程中的 Hystrix请求上下文 HystrixRequestContext
设置到子线程中,而后再去调用原始的 Action0,执行结束后,又重置了子线程的 Hystrix请求上下文。
重要的是,再次封装的 Callable 传入了 concurrencyStrategy 的 wrapCallable
方法中,所以,这也是 Hystrix 给咱们提供的一个可扩展的口子,让咱们能够在 Hystrix 请求执行前注入一些自定义的动做,好比复制线程的状态。
public class HystrixContexSchedulerAction implements Action0 {
private final Action0 actual;
private final HystrixRequestContext parentThreadState;
private final Callable<Void> c;
public HystrixContexSchedulerAction(Action0 action) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), action);
}
public HystrixContexSchedulerAction(final HystrixConcurrencyStrategy concurrencyStrategy, Action0 action) {
this.actual = action;
// 主线程状态
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
this.c = concurrencyStrategy.wrapCallable(new Callable<Void>() {
@Override
public Void call() throws Exception {
// 子线程状态
HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
try {
// 主线程状态设置到子线程
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// 在主线程状态下执行原始的 Action0
actual.call();
return null;
} finally {
// 重置子线程状态
HystrixRequestContext.setContextOnCurrentThread(existingState);
}
}
});
}
@Override
public void call() {
try {
c.call();
} catch (Exception e) {
throw new RuntimeException("Failed executing wrapped Action0", e);
}
}
}
复制代码
Spring Security 中,会在 ThreadLocal 中存储安全上下文 SecurityContext
,为了能让 SecurityContext 传递到子线程,spring-cloud-netflix-core
模块中就自定义了 SecurityContextConcurrencyStrategy
安全上下文并发策略类。下面咱们就经过 SecurityContextConcurrencyStrategy 来看看如何自定义 HystrixConcurrencyStrategy
达到复制线程状态的目的。
开发步骤以下:
call()
方法中,首先暂存子线程中的状态,再将主线程中的状态设置到子线程中。这一步就达到了复制主线程状态的目的。finally
中,重置子线程的状态为以前的状态。public final class DelegatingSecurityContextCallable<V> implements Callable<V> {
// 要代理的 Callable
private final Callable<V> delegate;
// 主线程中的 SecurityContext
private final SecurityContext delegateSecurityContext;
// 子线程中的 SecurityContext
private SecurityContext originalSecurityContext;
// 构造方法传入要代理的 Callable 和主线程中的 SecurityContext
public DelegatingSecurityContextCallable(Callable<V> delegate, SecurityContext securityContext) {
this.delegate = delegate;
this.delegateSecurityContext = securityContext;
}
// 构造方法传入要代理的 Callable 和主线程中的 SecurityContext
public DelegatingSecurityContextCallable(Callable<V> delegate) {
// 保存主线程的 SecurityContext
this(delegate, SecurityContextHolder.getContext());
}
@Override
public V call() throws Exception {
// 暂存子线程的 SecurityContext
this.originalSecurityContext = SecurityContextHolder.getContext();
try {
// 将主线程的 SecurityContext 设置到子线程中
SecurityContextHolder.setContext(delegateSecurityContext);
// 调用原始的 Callable
return delegate.call();
} finally {
// 重置为原子线程的 SecurityContext
SecurityContextHolder.setContext(originalSecurityContext);
}
}
}
复制代码
接着开发自定义的Hystrix并发策略类继承自 HystrixConcurrencyStrategy
,有以下的要点:
wrapCallable
方法中,建立自定义的 DelegatingSecurityContextCallable,这样就包装了咱们自定义的动做。public class SecurityContextConcurrencyStrategy extends HystrixConcurrencyStrategy {
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
public SecurityContextConcurrencyStrategy(HystrixConcurrencyStrategy existingConcurrencyStrategy) {
this.existingConcurrencyStrategy = existingConcurrencyStrategy;
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
: super.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getRequestVariable(rv)
: super.getRequestVariable(rv);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue)
: super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, threadPoolProperties)
: super.getThreadPool(threadPoolKey, threadPoolProperties);
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.wrapCallable(new DelegatingSecurityContextCallable<T>(callable))
: super.wrapCallable(new DelegatingSecurityContextCallable<T>(callable));
}
}
复制代码
自定义并发策略类开发完成后,就须要注册到 Hystrix 中,在 HystrixSecurityAutoConfiguration
中可看看如何注册:
@Configuration
@Conditional(HystrixSecurityCondition.class)
@ConditionalOnClass({Hystrix.class, SecurityContext.class})
public class HystrixSecurityAutoConfiguration {
// 注入已存在的并发策略类
@Autowired(required = false)
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
@PostConstruct
public void init() {
// 保存 Hystrix 插件本来的引用
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
// 重置
HystrixPlugins.reset();
// 注册新的并发策略
HystrixPlugins.getInstance().registerConcurrencyStrategy(new SecurityContextConcurrencyStrategy(existingConcurrencyStrategy));
// 注册其它的插件
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
}
static class HystrixSecurityCondition extends AllNestedConditions {
public HystrixSecurityCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}
// 启用条件
@ConditionalOnProperty(name = "hystrix.shareSecurityContext")
static class ShareSecurityContext {
}
}
}
复制代码
为何要重置以后再从新注册呢?看这些注册方法,能够知道,这些注册方法只能被调用一次,不然将抛出异常。所以为了不已经注册过了,因此须要重置以后再从新注册。
public void registerConcurrencyStrategy(HystrixConcurrencyStrategy impl) {
if (!concurrencyStrategy.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered.");
}
}
public void registerEventNotifier(HystrixEventNotifier impl) {
if (!notifier.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered.");
}
}
复制代码
在上面的代码中还能够看到,HystrixSecurityAutoConfiguration
配置类生效的前提是 hystrix.shareSecurityContext=true
,所以,若是想要在 spring security + hystrix 的环境中,可以在 hystrix 子线程获取 SecurityContext,须要配置 hystrix.shareSecurityContext=true
。
Hystrix 有不少配置,咱们能够从他们的 Properties 配置类中找到有哪些配置以及默认的配置值。
熔断器相关配置和默认值能够在 HystrixCommandProperties
中找到。
隔离策略相关配置和默认值能够在 HystrixCommandProperties
中找到。
Hystrix线程池相关配置和默认值能够在 HystrixThreadPoolProperties
中找到。
在微服务中,通常是经过 yml 文件来配置的,不会使用 HystrixCommandProperties.Setter().withCircuitBreakerEnabled(true)
这种形式,那如何配置 Hystrix 全局默认值和不一样组的呢?
全局默认配置使用 default
做为 key:
hystrix:
threadpool:
# default 做为 key
default:
coreSize: 10
maximumSize: 20
maxQueueSize: 10
command:
# default 做为 key
default:
execution:
isolation:
strategy: THREAD
thread:
timeoutInMilliseconds: 5000
复制代码
针对特定客户端则使用命令的名称做为 key:
hystrix:
threadpool:
# default
default:
coreSize: 10
maximumSize: 10
maxQueueSize: -1
# demo-consumer
demo-consumer:
coreSize: 5
maximumSize: 5
maxQueueSize: 10
command:
## default
default:
execution:
isolation:
strategy: THREAD
thread:
timeoutInMilliseconds: 5000
# demo-producer
demo-producer:
execution:
isolation:
strategy: SEMAPHORE
thread:
timeoutInMilliseconds: 2000
复制代码
针对特定客户端的某个方法,用#
分隔客户端名称和方法名称:
hystrix:
threadpool:
# demo-consumer
demo-consumer#sayHello(Long,String,Integer):
coreSize: 5
maximumSize: 5
maxQueueSize: 10
复制代码
最后,用一张图将前面的Hystrix源码分析原理作个汇总。