index action分析

上一篇从结构上分析了action的,本篇将以index action为例仔分析一下action的实现方式。html

再归纳一下action的做用:对于每种功能(如index)action都会包括两个基本的类*action(IndexAction)和Transport*action(TransportIndexAction),前者类中会有一个实例(IndexAction INSTANCE = new IndexAction())这个实例用于client绑定对应的TransportAction(registerAction(IndexAction.INSTANCE, TransportIndexAction.class)),绑定过程发送在ActionModuel中。另外在Action类中还会定义一个action的名字(String NAME = "indices:data/write/index")这个名字用于TransportService绑定对于的handle,用于处理NettyTransport接收到的信息。TransportAction的是最终的逻辑处理者,当接收到请求时,会首先判断本节点可否处理,若是可以处理则调用相关的方法处理获得结果返回,不然将经过NettyTransport转发该请求到对应的node进行处理。全部的Transport的结构都是这种类型。node

首先看一下TransportAction的类图,所的Transport*action都继承自于它。app

它主要由两个方法execute和doExecute,execute方法有两种实现,第一种实现须要自行添加actionListener。最终的逻辑都在doExecute方法中,这个方法在各个功能模块中实现。如下是TransportIndexAction的继承关系:异步

实现上因为功能划分的缘由,TransportIndexAction直接继承自TranspShardReplicationOperationAction,这个抽象类中的方法是全部须要操做shard副本的功能action的父,所以它的实现还包括delete,bulk等功能action。它实现了多个内部类,这些内部类用来辅助完成相关的功能。这里主要说一下OperationTransportHandler,ReplicaOperationTransportHandler及AsyncShardOperationAction三个子类。首先看一下OperationTransportHandler的代码,以下所示:elasticsearch

class OperationTransportHandler extends BaseTransportRequestHandler<Request> {
//继承自BaseTransportRequestHanlder
……………… @Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception { // no need to have a threaded listener since we just send back a response request.listenerThreaded(false); // if we have a local operation, execute it on a thread since we don't spawn request.operationThreaded(true);
      //调用Transport的execute方法,经过channel返回结果 execute(request,
new ActionListener<Response>() { @Override public void onResponse(Response result) { try { channel.sendResponse(result); } catch (Throwable e) { onFailure(e); } } @Override public void onFailure(Throwable e) { try { channel.sendResponse(e); } catch (Throwable e1) { logger.warn("Failed to send response for " + actionName, e1); } } }); }

看过NettyTransport请求发送和处理的同窗必定对这个代码不陌生,这就是elasticsearch节点间处理信息的典型模式。当请求经过NettyTransport发送到本节点时会根据请求的action名称找到对应的handler,使用对应的handler来处理该请求。这个handler就对应着“indices:data/write/index”,能够看到它调用execute方法来处理。它的注册时在TransportShardReplicationOperationAction构造函数中完成的。知道了OperationTransportHandler,ReplicaOperationTransportHandler就好理解了它的实现方式跟前者彻底同样,对应的action名称加了一个“[r]”,它的做用是处理须要在副本上进行的操做,代码以下所示:ide

    class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
…………………… @Override public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception { try { shardOperationOnReplica(request); } catch (Throwable t) { failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t); throw t; } channel.sendResponse(TransportResponse.Empty.INSTANCE); } }

能够看到代码结构很是像,只是调用了副本操做的方法shardOperationOnReplica,这个方法在这TransportShardReplicationOperationAction中是抽象的,它的实如今各个子类中,例如deleteaction中实现了对于delete请求如何在副本上处理。函数

分析完这两个handle是否是对于action的处理过程有了必定的眉目了呢?可是这才是冰山一角,这两个Handler是用来接收来自其它节点的请求,若是请求的正好是本节点该如何处理呢?这些逻辑都在AsyncShardOperationAction类中。首先看一下它的内部结构:ui

由于TransportShardReplicationOperationAction的全部子类都是对索引的修改,会引发数据不一致,所以它的操做流程都是如今primaryShard上操做而后是Replicashard上操做。代码以下所示:spa

 protected void doStart() throws ElasticsearchException {
            try {
          //检查是否有阻塞 ClusterBlockException blockException
= checkGlobalBlock(observer.observedState()); if (blockException != null) { if (blockException.retryable()) { logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage()); retry(blockException); return; } else { throw blockException; } }
          //检测是不是建立索引
if (resolveIndex()) { internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions())); } else { internalRequest.concreteIndex(internalRequest.request().index()); } // check if we need to execute, and if not, return if (!resolveRequest(observer.observedState(), internalRequest, listener)) { return; }
          //再次检测是否有阻塞 blockException
= checkRequestBlock(observer.observedState(), internalRequest); if (blockException != null) { if (blockException.retryable()) { logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage()); retry(blockException); return; } else { throw blockException; } } shardIt = shards(observer.observedState(), internalRequest); } catch (Throwable e) { listener.onFailure(e); return; }         //查找primaryShard boolean foundPrimary = false; ShardRouting shardX; while ((shardX = shardIt.nextOrNull()) != null) { final ShardRouting shard = shardX; // we only deal with primary shardIt here... if (!shard.primary()) { continue; } if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) { logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId()); retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node."); return; } if (!primaryOperationStarted.compareAndSet(false, true)) { return; } foundPrimary = true;
          //primaryShard就在本地,直接进行相关操做
if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) { try { if (internalRequest.request().operationThreaded()) { internalRequest.request().beforeLocalFork(); threadPool.executor(executor).execute(new Runnable() { @Override public void run() { try { performOnPrimary(shard.id(), shard); } catch (Throwable t) { listener.onFailure(t); } } }); } else { performOnPrimary(shard.id(), shard); } } catch (Throwable t) { listener.onFailure(t); } } else {//primaryShard在其它节点上,将请求经过truansport发送到对应的节点。 DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId()); transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() { @Override public Response newInstance() { return newResponseInstance(); } @Override public String executor() { return ThreadPool.Names.SAME; } @Override public void handleResponse(Response response) { listener.onResponse(response); } @Override public void handleException(TransportException exp) { // if we got disconnected from the node, or the node / shard is not in the right state (being closed) if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException || retryPrimaryException(exp)) { primaryOperationStarted.set(false); internalRequest.request().setCanHaveDuplicates(); // we already marked it as started when we executed it (removed the listener) so pass false // to re-add to the cluster listener logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage()); retry(exp); } else { listener.onFailure(exp); } } }); } break; } ……………… }

这就是对应请求的处理过程,接下来是primary操做的方法:code

 void performOnPrimary(int primaryShardId, final ShardRouting shard) {
           ……
                PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request()));
                performReplicas(response);
            …………
        }

以上就是performOnPrimary方法的部分代码,首先调用外部类的shardOperationOnPrimary方法,该方法实如今各个子类中,在TransportIndexAction中的实现以下所示:

    @Override
    protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
        final IndexRequest request = shardRequest.request;

        // 查看是否须要routing
IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex()); MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type()); if (mappingMd != null && mappingMd.routing().required()) { if (request.routing() == null) { throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id()); } }
      //调用indexserice执行对应的index操做 IndexService indexService
= indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); long version; boolean created; try { Engine.IndexingOperation op; if (request.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates()); if (index.parsedDoc().mappingsModified()) { mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID()); } indexShard.index(index); version = index.version(); op = index; created = index.created(); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId()); if (create.parsedDoc().mappingsModified()) { mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID()); } indexShard.create(create); version = create.version(); op = create; created = true; } if (request.refresh()) { try { indexShard.refresh("refresh_flag_index"); } catch (Throwable e) { // ignore } } // update the version on the request, so it will be used for the replicas request.version(version); request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); assert request.versionType().validateVersionForWrites(request.version()); IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created); return new PrimaryResponse<>(shardRequest.request, response, op); } catch (WriteFailureException e) { if (e.getMappingTypeToUpdate() != null) { DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate()); if (docMapper != null) { mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID()); } } throw e.getCause(); } }

上面的代码就是index的执行过程,这一过程涉及到index的底层操做,这里就不展开,只是说明它在action中是如何实现的,后面会有详细说明。接下来看在副本上的操做。副本可能有多个,所以首先调用了performReplicas方法,在这个方法中首先开始监听集群的状态,而后便利全部的副本进行处理,若是是异步则加入一个listener,不然同步执行返回结果。最后调用performReplica,在该方法中调用外部类的抽象方法shardOperationOnReplica。 这一过程比较简单,这里就再也不贴代码,有兴趣能够参考相关源码。

总结一下:这里以TransportIndexAction为例分析了tansportaction的结构层次。它在TransportAction直接还有一层那就是TransportShardReplicationOperationAction,这个类是actionsupport包中的一个,这个包把全部的子操做方法作了进一步的抽象,抽象出几个大类放到了这里,全部其它子功能不少都继承自这。这个包会在后面有详细分析。 

相关文章
相关标签/搜索