一. elasticsearch分析 - HttpServerTransport和Transport服务端请求处理分析

        elasticsearch提供了两种方式TcpTransport和HttpServerTransport方便用户进行各类操做(增,删,改,查等)。node

Netty3HttpServerTransport

public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline pipeline = Channels.pipeline();
    pipeline.addLast("openChannels", transport.serverOpenChannels);
    HttpRequestDecoder requestDecoder = new HttpRequestDecoder(
        (int) transport.maxInitialLineLength.getBytes(),
        (int) transport.maxHeaderSize.getBytes(),
        (int) transport.maxChunkSize.getBytes()
    );
    if (transport.maxCumulationBufferCapacity.getBytes() >= 0) {
        if (transport.maxCumulationBufferCapacity.getBytes() > Integer.MAX_VALUE) {
            requestDecoder.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
        } else {
            requestDecoder.setMaxCumulationBufferCapacity((int) transport.maxCumulationBufferCapacity.getBytes());
        }
    }
    if (transport.maxCompositeBufferComponents != -1) {
        requestDecoder.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    }
    pipeline.addLast("decoder", requestDecoder);
    pipeline.addLast("decoder_compress", new HttpContentDecompressor());
    HttpChunkAggregator httpChunkAggregator = new HttpChunkAggregator((int) transport.maxContentLength.getBytes());
    if (transport.maxCompositeBufferComponents != -1) {
        httpChunkAggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    }
    pipeline.addLast("aggregator", httpChunkAggregator);
    pipeline.addLast("encoder", new ESNetty3HttpResponseEncoder());
    if (transport.compression) {
        pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
    }
    if (SETTING_CORS_ENABLED.get(transport.settings())) {
        pipeline.addLast("cors", new Netty3CorsHandler(transport.getCorsConfig()));
    }
    if (transport.pipelining) {
        pipeline.addLast("pipelining", new HttpPipeliningHandler(transport.pipeliningMaxEvents));
    }
    pipeline.addLast("handler", requestHandler);
    return pipeline;
}

         上面是Netty3HttpServerTransport服务端pipline的配置代码,咱们重点关注倒数第二行代码pipline.addLast("handler",requestHandler)。这行代码是HttpServerTransport中对request的处理方式,从requestHandler咱们也能够大体推测其意义。app

public class Netty3HttpRequestHandler extends SimpleChannelUpstreamHandler {

    private final Netty3HttpServerTransport serverTransport;
    private final boolean httpPipeliningEnabled;
    private final boolean detailedErrorsEnabled;
    private final ThreadContext threadContext;

    public Netty3HttpRequestHandler(Netty3HttpServerTransport serverTransport, boolean detailedErrorsEnabled, ThreadContext threadContext) {
        this.serverTransport = serverTransport;
        this.httpPipeliningEnabled = serverTransport.pipelining;
        this.detailedErrorsEnabled = detailedErrorsEnabled;
        this.threadContext = threadContext;
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        HttpRequest request;
        OrderedUpstreamMessageEvent oue = null;
        if (this.httpPipeliningEnabled && e instanceof OrderedUpstreamMessageEvent) {
            oue = (OrderedUpstreamMessageEvent) e;
            request = (HttpRequest) oue.getMessage();
        } else {
            request = (HttpRequest) e.getMessage();
        }

        // the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally
        // when reading, or using a cumulation buffer
        Netty3HttpRequest httpRequest = new Netty3HttpRequest(serverTransport.xContentRegistry, request, e.getChannel());
        Netty3HttpChannel channel = new Netty3HttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled, threadContext);
        serverTransport.dispatchRequest(httpRequest, channel);
        super.messageReceived(ctx, e);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        Netty3Utils.maybeDie(e.getCause());
        serverTransport.exceptionCaught(ctx, e);
    }
}

        上面是requestHandler的具体实现类,咱们知道messageReceived是对收到的请求的处理方法。逻辑比较简单,将请求从新封装为Netty3HttpRequest,并经过相关的channel进行处理,而后将请求进行分发(dispatchRequest),最后将其继续往下传递(super.messageReceived->ctx.sendUpStream())。在这里咱们重点关注方法serverTransport.dispatchRequest(httpRequest,channel)。咱们继续往下探究:    cors

protected void dispatchRequest(RestRequest request, RestChannel channel) {
    httpServerAdapter.dispatchRequest(request, channel, threadPool.getThreadContext());
}
public interface HttpServerAdapter {

    void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext context);

}

        继续探究HttpServerAdapter的实现类,在这里咱们经过idea的Hierarchy能够看到只有HttpServer实现了该接口,进入HttpServer类elasticsearch

public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
    if (request.rawPath().equals("/favicon.ico")) {
        handleFavicon(request, channel);
        return;
    }
    RestChannel responseChannel = channel;
    try {
        int contentLength = request.content().length();
        if (restController.canTripCircuitBreaker(request)) {
            inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
        } else {
            inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
        }
        // iff we could reserve bytes for the request we need to send the response also over this channel
        responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
        restController.dispatchRequest(request, responseChannel, client, threadContext);
    } catch (Exception e) {
        try {
            responseChannel.sendResponse(new BytesRestResponse(channel, e));
        } catch (Exception inner) {
            inner.addSuppressed(e);
            logger.error((Supplier<?>) () ->
                new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
        }
    }
}

        上述为HttpServer里对dispatchRequest的具体实现方法,一样咱们重点关注try里的最后一行代码restController.dispatchRequest(request,responseChannel,client,threadContext)。继续探究:ide

public void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext) throws Exception {
    if (!checkRequestParameters(request, channel)) {
        return;
    }
    try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
        for (String key : headersToCopy) {
            String httpHeader = request.header(key);
            if (httpHeader != null) {
                threadContext.putHeader(key, httpHeader);
            }
        }

        final RestHandler handler = getHandler(request);

        if (handler == null) {
            if (request.method() == RestRequest.Method.OPTIONS) {
                // when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added)
                channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
            } else {
                final String msg = "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]";
                channel.sendResponse(new BytesRestResponse(BAD_REQUEST, msg));
            }
        } else {
            final RestHandler wrappedHandler = Objects.requireNonNull(handlerWrapper.apply(handler));
            wrappedHandler.handleRequest(request, channel, client);
        }
    }
}

       上述为RestController里的dispatchRequest方法,handler是对每一类request都有相关的处理类,咱们按正常流程走即有相关的处理方法,则咱们进入else的分支,在这个分支里开始处理request请求,继续往下探究:ui

/**
 * Handler for REST requests
 */
public interface RestHandler {

    /**
     * Handles a rest request.
     *
     * @param request The request to handle
     * @param channel The channel to write the request response to
     * @param client A client to use to make internal requests on behalf of the original request
     */
    void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception;

    default boolean canTripCircuitBreaker() {
        return true;
    }
}

       上述为RestHandler的接口定义,上述的注释清楚的解释了该接口是为了处理rest请求。该接口有两个实现类,分别为BaseRestHandler和DeprecationRestHandler,其中this

BaseRestHandler是咱们暂时要特别关注的一个抽象类。上图为该类的相关实现类(数量过多,未能截全)咱们能够发现该类为模板类,它的实现类定义了不一样操做请求对应的不一样处理方式。idea

@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // prepare the request for execution; has the side effect of touching the request parameters
    final RestChannelConsumer action = prepareRequest(request, client);

    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams =
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

    // validate the non-response params
    if (!unconsumedParams.isEmpty()) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }

    // execute the action
    action.accept(channel);
}

 

       上面是BaseRestHandler类里的handlerRequest方法。一样在最后一行以前的diamante是对request的一些相关信息进行验证,核心依然是在最后一行代码action.accept(channel)。继续探究:spa

/**
 * REST requests are handled by preparing a channel consumer that represents the execution of
 * the request against a channel.
 */
@FunctionalInterface
protected interface RestChannelConsumer {
    /**
     * Executes a request against the given channel.
     *
     * @param channel the channel for sending the response
     * @throws Exception if an exception occurred executing the request
     */
    void accept(RestChannel channel) throws Exception;
}

        

        一样,RestChannelConsumer 是一个模本接口。因为我一直对elasticsearch的index过程好奇。所以咱们在这里选择RestIndexAction做为下一层的探究类。3d

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
    indexRequest.routing(request.param("routing"));
    indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
    if (request.hasParam("timestamp")) {
        deprecationLogger.deprecated("The [timestamp] parameter of index requests is deprecated");
    }
    indexRequest.timestamp(request.param("timestamp"));
    if (request.hasParam("ttl")) {
        deprecationLogger.deprecated("The [ttl] parameter of index requests is deprecated");
        indexRequest.ttl(request.param("ttl"));
    }
    indexRequest.setPipeline(request.param("pipeline"));
    indexRequest.source(request.content());
    indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
    indexRequest.setRefreshPolicy(request.param("refresh"));
    indexRequest.version(RestActions.parseVersion(request));
    indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
    String sOpType = request.param("op_type");
    String waitForActiveShards = request.param("wait_for_active_shards");
    if (waitForActiveShards != null) {
        indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
    }
    if (sOpType != null) {
        indexRequest.opType(IndexRequest.OpType.fromString(sOpType));
    }

    return channel ->
        client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> {
            try {
                return r.getLocation(indexRequest.routing());
            } catch (URISyntaxException ex) {
                logger.warn("Location string is not a valid URI.", ex);
                return null;
            }
        }));
}

        经过阅读该方法,咱们能够看到将RestRequest转换为IndexRequest,最终经过nodeClient来处理index操做。

TransportClient

        一样,咱们对TransportClient.get方法进行追踪分析,可是此次咱们是经过正向来进行追踪,发现是经过其代理类TransportProxyClient来进行执行的

@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
    proxy.execute(action, request, listener);
}
final class TransportProxyClient {

    private final TransportClientNodesService nodesService;
    private final Map<Action, TransportActionNodeProxy> proxies;

    TransportProxyClient(Settings settings, TransportService transportService,
                                TransportClientNodesService nodesService, List<GenericAction> actions) {
        this.nodesService = nodesService;
        Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
        for (GenericAction action : actions) {
            if (action instanceof Action) {
                proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
            }
        }
        this.proxies = unmodifiableMap(proxies);
    }

    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
        ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action,
                                                                              final Request request, ActionListener<Response> listener) {
        final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
        nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
    }
}
相关文章
相关标签/搜索