本文基于 spring cloud gateway 2.0.1java
GlobalGilter 全局过滤器接口与 GatewayFilter 网关过滤器接口具备相同的方法定义。全局过滤器是一系列特殊的过滤器,会根据条件应用到全部路由中。网关过滤器是更细粒度的过滤器,做用于指定的路由中。web
从类图中能够看到 GlobalFilter 有十一个实现类,包括路由转发、负载均衡、ws 路由、netty 路由等全局过滤器。下面咱们就分别介绍一下这些全局路由过滤器的实现。spring
ForwardRoutingFilter 在交换属性 ServerWebExchangeUtils.GATEWAY_ REQUEST_ URL_ ATTR 中 查找 URL, 若是 URL 为转发模式即 forward:/// localendpoint, 它将使用Spring DispatcherHandler 来处 理请求。 未修改的原始 URL 将保存到 GATEWAY_ ORIGINAL_ REQUEST_ URL_ ATTR 属性的列表中。后端
public class ForwardRoutingFilter implements GlobalFilter, Ordered { private static final Log log = LogFactory.getLog(ForwardRoutingFilter.class); private final ObjectProvider<DispatcherHandler> dispatcherHandler; public ForwardRoutingFilter(ObjectProvider<DispatcherHandler> dispatcherHandler) { this.dispatcherHandler = dispatcherHandler; } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); //获取请求URI的请求结构 String scheme = requestUrl.getScheme(); //该路由已经被处理或者URI格式不是forward则继续其它过滤器 if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) { return chain.filter(exchange); } setAlreadyRouted(exchange); //TODO: translate url? if (log.isTraceEnabled()) { log.trace("Forwarding to URI: "+requestUrl); } // 使用dispatcherHandler进行处理 return this.dispatcherHandler.getIfAvailable().handle(exchange); } }
转发路由过滤器实现比较简单,构造函数传入请求的分发处理器DispatcherHandler。过滤器执行时,首先获取请求地址的url前缀,而后判断该请求是否已被路由处理或者URL的前缀不是forward,则继续执行过滤器链;不然设置路由处理状态并交由DispatcherHandler进行处理。缓存
请求路由是否被处理的判断以下:websocket
// ServerWebExchangeUtils.javasession
public static void setAlreadyRouted(ServerWebExchange exchange) { exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true); } public static boolean isAlreadyRouted(ServerWebExchange exchange) { return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false); }
两个 方法 定义 在 ServerWebExchangeUtils 中, 这 两个 方法 用于 修改 与 查询 ServerWebExchange 中的 Map< String, Object> getAttributes(),# getAttributes 方法 返回 当前 exchange 所请 求 属性 的 可变 映射。负载均衡
这两个方法定义在 ServerWebExchangeUtils 中,分别用于修改和查询 GATEWAY_ALREADY_ROUTED_ATTR 状态。socket
spring: cloud: gateway: routes: - id: myRoute uri: lb://service predicates: - Path=/service/**
LoadBalancerClientFilter 在交换属性 GATEWAY_ REQUEST_ URL_ ATTR 中查找URL, 若是URL有一个 lb 前缀 ,即 lb:// myservice,将使用 LoadBalancerClient 将名称 解析为实际的主机和端口,如示例中的 myservice。 未修改的原始 URL将保存到 GATEWAY_ ORIGINAL_ REQUEST_ URL_ ATTR 属性的列表中。过滤器还将查看ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR属性以查看它是否等于lb,而后应用相同的规则。ide
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR); if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) { return chain.filter(exchange); } //保留原始url addOriginalRequestUrl(exchange, url); log.trace("LoadBalancerClientFilter url before: " + url); //负载均衡到具体服务实例 final ServiceInstance instance = choose(exchange); if (instance == null) { throw new NotFoundException("Unable to find instance for " + url.getHost()); } URI uri = exchange.getRequest().getURI(); //若是没有提供前缀的话,则会使用默认的'< scheme>',不然使用' lb:< scheme>' 机制。 String overrideScheme = null; if (schemePrefix != null) { overrideScheme = url.getScheme(); } //根据获取的服务实例信息,从新组装请求的 url URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri); // Routing 相关 的 GatewayFilter 会 经过 GATEWAY_ REQUEST_ URL_ ATTR 属性, 发起 请求。 log.trace("LoadBalancerClientFilter url chosen: " + requestUrl); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl); return chain.filter(exchange); }
从过滤器执行方法中能够看出,负载均衡客户端过滤器的实现步骤以下:
一、构造函数传入负载均衡客户端,依赖中添加 Spring Cloud Netflix Ribbon 便可 注入 该 Bean。
二、获取请求的 URL 及其前缀,若是 URL 不为空且前缀为lb或者网关请求的前缀是 lb,则保存原始的URL,负载到具体的服务实例并根据获取的服务实例信息,从新组装请求的URL。
三、最后,添加请求的URL到GATEWAY_ REQUEST_ URL_ ATTR,并提交到过滤器链中继续执行
在组装请求的地址时,若是loadbalancer没有提供前缀的话,则使用默认的,即overrideScheme 为null,不然的话使用 lb:
若是 ServerWebExchangeUtils.GATEWAY_ REQUEST_ URL_ ATTR 请求属性中的URL 具备http或https前缀,NettyRoutingFilter 路由过滤器将运行,它使用 Netty HttpClient 代理对下游的请求。响应信息放在ServerWebExchangeUtils.CLIENT_ RESPONSE_ ATTR 属性中,在过滤器链中进行传递。
该过滤器实际处理 和客户端负载均衡的实现方式相似:
首先获取请求的URL及前缀,判断前缀是否是http或者https,若是该请求已经被路由或者前缀不合法,则调用过滤器链直接向后传递;不然正常对头部进行过滤操做。
public class NettyRoutingFilter implements GlobalFilter, Ordered { private final HttpClient httpClient; private final ObjectProvider<List<HttpHeadersFilter>> headersFilters; private final HttpClientProperties properties; public NettyRoutingFilter(HttpClient httpClient, ObjectProvider<List<HttpHeadersFilter>> headersFilters, HttpClientProperties properties) { this.httpClient = httpClient; this.headersFilters = headersFilters; this.properties = properties; } @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); } setAlreadyRouted(exchange); ServerHttpRequest request = exchange.getRequest(); final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString()); final String url = requestUrl.toString(); HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(), exchange); final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING); boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding); boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); Mono<HttpClientResponse> responseMono = this.httpClient.request(method, url, req -> { final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach) .headers(httpHeaders) .chunkedTransfer(chunkedTransfer) .failOnServerError(false) .failOnClientError(false); if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); proxyRequest.header(HttpHeaders.HOST, host); } if (properties.getResponseTimeout() != null) { proxyRequest.context(ctx -> ctx.addHandlerFirst( new ReadTimeoutHandler(properties.getResponseTimeout().toMillis(), TimeUnit.MILLISECONDS))); } return proxyRequest.sendHeaders() //I shouldn't need this .send(request.getBody().map(dataBuffer -> ((NettyDataBuffer) dataBuffer).getNativeBuffer())); }); return responseMono.doOnNext(res -> { ServerHttpResponse response = exchange.getResponse(); // put headers and status so filters can modify the response HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue())); String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter( this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE); response.getHeaders().putAll(filteredResponseHeaders); HttpStatus status = HttpStatus.resolve(res.status().code()); if (status != null) { response.setStatusCode(status); } else if (response instanceof AbstractServerHttpResponse) { // https://jira.spring.io/browse/SPR-16748 ((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code()); } else { throw new IllegalStateException("Unable to set status code on response: " +res.status().code()+", "+response.getClass()); } // Defer committing the response until all route filters have run // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); }) .onErrorMap(t -> properties.getResponseTimeout() != null && t instanceof ReadTimeoutException, t -> new TimeoutException("Response took longer than timeout: " + properties.getResponseTimeout())) .then(chain.filter(exchange)); } }
NettyRoutingFilter 过滤器的构造函数有三个参数:
HttpClient httpClient : 基于 Netty 实现的 HttpClient,经过该属性请求后端 的 Http 服务
ObjectProvider<List> headersFilters: ObjectProvider 类型 的 headersFilters,用于头部过滤
HttpClientProperties properties: Netty HttpClient 的配置属性
filterRequest 用于对请求头部的信息进行处理,是定义在接口 HttpHeadersFilter 中的默认方法,该接口有三个实现类,请求头部将会通过这三个头部过滤器,并最终返回修改以后的头部。
public interface HttpHeadersFilter { enum Type { REQUEST, RESPONSE } /** * Filters a set of Http Headers * * @param input Http Headers * @param exchange * @return filtered Http Headers */ HttpHeaders filter(HttpHeaders input, ServerWebExchange exchange); static HttpHeaders filterRequest(List<HttpHeadersFilter> filters, ServerWebExchange exchange) { HttpHeaders headers = exchange.getRequest().getHeaders(); return filter(filters, headers, exchange, Type.REQUEST); } static HttpHeaders filter(List<HttpHeadersFilter> filters, HttpHeaders input, ServerWebExchange exchange, Type type) { HttpHeaders response = input; if (filters != null) { HttpHeaders reduce = filters.stream() .filter(headersFilter -> headersFilter.supports(type)) .reduce(input, (headers, filter) -> filter.filter(headers, exchange), (httpHeaders, httpHeaders2) -> { httpHeaders.addAll(httpHeaders2); return httpHeaders; }); return reduce; } return response; } default boolean supports(Type type) { return type.equals(Type.REQUEST); } }
HttpHeadersFilter 接口的三个实现类:
ForwardedHeadersFilter:
增长 Forwarded头部,头部值为协议类型、host和目标地址
XForwardedHeadersFilter:
增长 X- Forwarded- For、 X- Forwarded- Host、 X- Forwarded- Port 和 X- Forwarded- Proto 头部。 代理转发时,用以自定义的头部信息向下游传递。
RemoveHopByHopHeadersFilter:
为了定义缓存和非缓存代理的行为,咱们将HTTP头字段分为两类:端到端的头部字段,发送给请求或响应的最终接收人;逐跳头部字段,对单个传输级别链接有意义,而且不被缓存存储或由代理转发。
因此该头部过滤器会移除逐跳头部字段,包括如下8个字段:
Proxy- Authenticate
Proxy- Authorization
TE
Trailer
Transfer- Encoding
Upgrade
proxy- connection
content- length
NettyWriteResponseFilter 与 NettyRoutingFilter 成对使用。“ 预” 过滤阶段没有任何内容,由于 CLIENT_ RESPONSE_ ATTR 在 WebHandler 运行以前不会被添加。
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added // until the WebHandler is run return chain.filter(exchange).then(Mono.defer(() -> { HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR); if (clientResponse == null) { return Mono.empty(); } log.trace("NettyWriteResponseFilter start"); ServerHttpResponse response = exchange.getResponse(); NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory(); //TODO: what if it's not netty final Flux<NettyDataBuffer> body = clientResponse.receive() .retain() //TODO: needed? .map(factory::wrap); MediaType contentType = null; try { contentType = response.getHeaders().getContentType(); } catch (Exception e) { log.trace("invalid media type", e); } return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); })); }
若是 CLIENT_ RESPONSE_ ATTR 请求 属性 中 存在 Netty HttpClientResponse, 则 会应用 NettyWriteResponseFilter。 它在其余过滤器完成后运行,并将代理响应写回 网关客户端响应。成对出现的 WebClientHttpRoutingFilter 和 WebClientWriteResponseFilter 过滤器,与基于Nettty 的路由和响应过滤器执行相同 的功能,但不须要使用Netty。
若是 ServerWebExchangeUtils.GATEWAY_ ROUTE_ ATTR 请求属性中有Route对象, 则 会运行 RouteToRequestUrlFilter 过滤器。他会根据请求URI建立一个新的URI。 新的 URI 位于 ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 请求属性中。该过滤器会组装成发送到代理服务的URL地址,向后传递到路由转发的过滤器。
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); if (route == null) { return chain.filter(exchange); } log.trace("RouteToRequestUrlFilter start"); URI uri = exchange.getRequest().getURI(); boolean encoded = containsEncodedParts(uri); URI routeUri = route.getUri(); if (hasAnotherScheme(routeUri)) { // this is a special url, save scheme to special attribute // replace routeUri with schemeSpecificPart exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme()); routeUri = URI.create(routeUri.getSchemeSpecificPart()); } URI mergedUrl = UriComponentsBuilder.fromUri(uri) // .uri(routeUri) .scheme(routeUri.getScheme()) .host(routeUri.getHost()) .port(routeUri.getPort()) .build(encoded) .toUri(); exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl); return chain.filter(exchange); }
首先获取请求中的 Route, 如 果为 空 则 直接 提交 过滤器 链; 不然 获取 routeUri, 并 判断 routeUri 是否 特殊, 若是 是 则需 要 处理 URL, 保存 前缀 到 GATEWAY_SCHEME_PREFIX_ATTR, 并将 routeUri 替换
首先获取请求中的Route,若是为空则直接提交给过滤器链
获取routeUri并判断是否特殊,若是是则须要处理URL,保存前缀到GATEWAY_SCHEME_PREFIX_ATTR,并将routeUri 替换为schemeSpecificPart
而后拼接requestUrl,将请求的URI转换为路由定义的routeUri
最后,提交到过滤器链继续执行
若是请求中的ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 属性对应的URL前缀为 ws 或 wss,则启用Websocket 路由过滤器。它使用Spring Web Socket 做为底层通讯组件向下游转发 WebSocket 请求。Websocket 能够经过添加前缀 lb来实现负载均衡,如 lb:ws://serviceid
若是您使用SockJS做为普通http的回调,则应配置正常的HTTP路由以及Websocket路由
spring: cloud: gateway: routes: # SockJS route - id: websocket_sockjs_route uri: http://localhost:3001 predicates: - Path=/websocket/info/** # Normwal Websocket route - id: websocket_route uri: ws://localhost:3001 predicates: - Path=/websocket/**
Websocket 路由过滤器进行处理时,首先获取请求的URL及其前缀,判断是否知足 Websocket 过滤器启用的条件;对于未被路由处理且请求前缀为ws或wss的请求,设置路由处理状态位,构造过滤后的头部。最后将请求经过代理转发。
// WebsocketRoutingFilter.java
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { //检查websocket 是不是 upgrade changeSchemeIfIsWebSocketUpgrade(exchange); URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); //判断是否知足websocket启用条件 if (isAlreadyRouted(exchange) || (!"ws".equals(scheme) && !"wss".equals(scheme))) { return chain.filter(exchange); } setAlreadyRouted(exchange); HttpHeaders headers = exchange.getRequest().getHeaders(); HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL); if (protocols != null) { protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream() .flatMap(header -> Arrays.stream(commaDelimitedListToStringArray(header))) .map(String::trim) .collect(Collectors.toList()); } //将请求代理转发 return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols)); }
ProxyWebSocketHandler 是 WebSocketHandler 的实现类,处理客户端 WebSocket Session。 下面看一下代理 WebSocket 处理器的具体实现:
// WebsocketRoutingFilter.java
private static class ProxyWebSocketHandler implements WebSocketHandler { private final WebSocketClient client; private final URI url; private final HttpHeaders headers; private final List<String> subProtocols; public ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) { this.client = client; this.url = url; this.headers = headers; if (protocols != null) { this.subProtocols = protocols; } else { this.subProtocols = Collections.emptyList(); } } @Override public List<String> getSubProtocols() { return this.subProtocols; } @Override public Mono<Void> handle(WebSocketSession session) { // pass headers along so custom headers can be sent through return client.execute(url, this.headers, new WebSocketHandler() { @Override public Mono<Void> handle(WebSocketSession proxySession) { // Use retain() for Reactor Netty Mono<Void> proxySessionSend = proxySession .send(session.receive().doOnNext(WebSocketMessage::retain)); // .log("proxySessionSend", Level.FINE); Mono<Void> serverSessionSend = session .send(proxySession.receive().doOnNext(WebSocketMessage::retain)); // .log("sessionSend", Level.FINE); return Mono.zip(proxySessionSend, serverSessionSend).then(); } /** * Copy subProtocols so they are available downstream. * @return */ @Override public List<String> getSubProtocols() { return ProxyWebSocketHandler.this.subProtocols; } }); } }
WebSocketClient# execute 方法链接后端被代理的 WebSocket 服务。
链接成功后,回调WebSocketHandler实现的内部类的handle( WebSocketSession session)方法
WebSocketHandler 实现的内部类实现对消息的转发: 客户端=> 具体业务服务=> 客户 端; 而后合并代理服务的会话信息 proxySessionSend 和业务服务的会话信息serverSessionSend。
AdaptCachedBodyGlobalFilter— 用于缓存请求体的过滤器,在全局过滤器中的优先级较高。
ForwardPathFilter— 请求中的 gatewayRoute 属性对应 Route 对象,当 Route 中的 URI scheme 为 forward 模式 时, 该过滤器用于设置请求的 URI 路径为 Route 对象 中的 URI 路径。