ES 源代码阅读(二)

1 基本概念node

集群: 一个集群有一个或多个节点组织在一块儿,并将数据组织在一块儿,提供索引和搜索服务.
节点:一个节点是一个集群中的服务器,提供存储数据,提供搜索服务.
索引:文档的逻辑的集合
分片:一个逻辑索引有若干分片,其中一个分片被设置为主分片.分片为索引的存储位置. 会涉及到分布式问题.
类型:文档的类型
文档:与lucene中的document相似.服务器

若是集群状态发生改变,发生改变状态的节点,先通知其余节点更新状态,最后才更新本地节点
的状态.所以在提交版本信息同步时,都会涉及到这个过程.app

2 提交请求初次建立索引curl

概述:elasticsearch

解析请求,根据请求建立index和mapping,并提交版本信息到集群.而后根据配置建立0,1,2,3
的shard(分片)而后更新版本信息到集群.最后建立shard(分片)4 而后更新版本信息到集群.
最后根据请求更新已有的mapper信息,最后发布到集群中各个节点,而后更新版本信息.分布式

curl -XPOST 'localhost:9200/twitter/tweet/1' -d '{"name":"parker","age":18}'
源码阅读:
HttpServer.internalDispatchRequest (HttpRequest,HttpChannel) ->
(RestController)restController.dispatchRequest(HttpRequest,HttpChannel) ->
RestController.executeHandler(HttpRequest,HttpChannel)->
获取RestHandler->
handler.handleRequest(request, channel)->
BaseRestHandler.handleRequest(request, channel, new HeadersAndContextCopyClient(client, request, controller.relevantHeaders()))->
RestIndexAction.handleRequest(request, channel, new HeadersAndContextCopyClient(client, request, controller.relevantHeaders()))->
建立IndexRequest对象.
client.index(indexRequest,RestBuilderListener)->
AbstraceClient.index(final IndexRequest request, final ActionListener<IndexResponse> listener)->
execute(IndexAction.INSTANCE, request, listener)->
BaseRestHandler.execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener)
处理前的准备工做copyHeaderSAndContext(request, restRequest, headers)->
<-copyHeaderSAndContext
FilterClient.execute(action, request, listener)->
(TransportAction)transportAction.execute(request, listener)->
TransportAction.doExecute(request, listener)->
TransportIndexAction.doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener)->
判断是否须要新建索引,若是须要则新建,不然直接创建索引.
若是须要建立索引,先构造建立索引的请求.而后建立索引
(TransportCreateIndexAction)createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>)->
TransportMasterNodeOperationAction.doExecute(final Request request, final ActionListener<Response> listener)->
TransportMasterNodeOperationAction.innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying)->
先判断是不是本地master节点的请求
若是经过验证,将其交给线程池来处理.
TransportMasterNodeOperationAction.masterOperation(request, clusterService.state(), listener);->
TransportCreateIndexAction.masterOperation((final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener)->
建立请求
(CreateIndexClusterStateUpdateRequest)updateRequest
MetaDataCreateIndexService.createIndexService.createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener)->
若是能获取锁,直接建立.
不然放入线程池,交由后台执行获取锁操做,何时获取到何时建立.
MetaDataCreateIndexService.createIndex(request, listener, mdLock);->
建立并完善请求.
(InternalClusterService extends ClusterService)clusterService.submitStateUpdateTask(String, Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener))->
首先生成UpdateTask,而后被提交的线程池中,而后被执行UpdateTask的run方法.
UpdateTask.run->
newClusterState = updateTask.execute(previousClusterState);->
MetaDataCreateIndexService.execute->
建立相关配置文件.
在indices中建立index 而且增长mapping
indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());->
建立相关索引,添加模块.
<-indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());
初始化indexService
初始化mapperService
初始化索引解析服务indexQueryParserService
更新相关其余相关信息
<-MetaDataCreateIndexService.execute
<-newClusterState = updateTask.execute(previousClusterState);ui

Discovery.AckListener ackListener = new NoOpAckListener();
若是是主节点,而后发布状态.
更新状态.
                for (ClusterStateListener listener : preAppliedListeners) {
                    try {
                        listener.clusterChanged(clusterChangedEvent);
                    } catch (Exception ex) {
                        logger.warn("failed to notify ClusterStateListener", ex);
                    }
                }
RoutingService.clusterChanged(Event)->
<-RoutingService.clusterChanged(Event)
IndicesClusterStateService.clusterChanged(Event)->
执行一系列与event相关的操做.
cleanFailedShards(event);
applyDeletedIndices(event);
applyNewIndices(event);
applyMappings(event);
applyAliases(event);
applyNewOrUpdatedShards(event);
applyDeletedShards(event);
applyCleanedIndices(event);
applySettings(event);
<-IndicesClusterStateService.clusterChanged(Event)
NodeSettingsService.clusterChanged(Event)->
<-NodeSettingsService.clusterChanged(Event)
InternalClusterInfoService.clusterChanged(Event)->
<-InternalClusterInfoService.clusterChanged(Event)
RepositoriesService.clusterChanged(Event)->
<-RepositoriesService.clusterChanged(Event)
MetaDataUpdateSettingsService.clusterChanged(Event)->
<-MetaDataUpdateSettingsService.clusterChanged(Event)
RestoreService.clusterChanged(Event)->
<-RestoreService.clusterChanged(Event)
RiverRouter.clusterChanged(Event)->
<-RiverRouter.clusterChanged(Event)
org.elasticsearch.cluster.service.InternalClusterService$LocalNodeMasterListeners.clusterChanged(Event)->
<-org.elasticsearch.cluster.service.InternalClusterService$LocalNodeMasterListeners.clusterChanged(Event)
LocalGatewayAllocator.clusterChanged(Event)->
<-LocalGatewayAllocator.clusterChanged(Event)
IndexCache.clusterChanged(Event)->
<-IndexCache.clusterChanged(Event)
SnapshotsService.clusterChanged(Event)->
<-SnapshotsService.clusterChanged(Event)
IndicesStore.clusterChanged(Event)->
<-IndicesStore.clusterChanged(Event)
LocalGateway.clusterChanged(Event)->
<-LocalGateway.clusterChanged(Event)
GatewayService.clusterChanged(Event)->
<-GatewayService.clusterChanged(Event)
若是node有掉节点的状况,卸下节点.
获取发布相关请求.
//manual ack only from the master at the end of the publish
<-UpdateTask.run
<-(InternalClusterService extends ClusterService)clusterService.submitStateUpdateTask
<-MetaDataCreateIndexService.createIndexService.createIndex
<-TransportCreateIndexAction.masterOperation
<-TransportMasterNodeOperationAction.masterOperation
<-TransportMasterNodeOperationAction.innerExecute
<-TransportMasterNodeOperationAction.doExecute
<-(TransportCreateIndexAction)createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>)
不然直接建立索引
TransportCreateIndexAction.innerExecute(request, listener);
<-TransportIndexAction.doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener)
<-TransportAction.doExecute(request, listener);
<-(TransportAction)transportAction.execute
<-FilterClient.execute
<-BaseRequestHandler.execute
<-AbstraceClient.index
<-RestIndexAction.handleRequest
<-BaseRestHandler.handleRequest
<-RestHandler
<-RestController
<-HttpServerurl

 

后期会详细研究这一过程spa

相关文章
相关标签/搜索