Elasticsearch Transport Module Creation and Startup Analysis

The earlier analysis of the Elasticsearch communication module gave a high-level overview of what the ES Transport module does, which raises a natural question: how does Elasticsearch start the services that receive client operations such as INDEX and GET? This article analyzes how the Netty HTTP server starts in Elasticsearch 6.3.2. Starting an ES node means starting and initializing its individual services, which happens in the constructor of org.elasticsearch.node.Node, beginning with the creation of an org.elasticsearch.common.network.NetworkModule object. NetworkModule is where all of ES's network-communication functionality is created and registered.

final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
networkService, restController);

When the NetworkModule object is created, the main work is creating two servers used for communication:

  • One server receives the operation requests issued by users (external REST clients), such as GET, INDEX, BULK WRITE, DELETE... This server is the HttpServerTransport (concrete implementation: Netty4HttpServerTransport).
  • The other server handles communication between nodes (the transport layer), for example nodes pinging each other and exchanging cluster state. Also, when a user operation like GET index/_doc/1 reaches the coordinating node but the document with docid 1 is not on that node, the TcpTransport (concrete implementation: Netty4TcpTransport) forwards the command to the target node.

A client can either be retrieved from a started org.elasticsearch.node.Node, or connected remotely to one or more nodes using org.elasticsearch.client.transport.TransportClient. Every node in the cluster can handle HTTP and Transport traffic by default. The transport layer is used exclusively for communication between nodes and the Java TransportClient; the HTTP layer is used only by external REST clients.
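To make the two paths concrete, here is a hedged example of the transport-layer client mentioned in the quote, using the 6.x TransportClient API; the host and port are assumptions for illustration (9300 is the conventional transport port, 9200 the HTTP port):

import java.net.InetAddress;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class TransportClientExample {
    public static void main(String[] args) throws Exception {
        // Connects to the transport layer (Netty4TcpTransport) on port 9300, not HTTP on 9200.
        try (TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300))) {
            // This GET travels over the node-to-node protocol, bypassing the REST layer entirely.
            System.out.println(client.prepareGet("index", "_doc", "1").get().getSourceAsString());
        }
    }
}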

The Netty4HttpServerTransport object is created as follows (Netty4TcpTransport follows similar logic), in org.elasticsearch.common.network.NetworkModule#NetworkModule:

Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
        circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);
for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
    registerHttpTransport(entry.getKey(), entry.getValue());
}

Netty4Plugin#getHttpTransports creates the Netty HTTP server, Netty4HttpServerTransport:

@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                    CircuitBreakerService circuitBreakerService,
                                                                    NamedWriteableRegistry namedWriteableRegistry,
                                                                    NamedXContentRegistry xContentRegistry,
                                                                    NetworkService networkService,
                                                                    HttpServerTransport.Dispatcher dispatcher) {
    return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
        () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher));
}
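For reference, Netty4Plugin registers this factory under the name "netty4" (NETTY_HTTP_TRANSPORT_NAME); NetworkModule then resolves which registered factory to use from the http.type and transport.type settings, both of which default to netty4 in the standard distribution.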

The constructed Transport object is then wrapped in a TransportService:

// Get the constructed Netty4Transport
final Transport transport = networkModule.getTransportSupplier().get();
// Wrap the Netty4Transport in a TransportService
final TransportService transportService = newTransportService(settings, transport, threadPool,
    networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);

Any other module that needs communication functionality then simply wraps the TransportService object. For example, TransportSearchAction, which executes user SEARCH operations, has an instance field SearchTransportService, and SearchTransportService wraps the TransportService; through it, TransportSearchAction can communicate over the TcpTransport. From the Node.java constructor:

// Constructing the SearchTransportService requires the TransportService; the TransportService is a
// shared "communication object" that many services use
final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService,
    SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
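The wrapping pattern looks roughly like the following sketch. This is not ES source code: MyPingService and its action name are hypothetical, and only the TransportService calls (sendRequest, TransportRequest.Empty, EmptyTransportResponseHandler) are real 6.x APIs:

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;

public class MyPingService {
    public static final String ACTION_NAME = "internal:my/ping"; // hypothetical action name
    private final TransportService transportService;

    public MyPingService(TransportService transportService) {
        // Keep a reference to the shared TransportService and delegate all node-to-node calls to it.
        this.transportService = transportService;
    }

    // Send an empty request to another node; a handler for this action name would have to be
    // registered on the receiving side for this to succeed at runtime.
    public void ping(DiscoveryNode node) {
        transportService.sendRequest(node, ACTION_NAME,
            TransportRequest.Empty.INSTANCE, EmptyTransportResponseHandler.INSTANCE_SAME);
    }
}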

A side note: the services that the various Action objects depend on are all created in the Node.java constructor. For example, the SearchTransportService and ClusterService that TransportSearchAction depends on are created at node startup.

Once the Netty4HttpServerTransport has been created, it must bind its port and start serving. org.elasticsearch.node.Node#start is the startup entry point for every service on an ES node, the Netty HTTP server included:

if (NetworkModule.HTTP_ENABLED.get(settings)) {
    injector.getInstance(HttpServerTransport.class).start();
}

Because Netty4HttpServerTransport extends AbstractLifecycleComponent, its startup is driven by org.elasticsearch.common.component.AbstractLifecycleComponent#start, which calls doStart() to start the Netty HTTP server and bind it to port 9200 (the default). Netty4HttpServerTransport#doStart():

protected void doStart() {
    boolean success = false;
    try {
        this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);

        serverBootstrap = new ServerBootstrap(); // e.g. workerCount=8, threads named elasticsearch[debug_node][http_server_worker]
        // Once a channel is assigned to an EventLoop thread in the EventLoopGroup, all events
        // on that channel are handled by that same EventLoop thread.
        serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
            HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
        // The server channel accepts connection requests; each established connection gets a
        // 'child channel' that carries all I/O events for that connection.
        serverBootstrap.channel(NioServerSocketChannel.class);
        // Bind a handler to each child channel; that handler processes the I/O events on the channel.
        serverBootstrap.childHandler(configureServerChannelHandler()); // ---> Netty4HttpRequestHandler
        // Configure the child channels (the parent channel accepts connections; child channels
        // handle the events of established connections).
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
        // ---> TCP send buffer size
        final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
        if (tcpSendBufferSize.getBytes() > 0) {
            serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
        }
        // ---> TCP receive buffer size
        final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
        if (tcpReceiveBufferSize.getBytes() > 0) {
            serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
        }

        serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

        final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
        serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

        this.boundAddress = createBoundHttpAddress(); // ---> ServerBootstrap binds the port
        if (logger.isInfoEnabled()) {
            logger.info("{}", boundAddress);
        }
        success = true;
    } finally {
        if (success == false) {
            doStop(); // otherwise we leak threads since we never moved to started
        }
    }
}

The Netty HTTP server's worker thread count defaults to twice the number of processors available on the node's machine (Runtime.getRuntime().availableProcessors() * 2).
Some of the other defaults are:
TCP_NODELAY=true, SO_KEEPALIVE=true

ServerBootstrap(ServerBootstrapConfig(group: NioEventLoopGroup, channelFactory: NioServerSocketChannel.class, options: {RCVBUF_ALLOCATOR=io.netty.channel.FixedRecvByteBufAllocator@72ce8a9b, SO_REUSEADDR=true}, childGroup: NioEventLoopGroup, childOptions: {TCP_NODELAY=true, SO_KEEPALIVE=true, RCVBUF_ALLOCATOR=io.netty.channel.FixedRecvByteBufAllocator@72ce8a9b, SO_REUSEADDR=true}, childHandler: org.elasticsearch.http.netty4.Netty4HttpServerTransport$HttpChannelHandler@56ec6ac0))
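Where does that *2 default come from? A paraphrase of the worker-count setting as it appears in Netty4HttpServerTransport in 6.x is shown below; treat the exact code as an approximation of the source rather than a verbatim quote:

// Paraphrased from ES 6.x Netty4HttpServerTransport (approximate, not verbatim):
public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = new Setting<>(
    "http.netty.worker_count",
    (s) -> Integer.toString(EsExecutors.numberOfProcessors(s) * 2), // default: 2 * available processors
    (s) -> Setting.parseInt(s, 1, "http.netty.worker_count"),
    Property.NodeScope);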

Where is the starting point at which the ES server handles user requests (GET/WRITE/DELETE...)?

Since the ES server (for lack of a better name) is built on Netty, there must be a ChannelHandler responsible for the events occurring on each SocketChannel. That ChannelHandler is org.elasticsearch.http.netty4.Netty4HttpRequestHandler. It is registered in org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#initChannel, so user requests are handed to Netty4HttpRequestHandler:

ch.pipeline().addLast("handler", requestHandler); // Netty4HttpRequestHandler: business-logic handler
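For orientation, the pipeline built in HttpChannelHandler#initChannel has roughly the following shape. This is a trimmed sketch: the real method also wires open-channel tracking, read timeouts, optional compression, CORS, and HTTP pipelining handlers, and the 1 MiB aggregator limit here is an arbitrary example value:

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

class HttpPipelineSketch extends ChannelInitializer<Channel> {
    private final ChannelHandler requestHandler;

    HttpPipelineSketch(ChannelHandler requestHandler) {
        this.requestHandler = requestHandler;
    }

    @Override
    protected void initChannel(Channel ch) {
        ch.pipeline().addLast("decoder", new HttpRequestDecoder());              // bytes -> HTTP message parts
        ch.pipeline().addLast("encoder", new HttpResponseEncoder());            // HTTP response -> bytes
        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(1 << 20)); // assemble a FullHttpRequest (1 MiB cap, arbitrary here)
        ch.pipeline().addLast("handler", requestHandler);                       // Netty4HttpRequestHandler: hand off to the REST layer
    }
}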

Given how Netty works, the starting point for handling user requests is therefore the org.elasticsearch.http.netty4.Netty4HttpRequestHandler#channelRead0 method.
So, to debug the entry points of INDEX, GET, or DELETE operations, set one breakpoint at org.elasticsearch.http.netty4.Netty4HttpRequestHandler#channelRead0 (the entry) and another at org.elasticsearch.http.netty4.Netty4HttpChannel#sendResponse (the response), and use IDEA's debugger frames to trace each operation's execution path.

Since all user operations enter through this single point, how are they parsed and ultimately handed to the right TransportXXXAction? Roughly as follows:

  • 1. Every ES operation (Java API / REST API) has a corresponding Action class. For example, the Action class for the DELETE API is RestDeleteAction, and for the GET API it is RestGetAction.
  • 2. Each Action class overrides org.elasticsearch.rest.BaseRestHandler#prepareRequest, building the corresponding request and returning a lambda expression that represents the operation to execute (see the sketch after this list). The operation is then triggered by the action.accept(channel) statement in BaseRestHandler#handleRequest.
  • 3. Once triggered, the operation is dispatched to the appropriate node by NodeClient#doExecute: first the TransportXXXAction class corresponding to the operation is looked up, then execute(request, listener) runs it:
    return transportAction(action).execute(request, listener)
    TransportAction#execute(Request, org.elasticsearch.action.ActionListener) is the unified entry point for executing every Action operation; ultimately, in TransportAction.RequestFilterChain#proceed, the call this.action.doExecute(task, request, listener) invokes the concrete TransportXXXAction#doExecute() for the operation.
    For example, GET operations are handled by TransportSingleShardAction#doExecute, and DELETE operations by TransportBulkAction#doExecute(Task, BulkRequest, ActionListener).
  • 4. Digging further into the DELETE operation: TransportBulkAction#doExecute calls org.elasticsearch.action.bulk.TransportBulkAction#executeBulk to start a new task, BulkOperation. Because a DELETE is a shard-level operation (the data must be removed from a shard), org.elasticsearch.action.bulk.TransportBulkAction.BulkOperation#doRun recognizes the operation as a DELETE and executes shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>(){...});, handing the delete to the shard-level action, TransportShardBulkAction.
  • 5. TransportShardBulkAction extends TransportAction, so execute naturally takes the same route through TransportAction#execute(Task, Request, ActionListener) and then, in RequestFilterChain#proceed, this.action.doExecute(task, request, listener) calls TransportShardBulkAction's doExecute method. That doExecute() is inherited from TransportReplicationAction, and what it runs is a ReroutePhase task. This makes sense: to delete a document we must know which shard holds it and send the delete request to that shard, which is exactly what ReroutePhase is for:
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) { new ReroutePhase((ReplicationTask) task, request, listener).run(); }
  • 6. Following ReroutePhase#doRun: if the target is on the local node the delete runs via performLocalAction; if it is on a remote node, performRemoteAction sends the request through TransportService#sendRequest yet again... fine, let's keep tracing and see where it lands:
    if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) { performLocalAction(state, primary, node, indexMetaData); } else { performRemoteAction(state, primary, node); }

  • 7. So where does it land? That is easy to reason out: this is a DELETE operation, its executing action is TransportReplicationAction, and a DELETE must go through the primary shard. Sure enough, the receiving side is in TransportReplicationAction's inner class PrimaryOperationTransportHandler: PrimaryOperationTransportHandler#messageReceived(ConcreteShardRequest, TransportChannel, Task) creates an AsyncPrimaryAction task, and only in TransportReplicationAction.AsyncPrimaryAction#doRun does the real work start: acquiring the operation lock on the shard and deleting the document.
  • 8. Once AsyncPrimaryAction#doRun has acquired the lock (a PrimaryShardReference), it calls back into AsyncPrimaryAction#onResponse, where createReplicatedOperation(...).execute() triggers the underlying Lucene delete logic.
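As promised above, here is a minimal sketch of the Action-class pattern from steps 1 and 2, modeled on RestDeleteAction in 6.x; parameter handling is trimmed, so treat the details as an approximation of the real class rather than a verbatim copy:

import java.io.IOException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestStatusToXContentListener;

public class RestDeleteActionSketch extends BaseRestHandler {
    public RestDeleteActionSketch(Settings settings, RestController controller) {
        super(settings);
        controller.registerHandler(RestRequest.Method.DELETE, "/{index}/{type}/{id}", this);
    }

    @Override
    public String getName() {
        return "document_delete_action_sketch"; // illustrative name
    }

    // Build the DeleteRequest and return a lambda; BaseRestHandler#handleRequest later fires it
    // via action.accept(channel), which sends the request through NodeClient to the transport action.
    @Override
    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(
            request.param("index"), request.param("type"), request.param("id"));
        return channel -> client.delete(deleteRequest, new RestStatusToXContentListener<>(channel));
    }
}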

Deletion follows a deletion strategy, implemented in org.elasticsearch.index.engine.InternalEngine#planDeletionAsPrimary:

if (versionValue == null) {
    currentVersion = Versions.NOT_FOUND;
    currentlyDeleted = true;
} else {
    currentVersion = versionValue.version;
    currentlyDeleted = versionValue.isDelete();
}
final DeletionStrategy plan;
if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
    final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
    plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
    plan = DeletionStrategy.processNormally(
            currentlyDeleted,
            generateSeqNoForOperation(delete),
            delete.versionType().updateVersion(currentVersion, delete.version()));
}
return plan;
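To make the conflict check concrete, here is a standalone sketch of the internal-versioning rule that isVersionConflictForWrites applies, as I understand VersionType.INTERNAL in 6.x; the local constants stand in for org.elasticsearch.index.VersionType and Versions and are assumptions of this sketch:

public class VersionConflictSketch {
    static final long MATCH_ANY = -3L;     // caller did not specify a version
    static final long MATCH_DELETED = -4L; // caller requires the doc to be absent/deleted
    static final long NOT_FOUND = -1L;     // current version when the doc does not exist

    static boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean currentlyDeleted) {
        if (expectedVersion == MATCH_ANY) {
            return false; // unversioned writes never conflict
        }
        if (expectedVersion == MATCH_DELETED) {
            return currentlyDeleted == false; // conflict if the doc still exists
        }
        return currentVersion != expectedVersion; // an explicit version must match exactly
    }

    public static void main(String[] args) {
        // DELETE with an explicit version: conflicts when the stored version has moved on.
        System.out.println(isVersionConflictForWrites(3L, 2L, false)); // true  -> VersionConflictEngineException
        System.out.println(isVersionConflictForWrites(2L, 2L, false)); // false -> DeletionStrategy.processNormally
    }
}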

When deleting a doc, the engine also needs to determine whether the docid currently exists; this is implemented in org.elasticsearch.index.engine.InternalEngine#loadCurrentVersionFromIndex:

private long loadCurrentVersionFromIndex(Term uid) throws IOException {
    assert incrementIndexVersionLookup();
    try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
        return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid);
    }
}

One more thing I noticed in the source: delete-by-doc-id does not trigger a segment merge. That is why delete-by-id is fast and has little impact on cluster load; as the source comment says:

// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:

The Lucene-level document deletion finally happens in org.elasticsearch.index.engine.InternalEngine#delete:

if (delete.origin() == Operation.Origin.PRIMARY) {
    plan = planDeletionAsPrimary(delete);
} else {
    plan = planDeletionAsNonPrimary(delete);
}

if (plan.earlyResultOnPreflightError.isPresent()) {
    deleteResult = plan.earlyResultOnPreflightError.get();
} else if (plan.deleteFromLucene) {
    deleteResult = deleteInLucene(delete, plan);
} else {
    deleteResult = new DeleteResult(
            plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
}

具体实如今:org.elasticsearch.index.engine.InternalEngine#deleteInLucene里面,代码就不贴了。以上,就是一个完整的 ES delete by doc id 的执行流程。感兴趣的能够再细究。

This article traced the execution path of the DELETE API in detail; other operations can be analyzed the same way. Original post (Chinese): https://www.cnblogs.com/hapjin/p/11018479.html
