Elasticsearch provides two transports, TcpTransport and HttpServerTransport, through which users can perform operations such as create, delete, update, and query.
Netty3HttpServerTransport
public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline pipeline = Channels.pipeline();
    pipeline.addLast("openChannels", transport.serverOpenChannels);
    HttpRequestDecoder requestDecoder = new HttpRequestDecoder(
        (int) transport.maxInitialLineLength.getBytes(),
        (int) transport.maxHeaderSize.getBytes(),
        (int) transport.maxChunkSize.getBytes()
    );
    if (transport.maxCumulationBufferCapacity.getBytes() >= 0) {
        if (transport.maxCumulationBufferCapacity.getBytes() > Integer.MAX_VALUE) {
            requestDecoder.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
        } else {
            requestDecoder.setMaxCumulationBufferCapacity((int) transport.maxCumulationBufferCapacity.getBytes());
        }
    }
    if (transport.maxCompositeBufferComponents != -1) {
        requestDecoder.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    }
    pipeline.addLast("decoder", requestDecoder);
    pipeline.addLast("decoder_compress", new HttpContentDecompressor());
    HttpChunkAggregator httpChunkAggregator = new HttpChunkAggregator((int) transport.maxContentLength.getBytes());
    if (transport.maxCompositeBufferComponents != -1) {
        httpChunkAggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
    }
    pipeline.addLast("aggregator", httpChunkAggregator);
    pipeline.addLast("encoder", new ESNetty3HttpResponseEncoder());
    if (transport.compression) {
        pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
    }
    if (SETTING_CORS_ENABLED.get(transport.settings())) {
        pipeline.addLast("cors", new Netty3CorsHandler(transport.getCorsConfig()));
    }
    if (transport.pipelining) {
        pipeline.addLast("pipelining", new HttpPipeliningHandler(transport.pipeliningMaxEvents));
    }
    pipeline.addLast("handler", requestHandler);
    return pipeline;
}
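The code above configures the server-side pipeline of Netty3HttpServerTransport. The line to focus on is the second-to-last one, pipeline.addLast("handler", requestHandler). This is where HttpServerTransport hooks in its request processing, as the name requestHandler already suggests.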
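To make the ordering idea concrete, here is a minimal sketch of a named handler chain in plain Java. It is not the Netty API: MiniPipeline and its stages are hypothetical stand-ins, and it ignores Netty's distinction between upstream and downstream handlers. It only illustrates that stages are registered by name, that some are added conditionally, and that the request handler comes last.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Simplified stand-in for a pipeline: named stages applied in insertion order.
// Hypothetical class, NOT the Netty ChannelPipeline.
class MiniPipeline {
    private final Map<String, UnaryOperator<String>> stages = new LinkedHashMap<>();

    MiniPipeline addLast(String name, UnaryOperator<String> stage) {
        stages.put(name, stage);
        return this;
    }

    List<String> names() {
        return new ArrayList<>(stages.keySet());
    }

    // Pass the message through every stage, first to last.
    String fire(String message) {
        String current = message;
        for (UnaryOperator<String> stage : stages.values()) {
            current = stage.apply(current);
        }
        return current;
    }
}

class MiniPipelineDemo {
    static MiniPipeline build(boolean compression) {
        MiniPipeline p = new MiniPipeline();
        p.addLast("decoder", s -> s.trim());
        p.addLast("aggregator", s -> s);
        if (compression) {
            // Optional stage, registered only when the setting is on.
            p.addLast("encoder_compress", s -> s);
        }
        // The business-logic handler is always the final stage.
        p.addLast("handler", s -> "handled:" + s);
        return p;
    }

    public static void main(String[] args) {
        MiniPipeline p = build(true);
        System.out.println(p.names());
        System.out.println(p.fire("  GET /_cat/health  "));
    }
}
```

Because the handler is registered last, it only ever sees messages that the earlier stages have already decoded and aggregated.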
public class Netty3HttpRequestHandler extends SimpleChannelUpstreamHandler {

    private final Netty3HttpServerTransport serverTransport;
    private final boolean httpPipeliningEnabled;
    private final boolean detailedErrorsEnabled;
    private final ThreadContext threadContext;

    public Netty3HttpRequestHandler(Netty3HttpServerTransport serverTransport, boolean detailedErrorsEnabled,
                                    ThreadContext threadContext) {
        this.serverTransport = serverTransport;
        this.httpPipeliningEnabled = serverTransport.pipelining;
        this.detailedErrorsEnabled = detailedErrorsEnabled;
        this.threadContext = threadContext;
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        HttpRequest request;
        OrderedUpstreamMessageEvent oue = null;
        if (this.httpPipeliningEnabled && e instanceof OrderedUpstreamMessageEvent) {
            oue = (OrderedUpstreamMessageEvent) e;
            request = (HttpRequest) oue.getMessage();
        } else {
            request = (HttpRequest) e.getMessage();
        }

        // the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally
        // when reading, or using a cumulation buffer
        Netty3HttpRequest httpRequest = new Netty3HttpRequest(serverTransport.xContentRegistry, request, e.getChannel());
        Netty3HttpChannel channel = new Netty3HttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled, threadContext);
        serverTransport.dispatchRequest(httpRequest, channel);
        super.messageReceived(ctx, e);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        Netty3Utils.maybeDie(e.getCause());
        serverTransport.exceptionCaught(ctx, e);
    }
}
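The class above is the concrete implementation of requestHandler. messageReceived is the method that processes each incoming request. The logic is simple: it wraps the request in a Netty3HttpRequest, creates a Netty3HttpChannel for it, dispatches the request (dispatchRequest), and finally passes the event further up the pipeline (super.messageReceived -> ctx.sendUpstream()). The call to focus on here is serverTransport.dispatchRequest(httpRequest, channel). Let us follow it: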
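The wrap-then-dispatch step can be sketched without Netty as follows. All types here (RawEvent, OrderedEvent, WrappedRequest, MiniRequestHandler) are hypothetical stand-ins, and the sequence number is an assumed simplification of what an ordered pipelining event carries.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the Netty event types; not the real classes.
class RawEvent {
    final String message;
    RawEvent(String message) { this.message = message; }
}

class OrderedEvent extends RawEvent {
    final int sequence;
    OrderedEvent(int sequence, String message) {
        super(message);
        this.sequence = sequence;
    }
}

// The transport-neutral form handed to the dispatcher.
class WrappedRequest {
    final String uri;
    final Integer sequence; // null when pipelining is not in play
    WrappedRequest(String uri, Integer sequence) {
        this.uri = uri;
        this.sequence = sequence;
    }
}

class MiniRequestHandler {
    private final boolean pipeliningEnabled;
    // Stands in for serverTransport.dispatchRequest(...): we just record what was dispatched.
    final List<WrappedRequest> dispatched = new ArrayList<>();

    MiniRequestHandler(boolean pipeliningEnabled) {
        this.pipeliningEnabled = pipeliningEnabled;
    }

    void messageReceived(RawEvent e) {
        Integer sequence = null;
        // Only keep ordering information when pipelining is on and the event carries it.
        if (pipeliningEnabled && e instanceof OrderedEvent) {
            sequence = ((OrderedEvent) e).sequence;
        }
        dispatched.add(new WrappedRequest(e.message, sequence));
    }
}

class MiniRequestHandlerDemo {
    public static void main(String[] args) {
        MiniRequestHandler h = new MiniRequestHandler(true);
        h.messageReceived(new OrderedEvent(7, "/idx/doc/1"));
        h.messageReceived(new RawEvent("/_search"));
        for (WrappedRequest r : h.dispatched) {
            System.out.println(r.uri + " seq=" + r.sequence);
        }
    }
}
```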
protected void dispatchRequest(RestRequest request, RestChannel channel) {
    httpServerAdapter.dispatchRequest(request, channel, threadPool.getThreadContext());
}

public interface HttpServerAdapter {
    void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext context);
}
Next we look for implementations of HttpServerAdapter. IDEA's Hierarchy view shows that HttpServer is the only class implementing this interface, so we go into HttpServer:
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
    if (request.rawPath().equals("/favicon.ico")) {
        handleFavicon(request, channel);
        return;
    }
    RestChannel responseChannel = channel;
    try {
        int contentLength = request.content().length();
        if (restController.canTripCircuitBreaker(request)) {
            inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
        } else {
            inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
        }
        // iff we could reserve bytes for the request we need to send the response also over this channel
        responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
        restController.dispatchRequest(request, responseChannel, client, threadContext);
    } catch (Exception e) {
        try {
            responseChannel.sendResponse(new BytesRestResponse(channel, e));
        } catch (Exception inner) {
            inner.addSuppressed(e);
            logger.error((Supplier<?>) () ->
                new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
        }
    }
}
The above is HttpServer's implementation of dispatchRequest. Again, the line to focus on is the last one in the try block, restController.dispatchRequest(request, responseChannel, client, threadContext). Following it:
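Before moving on, the byte accounting at the top of the try block is worth a small illustration. The sketch below is a hypothetical model, not the Elasticsearch CircuitBreaker: it only borrows the two method names seen above and assumes that the reserved bytes are given back once the response has been sent, which is what the comment about ResourceHandlingHttpChannel suggests.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal model of in-flight request accounting. Hypothetical class.
class InFlightBreaker {
    private final long limitBytes;
    private final AtomicLong used = new AtomicLong();

    InFlightBreaker(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    // Reserve bytes, rejecting the reservation if the limit would be exceeded.
    void addEstimateBytesAndMaybeBreak(long bytes, String label) {
        long newUsed = used.addAndGet(bytes);
        if (newUsed > limitBytes) {
            used.addAndGet(-bytes); // roll back the failed reservation
            throw new IllegalStateException("[" + label + "] would use " + newUsed
                + " bytes, limit is " + limitBytes);
        }
    }

    // Adjust the counter unconditionally; a negative value releases bytes.
    void addWithoutBreaking(long bytes) {
        used.addAndGet(bytes);
    }

    long getUsed() {
        return used.get();
    }
}

class MiniHttpServer {
    private final InFlightBreaker breaker;

    MiniHttpServer(InFlightBreaker breaker) {
        this.breaker = breaker;
    }

    // Returns true if the request was admitted; the caller releases the bytes after responding.
    boolean admit(int contentLength, boolean canTripCircuitBreaker) {
        try {
            if (canTripCircuitBreaker) {
                breaker.addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
            } else {
                breaker.addWithoutBreaking(contentLength);
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    void release(int contentLength) {
        breaker.addWithoutBreaking(-contentLength);
    }
}

class InFlightBreakerDemo {
    public static void main(String[] args) {
        InFlightBreaker breaker = new InFlightBreaker(100);
        MiniHttpServer server = new MiniHttpServer(breaker);
        System.out.println(server.admit(60, true));   // fits under the limit
        System.out.println(server.admit(60, true));   // would exceed the limit
        System.out.println(server.admit(60, false));  // exempt handlers are always counted, never rejected
        System.out.println(breaker.getUsed());
    }
}
```

The split between the two methods mirrors canTripCircuitBreaker: every request is counted, but only handlers that opt in can be rejected for size.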
public void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client,
                            ThreadContext threadContext) throws Exception {
    if (!checkRequestParameters(request, channel)) {
        return;
    }
    try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
        for (String key : headersToCopy) {
            String httpHeader = request.header(key);
            if (httpHeader != null) {
                threadContext.putHeader(key, httpHeader);
            }
        }

        final RestHandler handler = getHandler(request);

        if (handler == null) {
            if (request.method() == RestRequest.Method.OPTIONS) {
                // when we have OPTIONS request, simply send OK by default (with the Access Control Origin header which gets automatically added)
                channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
            } else {
                final String msg = "No handler found for uri [" + request.uri() + "] and method [" + request.method() + "]";
                channel.sendResponse(new BytesRestResponse(BAD_REQUEST, msg));
            }
        } else {
            final RestHandler wrappedHandler = Objects.requireNonNull(handlerWrapper.apply(handler));
            wrappedHandler.handleRequest(request, channel, client);
        }
    }
}
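The above is the dispatchRequest method in RestController. Each kind of request has its own handler. Following the normal flow, in which a matching handler exists, we enter the else branch, where processing of the request begins. Continuing: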
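The three outcomes of this method (handler found, OPTIONS without a handler, anything else without a handler) can be sketched as below. MiniRestController is hypothetical, and the exact-match lookup on method and path is an assumed simplification; the source does not show how getHandler resolves a request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified router. Not the real RestController.
class MiniRestController {
    interface Handler {
        String handle(String uri);
    }

    private final Map<String, Handler> handlers = new HashMap<>();

    void register(String method, String path, Handler handler) {
        handlers.put(method + " " + path, handler);
    }

    // Returns a status code followed by an optional body.
    String dispatch(String method, String uri) {
        Handler handler = handlers.get(method + " " + uri);
        if (handler == null) {
            if ("OPTIONS".equals(method)) {
                // OPTIONS without a handler is answered with a plain OK.
                return "200";
            }
            return "400 No handler found for uri [" + uri + "] and method [" + method + "]";
        }
        return "200 " + handler.handle(uri);
    }
}

class MiniRestControllerDemo {
    public static void main(String[] args) {
        MiniRestController controller = new MiniRestController();
        controller.register("GET", "/_cat/health", uri -> "green");
        System.out.println(controller.dispatch("GET", "/_cat/health"));
        System.out.println(controller.dispatch("OPTIONS", "/nope"));
        System.out.println(controller.dispatch("GET", "/nope"));
    }
}
```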
/**
 * Handler for REST requests
 */
public interface RestHandler {

    /**
     * Handles a rest request.
     *
     * @param request The request to handle
     * @param channel The channel to write the request response to
     * @param client A client to use to make internal requests on behalf of the original request
     */
    void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception;

    default boolean canTripCircuitBreaker() {
        return true;
    }
}
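The above is the definition of the RestHandler interface; its Javadoc makes clear that it exists to handle REST requests. The interface has two implementing classes, BaseRestHandler and DeprecationRestHandler, of which BaseRestHandler, an abstract class, is the one to focus on for now. The figure above shows its implementing classes (there are too many to capture in full). BaseRestHandler is a template class: its subclasses define how each kind of request is handled.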
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // prepare the request for execution; has the side effect of touching the request parameters
    final RestChannelConsumer action = prepareRequest(request, client);

    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams =
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

    // validate the non-response params
    if (!unconsumedParams.isEmpty()) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }

    // execute the action
    action.accept(channel);
}
The above is the handleRequest method of BaseRestHandler. The code before the last line validates the request's parameters; the core is again the last line, action.accept(channel). Continuing:
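The template-method shape of handleRequest, including the check for parameters that no one read, can be sketched as follows. Every name here (MiniRequest, MiniBaseHandler, EchoHandler) is hypothetical, a StringBuilder stands in for the channel, and response-formatting parameters are left out for brevity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical request that remembers which parameters were read.
class MiniRequest {
    private final Map<String, String> params;
    private final Set<String> consumed = new HashSet<>();

    MiniRequest(Map<String, String> params) {
        this.params = new HashMap<>(params);
    }

    String param(String key) {
        consumed.add(key); // reading a parameter marks it as consumed
        return params.get(key);
    }

    SortedSet<String> unconsumedParams() {
        SortedSet<String> rest = new TreeSet<>(params.keySet());
        rest.removeAll(consumed);
        return rest;
    }
}

abstract class MiniBaseHandler {
    interface ChannelConsumer {
        void accept(StringBuilder channel) throws Exception;
    }

    // Subclasses read the parameters they need and return the deferred action.
    protected abstract ChannelConsumer prepareRequest(MiniRequest request);

    // Template method: prepare, validate, then execute.
    final void handleRequest(MiniRequest request, StringBuilder channel) throws Exception {
        ChannelConsumer action = prepareRequest(request);
        SortedSet<String> unconsumed = request.unconsumedParams();
        if (!unconsumed.isEmpty()) {
            throw new IllegalArgumentException("unrecognized parameters: " + unconsumed);
        }
        action.accept(channel);
    }
}

class EchoHandler extends MiniBaseHandler {
    @Override
    protected ChannelConsumer prepareRequest(MiniRequest request) {
        String index = request.param("index");
        return channel -> channel.append("indexed into ").append(index);
    }
}

class EchoHandlerDemo {
    // Runs one request and returns either the response or the error text.
    static String run(Map<String, String> params) {
        StringBuilder channel = new StringBuilder();
        try {
            new EchoHandler().handleRequest(new MiniRequest(params), channel);
            return channel.toString();
        } catch (Exception e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        Map<String, String> ok = new HashMap<>();
        ok.put("index", "books");
        System.out.println(run(ok));

        Map<String, String> bad = new HashMap<>(ok);
        bad.put("bogus", "1");
        System.out.println(run(bad));
    }
}
```

Validation happens after prepareRequest because only then is it known which parameters the concrete handler actually read.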
/**
 * REST requests are handled by preparing a channel consumer that represents the execution of
 * the request against a channel.
 */
@FunctionalInterface
protected interface RestChannelConsumer {
    /**
     * Executes a request against the given channel.
     *
     * @param channel the channel for sending the response
     * @throws Exception if an exception occurred executing the request
     */
    void accept(RestChannel channel) throws Exception;
}
Likewise, RestChannelConsumer is a template interface. Since I have long been curious about how Elasticsearch indexes a document, we pick RestIndexAction as the next class to examine.
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"));
    indexRequest.routing(request.param("routing"));
    indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
    if (request.hasParam("timestamp")) {
        deprecationLogger.deprecated("The [timestamp] parameter of index requests is deprecated");
    }
    indexRequest.timestamp(request.param("timestamp"));
    if (request.hasParam("ttl")) {
        deprecationLogger.deprecated("The [ttl] parameter of index requests is deprecated");
        indexRequest.ttl(request.param("ttl"));
    }
    indexRequest.setPipeline(request.param("pipeline"));
    indexRequest.source(request.content());
    indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
    indexRequest.setRefreshPolicy(request.param("refresh"));
    indexRequest.version(RestActions.parseVersion(request));
    indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
    String sOpType = request.param("op_type");
    String waitForActiveShards = request.param("wait_for_active_shards");
    if (waitForActiveShards != null) {
        indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));
    }
    if (sOpType != null) {
        indexRequest.opType(IndexRequest.OpType.fromString(sOpType));
    }

    return channel ->
        client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> {
            try {
                return r.getLocation(indexRequest.routing());
            } catch (URISyntaxException ex) {
                logger.warn("Location string is not a valid URI.", ex);
                return null;
            }
        }));
}
Reading this method, we can see that it converts the RestRequest into an IndexRequest, and the index operation is ultimately carried out through the NodeClient.
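The pattern in prepareRequest is "build the typed request now, return a lambda that runs it later". A minimal sketch under assumed, hypothetical types (MiniIndexRequest, MiniClient, MiniIndexAction), with a StringBuilder standing in for the channel and a plain callback standing in for the listener:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical typed request built from raw REST parameters.
class MiniIndexRequest {
    final String index;
    final String type;
    final String id;
    String routing;
    String opType = "index"; // default when op_type is absent

    MiniIndexRequest(String index, String type, String id) {
        this.index = index;
        this.type = type;
        this.id = id;
    }
}

// Hypothetical client: performs the operation and reports the result to a listener.
interface MiniClient {
    void index(MiniIndexRequest request, Consumer<String> listener);
}

class MiniIndexAction {
    // Translate parameters eagerly, but defer execution until a channel is supplied.
    static Consumer<StringBuilder> prepareRequest(Map<String, String> params, MiniClient client) {
        MiniIndexRequest request =
            new MiniIndexRequest(params.get("index"), params.get("type"), params.get("id"));
        request.routing = params.get("routing");
        String opType = params.get("op_type");
        if (opType != null) {
            request.opType = opType;
        }
        return channel -> client.index(request, response -> channel.append(response));
    }
}

class MiniIndexActionDemo {
    static String run(Map<String, String> params) {
        // Fake client that answers immediately instead of talking to a cluster.
        MiniClient client = (req, listener) ->
            listener.accept(req.opType + " " + req.index + "/" + req.type + "/" + req.id);
        StringBuilder channel = new StringBuilder();
        MiniIndexAction.prepareRequest(params, client).accept(channel);
        return channel.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("index", "books");
        params.put("type", "doc");
        params.put("id", "1");
        System.out.println(run(params));
        params.put("op_type", "create");
        System.out.println(run(params));
    }
}
```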
TransportClient
We trace TransportClient.get in the same way, but this time following the call forward. It turns out the call is executed through the proxy class TransportProxyClient:
@Override
protected <Request extends ActionRequest, Response extends ActionResponse,
           RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
    proxy.execute(action, request, listener);
}
final class TransportProxyClient {

    private final TransportClientNodesService nodesService;
    private final Map<Action, TransportActionNodeProxy> proxies;

    TransportProxyClient(Settings settings, TransportService transportService,
                         TransportClientNodesService nodesService, List<GenericAction> actions) {
        this.nodesService = nodesService;
        Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
        for (GenericAction action : actions) {
            if (action instanceof Action) {
                proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
            }
        }
        this.proxies = unmodifiableMap(proxies);
    }

    public <Request extends ActionRequest, Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
    void execute(final Action<Request, Response, RequestBuilder> action, final Request request,
                 ActionListener<Response> listener) {
        final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
        nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
    }
}
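The structure of TransportProxyClient is a lookup table from action to proxy, plus a nodes service that decides which node the proxy talks to. The sketch below models that with hypothetical types. Two things are assumptions that the code above does not show: the round-robin node selection, and the synchronous return value (the original is listener-based).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical nodes service: picks a node, then runs the callback against it.
class MiniNodesService {
    private final List<String> nodes;
    private final AtomicInteger counter = new AtomicInteger();

    MiniNodesService(List<String> nodes) {
        this.nodes = new ArrayList<>(nodes);
    }

    <R> R execute(Function<String, R> callback) {
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no nodes available");
        }
        // Assumed policy: rotate through the known nodes.
        int i = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return callback.apply(nodes.get(i));
    }
}

// Hypothetical proxy client: one proxy per action name.
class MiniProxyClient {
    private final MiniNodesService nodesService;
    // action name -> (node, request) -> response
    private final Map<String, BiFunction<String, String, String>> proxies;

    MiniProxyClient(MiniNodesService nodesService, Map<String, BiFunction<String, String, String>> proxies) {
        this.nodesService = nodesService;
        this.proxies = Collections.unmodifiableMap(new HashMap<>(proxies));
    }

    String execute(String action, String request) {
        BiFunction<String, String, String> proxy = proxies.get(action);
        if (proxy == null) {
            throw new IllegalArgumentException("unknown action [" + action + "]");
        }
        return nodesService.execute(node -> proxy.apply(node, request));
    }
}

class MiniProxyClientDemo {
    static MiniProxyClient build() {
        List<String> nodes = new ArrayList<>();
        nodes.add("n1");
        nodes.add("n2");
        Map<String, BiFunction<String, String, String>> proxies = new HashMap<>();
        proxies.put("get", (node, request) -> node + ":" + request);
        return new MiniProxyClient(new MiniNodesService(nodes), proxies);
    }

    public static void main(String[] args) {
        MiniProxyClient client = build();
        System.out.println(client.execute("get", "a"));
        System.out.println(client.execute("get", "b"));
        System.out.println(client.execute("get", "c"));
    }
}
```

The explicit null check on the proxy lookup is an addition in this sketch; in the code above, proxies.get(action) is used directly.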