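The previous article described Zuul's basic architecture and annotated and analyzed its core classes, but it did not go into the key details. This article explores Zuul's timeout retry and service fallback (degradation) mechanisms.
Often, while a request is being forwarded to a Tomcat server and processed there, it can get stuck for some reason (for example, the server's connection pool is exhausted, or an SQL query takes too long). Without timeout retry or service fallback, the client has no idea what is happening and simply keeps waiting.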
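Timeout retry means that when a request issued by the caller has not finished after XXX ms, the caller throws an exception, cuts off the request, and retries.
For example, a request is sent to the target service, but because of a network fluctuation the request times out without reaching the target service, or the result returned by the target service is not received correctly. The target service itself is not unavailable at that moment, so a small number of retries can reduce the impact of network jitter and similar factors.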
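The idea of timeout retry can be sketched in plain Java, independent of Zuul. The class `TimeoutRetrySketch` and its methods are hypothetical names made up for this illustration; it only assumes that one attempt may wait `timeoutMs` and that a timed-out attempt is cut off before the next one starts.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper, not part of Zuul or Ribbon.
final class TimeoutRetrySketch {

    // Makes one initial attempt plus up to maxRetries retries,
    // waiting at most timeoutMs for each attempt.
    static <T> T callWithRetry(Supplier<T> task, long timeoutMs, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                Callable<T> call = task::get;
                Future<T> future = pool.submit(call);
                try {
                    return future.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    future.cancel(true); // cut off the slow request, then retry
                } catch (InterruptedException | ExecutionException e) {
                    throw new IllegalStateException("call failed", e);
                }
            }
            throw new IllegalStateException(
                    "timed out after " + (maxRetries + 1) + " attempts");
        } finally {
            pool.shutdownNow();
        }
    }

    // Sleeps without forcing callers to handle InterruptedException.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A call that is slow twice and fast on the third attempt succeeds as long as `maxRetries` is at least 2; the caller only sees a failure once every attempt has timed out.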
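Service fallback means that when a request issued by the caller still has not received a correct response after XXX ms, the request is cut off and the interface degrades, returning acceptable data instead.
When repeated retries still fail, the client concludes that the target service is unavailable (although it may in fact still be available). The client has anticipated this situation and has agreed with the caller that, when the service is unavailable, the call degrades to another interface that returns specific data.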
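Fallback after exhausted retries can be sketched the same way. `FallbackSketch` is a hypothetical name, and treating every `RuntimeException` as "try again" is an assumption made to keep the example short.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Zuul or Hystrix.
final class FallbackSketch {

    // Tries the primary call (1 + maxRetries) times, then degrades to the fallback.
    static <T> T callWithFallback(Supplier<T> primary, int maxRetries, Supplier<T> fallback) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return primary.get();
            } catch (RuntimeException e) {
                // Assumption: any runtime failure counts as retriable here.
            }
        }
        // All attempts failed: return the agreed degraded data.
        return fallback.get();
    }
}
```

The caller always receives something usable: either the real result or the degraded data agreed on in advance.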
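Circuit breaking and fallback are very common at internet companies. With SOA and microservice architectures now widespread, and with complex business designs and massive data volumes, fallback strategies matter more and more.
There are many tools for service fallback today, such as nginx and Hystrix.
Before looking at Zuul's retry and fallback mechanisms, it is worth understanding Zuul's threading model first.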
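The figure above shows the Zuul 1.x threading model clearly: every request is handled by a blocking call (this is how RibbonRoutingFilter processes it).
The source of HystrixCommand#queue() contains the following comment: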
/*
 * The Future returned by Observable.toBlocking().toFuture() does not implement the
 * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
 * thus, to comply with the contract of Future, we must wrap around it.
 */
final Future<R> delegate = toObservable().toBlocking().toFuture();
RibbonRoutingFilter#forward
Debugging shows that ribbonCommandFactory is actually an HttpClientRibbonCommandFactory instance, which is used to create an HttpClientRibbonCommand instance. Given the threading model described above, the call to command.execute() must end up in HttpClientRibbonCommand#run().
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
    Map<String, Object> info = this.helper.debug(context.getMethod(),
            context.getUri(), context.getHeaders(), context.getParams(),
            context.getRequestEntity());

    // HttpClientRibbonCommandFactory#create
    // HttpClientRibbonCommand
    RibbonCommand command = this.ribbonCommandFactory.create(context);
    try {
        // HttpClientRibbonCommand#run
        ClientHttpResponse response = command.execute(); // queue().get()
        this.helper.appendDebug(info, response.getStatusCode().value(),
                response.getHeaders());
        return response;
    } catch (HystrixRuntimeException ex) {
        return handleException(info, ex);
    }
}
HttpClientRibbonCommandFactory#create
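When the HttpClientRibbonCommand is created, the factory also looks for a matching fallback provider (a custom implementation). If the ZuulFallbackProvider is null, then on fallback HystrixCommand#getFallback() is called and throws UnsupportedOperationException("No fallback available.").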
@Override
public HttpClientRibbonCommand create(final RibbonCommandContext context) {
    // ZuulFallbackProvider is the fallback interface, one per serviceId.
    // Hystrix calls it when the circuit breaks.
    ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId());
    final String serviceId = context.getServiceId();
    // With retry enabled the client is a RetryableRibbonLoadBalancingHttpClient;
    // otherwise it is a RibbonLoadBalancingHttpClient.
    final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient(
            serviceId, RibbonLoadBalancingHttpClient.class);
    client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId));

    return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties,
            zuulFallbackProvider, clientFactory.getClientConfig(serviceId));
}
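The per-serviceId lookup and the "no provider" outcome described above can be modelled with a plain map. This is only a sketch: `FallbackLookupSketch`, `register` and `fallbackFor` are hypothetical names, and a `Supplier<String>` stands in for the real provider type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for the per-serviceId fallback lookup; not Zuul's real classes.
final class FallbackLookupSketch {

    private final Map<String, Supplier<String>> providersByRoute = new HashMap<>();

    // Registers a fallback for one route (serviceId).
    void register(String route, Supplier<String> provider) {
        providersByRoute.put(route, provider);
    }

    // Returns the degraded response, or fails the way a command without a fallback does.
    String fallbackFor(String serviceId) {
        Supplier<String> provider = providersByRoute.get(serviceId);
        if (provider == null) {
            throw new UnsupportedOperationException("No fallback available.");
        }
        return provider.get();
    }
}
```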
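How does Zuul decide internally whether to enable retry?
Judging from the bean-creation conditions, it ultimately comes down to whether spring-retry is on the classpath; that decides whether the retryable client is created.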
@Configuration
@ConditionalOnClass(name = "org.apache.http.client.HttpClient")
@ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true)
public class HttpClientRibbonConfiguration {
    @RibbonClientName
    private String name = "client";

    // ....

    @Bean
    @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    // This bean is created only if org.springframework.retry.support.RetryTemplate is absent
    @ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate")
    public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
            IClientConfig config, ServerIntrospector serverIntrospector,
            ILoadBalancer loadBalancer, RetryHandler retryHandler,
            CloseableHttpClient httpClient) {
        RibbonLoadBalancingHttpClient client =
                new RibbonLoadBalancingHttpClient(httpClient, config, serverIntrospector);
        client.setLoadBalancer(loadBalancer);
        client.setRetryHandler(retryHandler);
        Monitors.registerObject("Client_" + this.name, client);
        return client;
    }

    @Bean
    @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    // This bean is created only if org.springframework.retry.support.RetryTemplate is present
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient(
            IClientConfig config, ServerIntrospector serverIntrospector,
            ILoadBalancer loadBalancer, RetryHandler retryHandler,
            LoadBalancedRetryFactory loadBalancedRetryFactory,
            CloseableHttpClient httpClient) {
        RetryableRibbonLoadBalancingHttpClient client =
                new RetryableRibbonLoadBalancingHttpClient(
                        httpClient, config, serverIntrospector, loadBalancedRetryFactory);
        client.setLoadBalancer(loadBalancer);
        client.setRetryHandler(retryHandler);
        Monitors.registerObject("Client_" + this.name, client);
        return client;
    }
}
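What the two class conditions boil down to is a classpath probe. The following is a rough plain-Java analogy, not Spring's actual condition implementation; `ClasspathCheckSketch`, `isPresent` and `chooseClient` are hypothetical names.

```java
// Rough analogy of a "class present / class missing" condition; not Spring's implementation.
final class ClasspathCheckSketch {

    static final String RETRY_TEMPLATE = "org.springframework.retry.support.RetryTemplate";

    // Returns true if the named class can be loaded, without initializing it.
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Mirrors the decision in the configuration above: retryable client only with spring-retry.
    static String chooseClient(boolean retryTemplatePresent) {
        return retryTemplatePresent
                ? "RetryableRibbonLoadBalancingHttpClient"
                : "RibbonLoadBalancingHttpClient";
    }
}
```

Calling `chooseClient(isPresent(RETRY_TEMPLATE))` in a project without the spring-retry jar yields the plain client, which matches the behaviour described here: adding or removing one dependency flips the client type.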
HttpClientRibbonCommand#AbstractRibbonCommand#run
As mentioned above, command.execute leads to HttpClientRibbonCommand#run. HttpClientRibbonCommand does not define a run method itself, so we look for it in the parent class AbstractRibbonCommand.
@Override
protected ClientHttpResponse run() throws Exception {
    final RequestContext context = RequestContext.getCurrentContext();

    RQ request = createRequest();
    // RibbonLoadBalancingHttpClient#AbstractLoadBalancerAwareClient#executeWithLoadBalancer
    // RetryableRibbonLoadBalancingHttpClient#AbstractLoadBalancerAwareClient#executeWithLoadBalancer
    RS response = this.client.executeWithLoadBalancer(request, config);

    context.set("ribbonResponse", response);

    // Explicitly close the HttpResponse if the Hystrix command timed out to
    // release the underlying HTTP connection held by the response.
    //
    if (this.isResponseTimedOut()) {
        if (response != null) {
            response.close();
        }
    }

    return new RibbonHttpResponse(response);
}
AbstractLoadBalancerAwareClient#
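This part uses quite a lot of the Observable API from the RxJava framework. For background, see the official site or other blog posts; it is not repeated here.
The key question is what AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig) actually does.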
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    // Request-specific retry handler
    RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
    LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
            .withLoadBalancerContext(this)
            .withRetryHandler(handler)
            .withLoadBalancerURI(request.getUri())
            .build();

    try {
        // Wrap the request execution in an Observable
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    } catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
}
RibbonLoadBalancingHttpClient#getRequestSpecificRetryHandler RetryableRibbonLoadBalancingHttpClient#getRequestSpecificRetryHandler
The source below shows that okToRetryOnConnectErrors and okToRetryOnAllErrors are both initialized to false, and that fallback is initialized to a DefaultLoadBalancerRetryHandler.
@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request,
        IClientConfig requestConfig) {
    return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, null);
}

public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors,
        RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) {
    Preconditions.checkNotNull(baseRetryHandler);
    this.okToRetryOnConnectErrors = okToRetryOnConnectErrors;
    this.okToRetryOnAllErrors = okToRetryOnAllErrors;
    this.fallback = baseRetryHandler;
    if (requestConfig != null) {
        if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) {
            retrySameServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetries);
        }
        if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) {
            retryNextServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer);
        }
    }
}
LoadBalancerCommand#submit
This method is long and consists mostly of Observable code; the key parts are excerpted below.
// Maximum number of retries against the same server address
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
// Maximum number of retries across the other instances of the same service in the cluster
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

// Given how RequestSpecificRetryHandler is created, maxRetrysSame and maxRetrysNext are both 0,
// so the retry conditions below can never be met; see the DefaultLoadBalancerRetryHandler source for details
if (maxRetrysSame > 0)
    o = o.retry(retryPolicy(maxRetrysSame, true));

if (maxRetrysNext > 0 && server == null)
    o = o.retry(retryPolicy(maxRetrysNext, false));

// Retry policy, i.e. the decision whether to retry
private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
    return new Func2<Integer, Throwable, Boolean>() {
        @Override
        public Boolean call(Integer tryCount, Throwable e) {
            if (e instanceof AbortExecutionException) {
                return false;
            }
            if (tryCount > maxRetrys) {
                return false;
            }
            if (e.getCause() != null && e instanceof RuntimeException) {
                e = e.getCause();
            }
            return retryHandler.isRetriableException(e, same);
        }
    };
}

@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
    if (okToRetryOnAllErrors) {
        // As the source above shows, this flag is false whether or not retry is enabled,
        // so this branch is never taken
        return true;
    } else if (e instanceof ClientException) {
        // A client-side exception
        ClientException ce = (ClientException) e;
        // The client was throttled by the server
        if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
            // sameServer tells whether the retry would hit the same server.
            // Once a server instance has thrown this exception, it is not retried;
            // a different instance address is retried.
            return !sameServer;
        } else {
            // Otherwise do not retry
            return false;
        }
    } else {
        // Always false here
        return okToRetryOnConnectErrors && isConnectionException(e);
    }
}
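At first glance the source suggests that retries are driven by actively triggering Observable#retry, but debugging shows that this is too naive. The RequestSpecificRetryHandler instances created by getRequestSpecificRetryHandler are all the same: the inner RetryHandler is always a default-constructed DefaultLoadBalancerRetryHandler, so retrySameServer and retryNextServer are both 0. In other words, the Observable#retry path is never triggered, at least in this version.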
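The two gates involved (the retry operator is only attached when the maximum is greater than 0, and the handler must then consider the error retriable) can be condensed into a small decision function. This is a simplified model, not Ribbon code: `RetryDecisionSketch` and the `ErrorKind` enum are hypothetical, and `CONNECT_ERROR` stands for "not a ClientException, but a connection exception".

```java
// Simplified model of the retry decision discussed above; not Ribbon's actual classes.
final class RetryDecisionSketch {

    enum ErrorKind { SERVER_THROTTLED, OTHER_CLIENT_ERROR, CONNECT_ERROR }

    // Mirrors the branch structure of isRetriableException.
    static boolean isRetriable(ErrorKind kind, boolean sameServer,
                               boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors) {
        if (okToRetryOnAllErrors) {
            return true;
        }
        switch (kind) {
            case SERVER_THROTTLED:
                return !sameServer;   // only another instance is worth retrying
            case OTHER_CLIENT_ERROR:
                return false;
            default:
                return okToRetryOnConnectErrors;
        }
    }

    // Adds the outer gates: no retry operator when maxRetries is 0, and a try-count limit.
    static boolean wouldRetry(int maxRetries, int tryCount, ErrorKind kind, boolean sameServer,
                              boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors) {
        if (maxRetries <= 0 || tryCount > maxRetries) {
            return false;
        }
        return isRetriable(kind, sameServer, okToRetryOnConnectErrors, okToRetryOnAllErrors);
    }
}
```

With the values seen in the handler (both flags false, both maximums 0), `wouldRetry` returns false for every input, which is the observation made while debugging.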
So retrying is evidently left to spring-retry. Where, then, is the actual handling defined?
RetryableRibbonLoadBalancingHttpClient#execute
@Override
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
    final RequestConfig.Builder builder = RequestConfig.custom();
    IClientConfig config = configOverride != null ? configOverride : this.config;
    builder.setConnectTimeout(config.get(
            CommonClientConfigKey.ConnectTimeout, this.connectTimeout)); // default 2 s
    builder.setSocketTimeout(config.get(
            CommonClientConfigKey.ReadTimeout, this.readTimeout)); // default 5 s
    builder.setRedirectsEnabled(config.get(
            CommonClientConfigKey.FollowRedirects, this.followRedirects));

    final RequestConfig requestConfig = builder.build();
    return this.executeWithRetry(request, new RetryCallback() {
        // ....
    });
}

private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, RetryCallback<RibbonApacheHttpResponse, IOException> callback) throws Exception {
    // The factory here is a RibbonLoadBalancedRetryPolicyFactory
    LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
    RetryTemplate retryTemplate = new RetryTemplate();
    boolean retryable = request.getContext() == null ? true :
            BooleanUtils.toBooleanDefaultIfNull(request.getContext().getRetryable(), true);
    // "this" is the RetryableRibbonLoadBalancingHttpClient
    retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy()
            : new RetryPolicy(request, retryPolicy, this, this.getClientName()));
    return retryTemplate.execute(callback);
}

@Override
public LoadBalancedRetryPolicy create(final String serviceId, final ServiceInstanceChooser loadBalanceChooser) {
    final RibbonLoadBalancerContext lbContext = this.clientFactory
            .getLoadBalancerContext(serviceId);
    return new LoadBalancedRetryPolicy() {
        // This is an anonymous instance, so it can be hard to locate

        // Decides whether to retry the same service instance
        @Override
        public boolean canRetrySameServer(LoadBalancedRetryContext context) {
            return sameServerCount < lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context);
        }

        // Decides whether to retry the next service instance in the cluster
        @Override
        public boolean canRetryNextServer(LoadBalancedRetryContext context) {
            //this will be called after a failure occurs and we increment the counter
            //so we check that the count is less than or equals to too make sure
            //we try the next server the right number of times
            return nextServerCount <= lbContext.getRetryHandler().getMaxRetriesOnNextServer() && canRetry(context);
        }

        @Override
        public void close(LoadBalancedRetryContext context) {

        }

        @Override
        public void registerThrowable(LoadBalancedRetryContext context, Throwable throwable) {
            //Check if we need to ask the load balancer for a new server.
            //Do this before we increment the counters because the first call to this method
            //is not a retry it is just an initial failure.
            if(!canRetrySameServer(context) && canRetryNextServer(context)) {
                context.setServiceInstance(loadBalanceChooser.choose(serviceId));
            }
            //This method is called regardless of whether we are retrying or making the first request.
            //Since we do not count the initial request in the retry count we don't reset the counter
            //until we actually equal the same server count limit. This will allow us to make the initial
            //request plus the right number of retries.
            if(sameServerCount >= lbContext.getRetryHandler().getMaxRetriesOnSameServer() && canRetry(context)) {
                //reset same server since we are moving to a new server
                sameServerCount = 0;
                nextServerCount++;
                if(!canRetryNextServer(context)) {
                    context.setExhaustedOnly();
                }
            } else {
                sameServerCount++;
            }
        }
    };
}
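To see how the two counters in registerThrowable interact, the bookkeeping can be replayed in isolation. The sketch below assumes every attempt fails and that canRetry(context) is always true; `RetryCounterSketch` and `countAttempts` are hypothetical names, and only the counter logic is copied from the policy above.

```java
// Replays the sameServerCount / nextServerCount bookkeeping for a request that always fails.
final class RetryCounterSketch {

    // Returns the total number of attempts made before the policy is exhausted.
    static int countAttempts(int maxRetriesOnSameServer, int maxRetriesOnNextServer) {
        int sameServerCount = 0;
        int nextServerCount = 0;
        int attempts = 0;
        boolean exhausted = false;

        while (!exhausted && nextServerCount <= maxRetriesOnNextServer) {
            attempts++; // one request is sent and fails

            // Same branch structure as registerThrowable
            if (sameServerCount >= maxRetriesOnSameServer) {
                sameServerCount = 0;   // move on to the next server
                nextServerCount++;
                if (nextServerCount > maxRetriesOnNextServer) {
                    exhausted = true;
                }
            } else {
                sameServerCount++;
            }
        }
        return attempts;
    }
}
```

Under these assumptions, MaxAutoRetries=3 and MaxAutoRetriesNextServer=2 give 12 attempts, that is (3 + 1) × (2 + 1): the initial request plus three retries on each of three servers.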
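The above walks through the whole course of a Zuul request, both with and without retry enabled, and should give a reasonably deep understanding of how Zuul forwards requests.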
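Once a request reaches RibbonRoutingFilter, whether retry is enabled depends on whether the spring-retry package has been added, that is, whether the fully qualified class name org.springframework.retry.support.RetryTemplate can be found. If it is found, the retry mechanism is enabled; otherwise it is not.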
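In addition, because RibbonCommand extends HystrixExecutable, it in principle supports circuit breaking and fallback. To test this, implement ZuulFallbackProvider and register the implementation in the Spring container (@Component).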
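From the source analysis, the circuit-breaking fallback and spring-retry have no direct relationship with each other. This means that even after a request has already been degraded, the retries against the backend carry on. Every retry made after the fallback has fired is pointless, because whether or not it succeeds, the value finally returned is the data from the fallback interface.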
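The effect can be reproduced without Zuul. The sketch below is a plain-Java model under stated assumptions: the retry loop runs on a worker thread, the caller gives up after `fallbackTimeoutMs` without cancelling that loop, and each failed attempt takes `attemptMs`. `FallbackVersusRetrySketch` is a hypothetical name.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Models "fallback already returned, retries still running"; not Zuul or Hystrix code.
final class FallbackVersusRetrySketch {

    // Returns how many backend attempts started after the caller had already taken the fallback.
    static int attemptsAfterFallback(long fallbackTimeoutMs, long attemptMs, int attempts) {
        AtomicInteger started = new AtomicInteger();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Runnable retryLoop = () -> {
                for (int i = 0; i < attempts; i++) {
                    started.incrementAndGet();          // a new attempt hits the backend
                    try {
                        Thread.sleep(attemptMs);        // the attempt runs into its timeout
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            };
            Future<?> retries = worker.submit(retryLoop);
            try {
                retries.get(fallbackTimeoutMs, TimeUnit.MILLISECONDS);
                return 0; // everything finished before the fallback deadline
            } catch (TimeoutException timeout) {
                // The caller receives the fallback response at this point.
                int startedAtFallback = started.get();
                retries.get(); // nothing cancelled the loop, so it runs to the end
                return started.get() - startedAtFallback;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }
}
```

Every attempt counted by the return value is wasted work in the sense described above: the caller has already been answered with the fallback data.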
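Testing shows that the circuit-breaking fallback fires after 1 s by default, while the timeout that triggers a retry defaults to 5 s (see the source comments above).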
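A quick calculation shows how far apart these two defaults are. The helper below is a hypothetical illustration; it assumes the worst case in which every attempt uses up both the connect timeout and the read timeout. The Hystrix timeout is normally set through the property hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds, and whether raising it is appropriate for a given setup is a judgment call.

```java
// Hypothetical helper: worst-case time needed for all Ribbon attempts to finish.
final class RetryBudgetSketch {

    static long worstCaseMillis(long connectTimeoutMs, long readTimeoutMs,
                                int maxAutoRetries, int maxAutoRetriesNextServer) {
        long perAttempt = connectTimeoutMs + readTimeoutMs;
        // Initial request plus retries on each server, times the number of servers tried
        long attempts = (long) (maxAutoRetries + 1) * (maxAutoRetriesNextServer + 1);
        return perAttempt * attempts;
    }
}
```

With the 2 s connect timeout, 5 s read timeout, MaxAutoRetries=3 and MaxAutoRetriesNextServer=2 used in this article, the worst case is 84,000 ms, far beyond a 1 s fallback timeout, so the fallback always wins unless the Hystrix timeout is raised.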
Add the spring-retry dependency:
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
    <version>1.1.2.RELEASE</version>
</dependency>
Set zuul.retryable=true, ribbon.MaxAutoRetries=3, and ribbon.MaxAutoRetriesNextServer=2:
eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
server:
  port: 8769
spring:
  application:
    name: service-zuul
zuul:
  routes:
    api-a:
      path: /api-a/**
      serviceId: service-ribbon
  retryable: true
ribbon:
  MaxAutoRetries: 3
  MaxAutoRetriesNextServer: 2
Create the fallback provider that is used once the circuit breaks:
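import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

import org.springframework.cloud.netflix.zuul.filters.route.ZuulFallbackProvider;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.stereotype.Component;

import com.netflix.zuul.http.ServletInputStreamWrapper;

@Component
public class MyZuulFallbackProvider implements ZuulFallbackProvider {

    // The route (serviceId) this fallback applies to
    @Override
    public String getRoute() {
        return "service-ribbon";
    }

    @Override
    public ClientHttpResponse fallbackResponse() {
        return new ClientHttpResponse() {
            // Degraded body returned instead of the real response
            @Override
            public InputStream getBody() throws IOException {
                Map<String, Object> map = new HashMap<String, Object>();
                map.put("code", 1);
                map.put("text", "error");
                final byte[] reqBodyBytes = map.toString().getBytes();
                return new ServletInputStreamWrapper(reqBodyBytes);
            }

            @Override
            public HttpHeaders getHeaders() {
                return new HttpHeaders();
            }

            @Override
            public HttpStatus getStatusCode() throws IOException {
                return HttpStatus.OK;
            }

            // Kept consistent with getStatusCode() (the original returned 0)
            @Override
            public int getRawStatusCode() throws IOException {
                return HttpStatus.OK.value();
            }

            // Kept consistent with getStatusCode() (the original returned "201 error")
            @Override
            public String getStatusText() throws IOException {
                return HttpStatus.OK.getReasonPhrase();
            }

            @Override
            public void close() {
            }
        };
    }
}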
In the called service, add a count to track the number of retries (the count value is only used once, which is enough for a simple check):
@RestController
public class HelloControler {

    private Integer count = 4;

    @Autowired
    HelloService helloService;

    @RequestMapping(value = "/hi")
    public String hi(@RequestParam String name) {
        if (0 == count--) {
            // The first four requests sleep below; the fifth one returns immediately.
            return "hi has bean hystrix";
        }
        System.out.println("request is coming...");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            System.out.println("Thread interrupted... " + e.getMessage());
        }
        return name;
    }
}
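Reading source code is exciting and enjoyable, yet also painful; it pays off if you stick with it. I had meant to draw class diagrams for the whole call flow, but I was a bit lazy, so I skipped them.
Code written by foreign developers feels somewhat harder to follow, but the basic design patterns are sound, and with the help of an editor it is not too tiring to read.
This is probably where my reading of the Zuul source ends for now; other pitfalls can be studied when they come up. The retry-versus-fallback problem (retries continuing after the fallback has fired), however, is simply intolerable. Will it be fixed in Zuul 2.x?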