Soul网关学习Http请求探险

回顾

在Soul 请求处理概览概览这篇文章中,咱们已经知晓了Soul针对于请求的处理入库在DefaultSoulPluginChain的excute,其中执行了一个插件链的模式来完成了请求的处理。前端

咱们大致梳理了注入到plugins的插件,可是即便这样依然不能纵观全局,对此特意对soul插件所涉及的类进行了相关梳理,总体梳理结果以下图。web

在梳理文章中能够看到核心类是SoulPlugin、PluginEnum、PluginDataHandler、MetaDataSubscriber,在梳理请求的相关文章中咱们目前只须要重点关注SoulPlugin与PluginEnum类。编程

SoulPlugin类咱们已经有了必定的理解,那PluginEnum枚举类的主要做用是什么呢?websocket

PluginEnum:插件的枚举类markdown

属性 做用
code 插件的执行顺序 越小越先执行
role 角色 暂时未发现实际引用地址
name 插件名称

其实咱们不难发如今DefaultSoulPluginChain的plugins的插件都是有固定的执行顺序的,那这个插件的执行顺序是在哪定义的呢?cookie

最终能够追溯到SoulConfiguration类下app

public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
        //省略
        final List<SoulPlugin> soulPlugins = pluginList.stream()
               .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
        return new SoulWebHandler(soulPlugins);
    }

复制代码

整理整个PluginEnum类相关引用,整理出以下表格,不难看出插件与插件之间的顺序关系 负载均衡

等级 做用
第一等级 只有GlobalPlugin 全局插件
第二等级到第八等级 能够理解为在请求发起前的前置处理插件
第九等级到第十一等级 能够理解为针对调用方的方式所针对的不一样调用处理
第十二等级 只有MonitorPlugin 监控插件
第十三等级 是针对于各个调用方返回结果处理的Response相关插件

在刚才的回顾中咱们已经明白soul处理请求的大致流程 - 1.GloBalPlugin插件 进行全局的初始化 - 2.部分插件根据鉴权、限流、熔断等规则对请求进行处理 - 3.选择适合本身的调用方式进行拼装参数,发起调用。 - 4.进行监控 - 5.对调用的结果进行处理dom

请求流程梳理

如下演示代码截图来自于soul-examples下的http demo,调用的接口地址为http://127.0.0.1:9195/http/test/findByUserId?userId=10socket

DefaultSoulPluginChain的excute方法进行埋点,查看一次http请求调用通过了哪些类?

public Mono<Void> execute(final ServerWebExchange exchange) {
            return Mono.defer(() -> {
                if (this.index < plugins.size()) {
                    SoulPlugin plugin = plugins.get(this.index++);
                    Boolean skip = plugin.skip(exchange);
                    if (skip) {
                        System.out.println("跳过的插件为"+plugin.getClass().getName().replace("org.dromara.soul.plugin.",""));
                        return this.execute(exchange);
                    }
                    System.out.println("未跳过的插件为"+plugin.getClass().getName().replace("org.dromara.soul.plugin.",""));
                    return plugin.execute(exchange, this);
                }
                return Mono.empty();
            });
        }

复制代码

最终输出的未跳过的插件以下:

未跳过的插件为global.GlobalPlugin
未跳过的插件为sign.SignPlugin
未跳过的插件为waf.WafPlugin
未跳过的插件为ratelimiter.RateLimiterPlugin
未跳过的插件为hystrix.HystrixPlugin
未跳过的插件为resilience4j.Resilience4JPlugin
未跳过的插件为divide.DividePlugin
未跳过的插件为httpclient.WebClientPlugin
未跳过的插件为alibaba.dubbo.param.BodyParamPlugin
未跳过的插件为monitor.MonitorPlugin
未跳过的插件为httpclient.response.WebClientResponsePlugin

这里有个小疑惑,为啥这个alibaba.dubbo.param.BodyParamPlugin插件会被执行,暂时忽略,后期跟踪。

咱们发现一次针对于http请求的网关调用 所执行的插件的大致流程与咱们猜测的处理流程一致。
目前咱们只挑重点来说,即GlobalPlugin、DividePlugin、WebClientPlugin、WebClientResponsePlugin

发起Debug调用依次追踪上述四个插件的做用。

GlobalPlugin SoulContext对象封装插件

GlobalPlugin的插件的excute方法以下所示

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        final ServerHttpRequest request = exchange.getRequest();
        final HttpHeaders headers = request.getHeaders();
        final String upgrade = headers.getFirst("Upgrade");
        SoulContext soulContext;
        if (StringUtils.isBlank(upgrade) || !"websocket".equals(upgrade)) {
            soulContext = builder.build(exchange);
        } else {
            final MultiValueMap<String, String> queryParams = request.getQueryParams();
            soulContext = transformMap(queryParams);
        }
        exchange.getAttributes().put(Constants.CONTEXT, soulContext);
        return chain.execute(exchange);
    }

复制代码

不难看出 在GlobalPlugin的excute方法中主要目的就是封装一个SoulContext对象,放入exchange中(exchange对象是整个插件链上的共享对象,有一个插件执行完成后传递给下一个插件,本人理解的就是一个相似于ThreadLocal对象)。

那SoulContext对象中又包含哪些属性呢?

属性 含义
module 每种RPCType针对的值不一样http调用时指代网关调用的前置地址
method 切割后的方法名(在RpcType为http时)
rpcType RPC调用类型有Http、dubbo、sofa等
httpMethod Http调用的方式目前只支持get、post
sign 鉴权的相关属性目前不知道具体做用,可能与SignPlugin插件有关
timestamp 时间戳
appKey 鉴权的相关属性目前不知道具体做用,可能与SignPlugin插件有关
path 路径指代调用到soul网关的全路径(在RpcType为http时)
contextPath 与module取值一致(在RPCType为http时)
realUrl 与method的值一致(在RpcType为http时)
dubboParams dubbo的参数?
startDateTime 开始时间怀疑与监控插件和统计指标模块有联用

在执行完GlobalPlugin插件后,最终封装完成的SoulContext对象以下所示。

其余RPCType的SoulContext的参数封装能够查看DefaultSoulContextBuilder的build方法进行追踪,因为本编文章主要追溯http调用,故在这里不在多余讨论。

DividePlugin 路由选择插件

在执行完成GlobalPlugin插件后,最终封装成了一个SoulContext对象,并将其放在了ServerWebExchange中,供下游的调用链使用。

接下来让咱们看一下DividePlugin插件在整个链式调用过程当中到底起了一个什么样的做用?

AbstractSoulPlugin

经过追溯源码得知DividePlugin插件继承于AbstractSoulPlugin类,而AbstractSoulPlugin类实现了SoulPlugin接口

那么AbstractSoulPlugin又作了哪些扩展呢?让咱们梳理一下该类的方法。

方法名 做用
excute 实现于SoulPlugin接口,在AbstractSoulPlugin中起到一个模板方法的做用
doexcute 抽象方法 交由各个子类实现
matchSelector 匹配选择器
filterSelector 筛选选择器
matchRule 匹配规则
filterRule 筛选规则
handleSelectorIsNull 处理选择器为空状况
handleRuleIsNull 处理规则为空状况
selectorLog 选择器日志打印
ruleLog 规则日志打印

看一下excute方法的具体做用

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        String pluginName = named();
        //获取对应插件
        final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
        //判断插件是否启用
        if (pluginData != null && pluginData.getEnabled()) {
            //获取插件下的全部选择器
            final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
            if (CollectionUtils.isEmpty(selectors)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
            //匹配选择器
            final SelectorData selectorData = matchSelector(exchange, selectors);
            if (Objects.isNull(selectorData)) {
                return handleSelectorIsNull(pluginName, exchange, chain);
            }
            //打印选择器日志
            selectorLog(selectorData, pluginName);
            final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
            if (CollectionUtils.isEmpty(rules)) {
                return handleRuleIsNull(pluginName, exchange, chain);
            }
            RuleData rule;
            if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
                rule = rules.get(rules.size() - 1);
            } else {
                //匹配规则
                rule = matchRule(exchange, rules);
            }
            if (Objects.isNull(rule)) {
                return handleRu![](https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f523f655f0014d288b7a4502cc6a08d1~tplv-k3u1fbpfcp-watermark.image)leIsNull(pluginName, exchange, chain);
            }
            //打印规则日志
            ruleLog(rule, pluginName);
            //执行子类具体实现
            return doExecute(exchange, chain, selectorData, rule);
        }
        return chain.execute(exchange);
    }

复制代码

最终整理的流程图以下所示:

ps:在上述的流程图中并无细化到具体的方法级别的处理。

但仍有几个点须要着重解释一下: - 1.插件数据、选择器数据、规则数据的获取所有来自于BaseDataCache,该类是数据同步过程当中最终会影响的类。 - 2.选择器的类型,在使用SpringMvc项目进行接口注册时,会有一个isFull的选项为true表明全局代理,在全局代理模式下只会注册一个选择器\规则(指代代理全部的接口),因此这里的对应处理为rule.size()-1. - 3.选择器和规则的选择,实际的处理要复杂的多,考虑到是介绍一次请求流程的大致逻辑,因此这里不展开阐述,有兴趣的能够查看MatchStrategy、AbstractMatchStrategy及其相关实现类(后期会单独开一篇具体讲解),此处对应页面的以下:

梳理一下AbstractSoulPlugin的exeute方法做用,通过上述流程图的引导,咱们已经知晓该方法的做用是为了选取插件—>选取选择器—>选取规则,最后交由子类的doexcute方法。

接下来让咱们看一下DividePlugin的doexcute方法具体作了哪些事。

DividePlugin

protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        //获取规则处理数据
        final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
        //获取该选择器下的注入的地址
        final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
        if (CollectionUtils.isEmpty(upstreamList)) {
            log.error("divide upstream configuration error: {}", rule.toString());
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
        //经过规则对应的负载均衡策略选择一个地址
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
        if (Objects.isNull(divideUpstream)) {
            log.error("divide has no upstream");
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // set the http url
        String domain = buildDomain(divideUpstream);
        //拼装真实调用地址
        String realURL = buildRealURL(domain, soulContext, exchange);
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
        //设置超时时间 及重试次数
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        return chain.execute(exchange);
    }

复制代码

经过上述代码梳理完成后大致逻辑以下: - 1.获取选择器对应的注册地址,对应页面数据以下 - 2.根据规则的handle字段获取负载均衡策略,并选择真实的调用地址(LoadBalanceUtils),重试次数和超时时间,对应页面数据以下。 - 3.将真实调用地址,超时时间,重试次数传递到ServerWebExchange中,供下游调用链使用。 debug演示: ps:在上述的主题逻辑中咱们没有看到参数在哪里?那这个参数在哪封装的呢?答案在buildRealURL方法中,是从exchange上下文中获取到的。

WebClientPlugin Http请求调用插件

接下来让咱们看看Soul如何发起的请求调用

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
        //获取真实地址
        String urlPath = exchange.getAttribute(Constants.HTTP_URL);
        if (StringUtils.isEmpty(urlPath)) {
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        //获取超时时间
        long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
        //获取重试次数
        int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
        log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
        HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
        WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
        return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
    }

复制代码

在webClient的excute方法中,主要作了三个事 - 1.将从Divide插件中放入exchange的属性取出来,调用的真实地址、超时时间、重试次数。 - 2.封装了一个RequestBodySpec对象(不认识这个响应式编程的东西) - 3.调用了一个handleRequestBody方法

先认识handleRequestBody方法

private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                         final ServerWebExchange exchange,
                                         final long timeout,
                                         final int retryTimes,
                                         final SoulPluginChain chain) {
        return requestBodySpec.headers(httpHeaders -> {
            httpHeaders.addAll(exchange.getRequest().getHeaders());
            httpHeaders.remove(HttpHeaders.HOST);
        })
                .contentType(buildMediaType(exchange))
                .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
                .exchange()
                //失败打印日志
                .doOnError(e -> log.error(e.getMessage()))
                //设置超时时间
                .timeout(Duration.ofMillis(timeout))
                //设置请求重试实际
                .retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
                    .retryMax(retryTimes)
                    .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
                //请求结束后对应的处理
                .flatMap(e -> doNext(e, exchange, chain));

    }

复制代码

在这个方法里,大致能够理解为 - exchange中的请求头放到本次调用的请求头中 - 设置contentType - 设置超时时间 - 设置失败响应 - 设置重试的场景及重试次数 - 最终结果的处理。 在流程中须要还须要看一个doNext方法

大致逻辑就是判断请求是否成功,将请求结果放入exchange中交给下游插件处理。

private Mono<Void> doNext(final ClientResponse res, final ServerWebExchange exchange, final SoulPluginChain chain) {
        if (res.statusCode().is2xxSuccessful()) {
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
        } else {
            exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
        }
        exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res);
        return chain.execute(exchange);
    }

复制代码

ps: 虽然并不懂响应式编程,但并不影响咱们阅读代码。

WebClientResponsePlugin Http结果处理插件

该实现的excute方法没有什么核心逻辑,就是判断请求状态码,根据状态码返回给前端不一样的数据格式。

public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
        return chain.execute(exchange).then(Mono.defer(() -> {
            ServerHttpResponse response = exchange.getResponse();
            ClientResponse clientResponse = exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
            if (Objects.isNull(clientResponse)
                    || response.getStatusCode() == HttpStatus.BAD_GATEWAY
                    || response.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
                Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);
                return WebFluxResultUtils.result(exchange, error);
            }
            if (response.getStatusCode() == HttpStatus.GATEWAY_TIMEOUT) {
                Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_TIMEOUT.getCode(), SoulResultEnum.SERVICE_TIMEOUT.getMsg(), null);
                return WebFluxResultUtils.result(exchange, error);
            }
            response.setStatusCode(clientResponse.statusCode());
            response.getCookies().putAll(clientResponse.cookies());
            response.getHeaders().putAll(clientResponse.headers().asHttpHeaders());
            return response.writeWith(clientResponse.body(BodyExtractors.toDataBuffers()));
        }));
    }

复制代码

总结

到此为止,一个基于Soul网关发起的Http请求调用流程大致已经结束。

梳理http请求调用流程 - Global插件封装SoulContext对象 - 前置插件处理熔断限流鉴权等操做。 - Divide插件选择对应调用的真实地址,重试次数,超时时间。 - WebClient插件发起真实的Http调用 - WebClientResponse插件处理对应结果,返回前台。

基于Http调用的大致流程,咱们能够大致猜想出基于别RPC调用的流程,就是替换发起请求的插件和返回结果处理的插件。

在上文中咱们还提到了路由规则的选择LoadBalanceUtils,选择器和规则的处理MatchStrategy

以后将会开启新篇章一步步揭开RPC泛化调用,路由选择,选择器、规则匹配的神秘面纱。

相关文章
相关标签/搜索