ES6.3.2 index操做源码流程

ES 6.3.2 index 操做源码流程

  1. client 发送请求html

  2. TransportBulkAction#doExecute(Task,BulkRequest,listener)java

    • 解析请求,是否要自动建立索引?请求中 是否有mapping信息?
  3. TransportBulkAction#doRun()node

    • 获取集群的状态信息git

      /** sets the last observed state to the currently applied cluster state and returns it */
          public ClusterState setAndGetObservedState() {
              if (observingContext.get() != null) {
                  throw new ElasticsearchException("cannot set current cluster state while waiting for a cluster state change");
              }
              ClusterState clusterState = clusterApplierService.state();
              lastObservedState.set(new StoredState(clusterState));
              return clusterState;
          }
      cluster uuid: 5yBoKgbYQ1ibdZ5WG7bRAA
      version: 7
      state uuid: QVCOkCv_Q_mBGzjwTVDNJw
      from_diff: true
      meta data version: 5
         [test/t-tC0rHESDqNm5SQFO7kPQ]: v[4]
            0: p_term [1], isa_ids [UDR6UFa0Sa27ul74kRpyTQ]
            1: p_term [1], isa_ids [VeuqdSp8R3ub2_a1a9zHJg]
            2: p_term [1], isa_ids [0q3mCMLaSFWgOG5eQJ-EXQ]
            3: p_term [1], isa_ids [maBX8A3sRRK8FPG3VzmfKA]
      metadata customs:
         index-graveyard: IndexGraveyard[[]]
      nodes: 
         {node_sm0}{Xs6SXo4kRj6ylKwLE1dgkA}{bLOl8jv2SGWXt1hk7b_V7g}{127.0.0.1}{127.0.0.1:42641}, master
         {node_sd3}{H4rct3ZxRvKJG2dnF0oFtg}{OwRuFVwkTLufu5LBzvFa0w}{127.0.0.1}{127.0.0.1:33747}
         {node_sm2}{dUEAma7HQJG4eRFx18dRnA}{WOf3n9RoSSCEOkXa9fgWPQ}{127.0.0.1}{127.0.0.1:36963}
         {node_sm1}{kSSol9RjSwyfueUowUdHnQ}{HAgo4XEHS5qWRAokNtzFow}{127.0.0.1}{127.0.0.1:34537}, local
      routing_table (version 4):
      -- index [[test/t-tC0rHESDqNm5SQFO7kPQ]]
      ----shard_id [test][0]
      --------[test][0], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=UDR6UFa0Sa27ul74kRpyTQ]
      ----shard_id [test][1]
      --------[test][1], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=VeuqdSp8R3ub2_a1a9zHJg]
      ----shard_id [test][2]
      --------[test][2], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=0q3mCMLaSFWgOG5eQJ-EXQ]
      ----shard_id [test][3]
      --------[test][3], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=maBX8A3sRRK8FPG3VzmfKA]
      
      routing_nodes:
      -----node_id[H4rct3ZxRvKJG2dnF0oFtg][V]
      --------[test][3], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=maBX8A3sRRK8FPG3VzmfKA]
      --------[test][2], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=0q3mCMLaSFWgOG5eQJ-EXQ]
      --------[test][1], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=VeuqdSp8R3ub2_a1a9zHJg]
      --------[test][0], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=UDR6UFa0Sa27ul74kRpyTQ]
      ---- unassigned
      customs:
         snapshots: SnapshotsInProgress[]   snapshot_deletions: SnapshotDeletionsInProgress[]   restore: RestoreInProgress[]

    • 解析路由信息github

      /* resolve the routing if needed */
          public void resolveRouting(MetaData metaData) {
              routing(metaData.resolveIndexRouting(parent, routing, index));
          }
      routing_table (version 4):
      -- index [[test/t-tC0rHESDqNm5SQFO7kPQ]]
      ----shard_id [test][0]
      --------[test][0], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=UDR6UFa0Sa27ul74kRpyTQ]
      ----shard_id [test][1]
      --------[test][1], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=VeuqdSp8R3ub2_a1a9zHJg]
      ----shard_id [test][2]
      --------[test][2], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=0q3mCMLaSFWgOG5eQJ-EXQ]
      ----shard_id [test][3]
      --------[test][3], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=maBX8A3sRRK8FPG3VzmfKA]

    • 请求中是否有docId?若没有doc id,则自动生成。网络

      // generate id if not already provided
              if (id == null) {
                  assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!";
                  autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
                  String uid;
                  if (indexCreatedVersion.onOrAfter(Version.V_6_0_0_beta1)) {
                      uid = UUIDs.base64UUID();
                  } else {
                      uid = UUIDs.legacyBase64UUID();
                  }
                  id(uid);
              }

    • 批量请求分组。计算出请求将要发往哪些shard,路由到相同shard上的请求做为一组。app

      ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
                      List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
                      shardRequests.add(new BulkItemRequest(i, request));

    • 向各个分片提交请求,回调中检查提交了请求的那些分片 是否 都成功响应了?异步

      shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                          @Override
                          public void onResponse(BulkShardResponse bulkShardResponse) {
                              for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                                  // we may have no response if item failed
                                  if (bulkItemResponse.getResponse() != null) {
                                      bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                                  }
                                  responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                              }
                              if (counter.decrementAndGet() == 0) {
                                  finishHim();
                              }
                          }

  4. TransportReplicatonAction#messageReceived(ConcreteShardRequest)elasticsearch

    接收到请求信息。这里是primary shard所在的节点接收到 index 请求开始时的入口点。Ingest node 在发送 index 请求时,首先根据路由信息和 docid 计算出该请求发往哪一个shard,而后从cluster state 中获取 allocationId(allocationId惟一标识了一个shard)。ide

    request: BulkShardRequest [[test][0]] containing [index {[test][type][bogus_doc_ݑݜݢݧݧݯݼa1], source[{}]}], target allocation id: UDR6UFa0Sa27ul74kRpyTQ, primary term: 1

    建立异步的primary操做任务AsyncPrimaryAction

    @Override
            public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
                new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
            }

  5. TransportReplicationAction.AsyncPrimaryAction#doRun

    异步primary操做任务执行。在IndexShard上的操做须要得到 permits

    protected void doRun() throws Exception {
                acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
            }

    放一张IndexShard对象的状态,感觉一下:

  6. TransportReplicatonAction#acquirePrimaryShardReference()

    • 获取IndexShard对象,IndexShard是个很重要的类,里面封装了不少shard操做

    • 建立得到 permits 的监听器,成功得到 permits 后回调 onResponse()

      ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
                  @Override
                  public void onResponse(Releasable releasable) {
                      //--->TransportReplicationAction.AsyncPrimaryAction.onResponse
                      onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable));
                  }
    • 开始获取 permit

      indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
  7. IndexShardOperationPermits#acquire(listener,executor...)

    • 获取 permit

      synchronized (this) {
                      if (delayed) {
                          final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                          final ActionListener<Releasable> wrappedListener;
                          if (executorOnDelay != null) {
                              wrappedListener =
                                  new PermitAwareThreadedActionListener(threadPool, executorOnDelay,
                                              new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution);
                          } else {
                              wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
                          }
                          delayedOperations.add(new DelayedOperation(wrappedListener, debugInfo, stackTrace));
                          return;
                      } else {
                          releasable = acquire(debugInfo, stackTrace);
                      }
                  }

    • 获取成功后,回调onAcquired.onResponse(releasable);

  8. TransportReplicationAction.AsyncPrimaryAction#onResponse

    • 判断是否 relocated,若是relocated则从新转发请求,不然建立操做对象

      if (primaryShardReference.isRelocated()) {
          final ShardRouting primary = primaryShardReference.routingEntry();
          DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
          transportService.sendRequest(relocatingNode, transportPrimaryAction,....);
      }else{
                              setPhase(replicationTask, "primary");
                          final ActionListener<Response> listener = createResponseListener(primaryShardReference);
                          createReplicatedOperation(request,
                                  ActionListener.wrap(result -> result.respond(listener), listener::onFailure),primaryShardReference)
                                  .execute();//开始真正执行副本操做
      }

      索引请求

      BulkShardRequest [[test][1]] containing [index {[test][type][w3KQpWkBhFoYx7tjRcg3], source[{"field":"value_0"}]}]
  9. ReplicationOperation#execute

    • 检查 active shards数量是否符合要求,若是 active shards 数量小于wait_for_active_shards则拒绝执行。ReplicationGroup 里面有三种类型的分片集合:inSyncAllocationIds(同步副本集合,当前活跃的分片)、trackedAllcationIds、unavailableInSyncShards(stale replica)。

      /**
           * Checks whether we can perform a write based on the required active shard count setting.
           * Returns **null* if OK to proceed, or a string describing the reason to stop
           */
          protected String checkActiveShardCount() {
              final ShardId shardId = primary.routingEntry().shardId();
              final ActiveShardCount waitForActiveShards = request.waitForActiveShards();
              if (waitForActiveShards == ActiveShardCount.NONE) {
                  return null;  // not waiting for any shards
              }
              final IndexShardRoutingTable shardRoutingTable = primary.getReplicationGroup().getRoutingTable();
              if (waitForActiveShards.enoughShardsActive(shardRoutingTable)) {
                  return null;
              } else {
                  final String resolvedShards = waitForActiveShards == ActiveShardCount.ALL ? Integer.toString(shardRoutingTable.shards().size())
                                                    : waitForActiveShards.toString();
                  logger.trace("[{}] not enough active copies to meet shard count of [{}] (have {}, needed {}), scheduling a retry. op [{}], " +
                               "request [{}]", shardId, waitForActiveShards, shardRoutingTable.activeShards().size(),
                               resolvedShards, opType, request);
                  return "Not enough active copies to meet shard count of [" + waitForActiveShards + "] (have " +
                             shardRoutingTable.activeShards().size() + ", needed " + resolvedShards + ").";
              }
          }

    • 获取主分片信息,在主分片上执行 primary 请求

      final ShardRouting primaryRouting = primary.routingEntry();
       final ShardId primaryId = primaryRouting.shardId();
      totalShards.incrementAndGet();
      pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
      primaryResult = primary.perform(request);//---> primary 操做的执行
    • primary shard 更新 local checkpoint

      primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());

  10. TransportReplicationAction.PrimaryShardReference#perform

    主分片上请求执行成功,才会去建立副本分片请求result.replicaRequest()

    @Override
            public PrimaryResult perform(Request request) throws Exception {
                PrimaryResult result = shardOperationOnPrimary(request, indexShard);
                assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest()
                    + "] with a primary failure [" + result.finalFailure + "]";
                return result;
            }

  11. TransportShardBulkAction#shardOperationOnPrimary

    indexmetadata 和 translog,批量执行executeBulkItemRequest

    final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
    Translog.Location location = null;
            for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
                if (isAborted(request.items()[requestIndex].getPrimaryResponse()) == false) {
                    location = executeBulkItemRequest(metaData, primary, request, location, requestIndex,
                        updateHelper, nowInMillisSupplier, mappingUpdater);
                }
            }

  12. IndexShard#applyIndexOperationOnPrimary

    Secequence Number生成

    return applyIndexOperation(SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, autoGeneratedTimestamp,
                isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);

  13. IndexShard#applyIndexOperation

    • 验证 primary shard的 primary term 是不是最新的(防止已通过时的primary shard 还在执行操做致使脏数据)

      assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
              assert versionType.validateVersionForWrites(version);

    • 生成底层Index操做:primary term、source源文本……

      operation = prepareIndex(docMapper(sourceToParse.type()), indexSettings.getIndexVersionCreated(), sourceToParse, seqNo,
                          opPrimaryTerm, version, versionType, origin,
                      autoGeneratedTimeStamp, isRetry);

  14. org.elasticsearch.index.shard.IndexShard#prepareIndex

    Lucene Engine执行

    return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);

    因为debug断点跟踪的时候,线程挂起时间太长,会致使底层transport 关闭。因此文档写入主分片后,同步到副本的过程,是第二个请求的debug,真实情形下是一个请求,但不影响index操做执行的整个流程。

    接下来,在primary shard上执行成功后,从新返回到第9步的:ReplicationOperation#execute

  15. ReplicationOperation#performOnReplicas(ReplicaRequest,globalCheckpoint,ReplicationGroup)

    • if 语句表示:这不是在primary shard上执行,而是在replica上执行
    for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
                if (shard.isSameAllocation(primaryRouting) == false) {
                    performOnReplica(shard, replicaRequest, globalCheckpoint);
                }
            }

    当前的ShardRouting信息

    [test][1], node[H4rct3ZxRvKJG2dnF0oFtg], [P], s[STARTED], a[id=VeuqdSp8R3ub2_a1a9zHJg]

    当前的副本请求信息

    BulkShardRequest [[test][1]] containing [index {[test][type][w3KQpWkBhFoYx7tjRcg3], source[{"field":"value_0"}]}]

  16. ReplicationOperation#performOnReplica(ShardRouting,ReplicaRequest,globalCheckpoint,)

    • 操做在副本上执行成功后,在回调中更新local checkpoint 和 global checkpoint

      replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
                  @Override
                  public void onResponse(ReplicaResponse response) {
                      successfulShards.incrementAndGet();
                      try {
                          primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());//执行成功回调更新检查点
                          primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
                      } catch (final AlreadyClosedException e) {
                          // okay, the index was deleted or this shard was never activated after a relocation; fall through and finish normally
                      } catch (final Exception e) {
                          // fail the primary but fall through and let the rest of operation processing complete
                          final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
                          primary.failShard(message, e);
                      }
                      decPendingAndFinishIfNeeded();
                  }
  17. TransportReplicationAction.ReplicasProxy#performOn

    • 副本操做是在一个代理类TransportReplicationAction.ReplicasProxy上执行的

    • 建立 ConcreteReplicaRequest 对象,里面有 global checkpoint 这样副本就能更新到最新的全局检查点、有primary term 这样副本就能判断当前的primary shard是不是最新的(副本会拒绝那些已经被 master 节点标记为stale的主分片,好比由于网络故障primary shard未意识到它本身已通过时了)、有allocationId 这样就能找到目标副本shard。

    • sendReplicaRequest将它转发到各个副本所在的节点上去执行。

      String nodeId = replica.currentNodeId();
      final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
      final ConcreteReplicaRequest<ReplicaRequest> replicaRequest =
                          new ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, globalCheckpoint);
                  sendReplicaRequest(replicaRequest, node, listener);
  18. TransportReplicationAction#sendReplicaRequest(ConcreteReplicaRequest,DiscoveryNode,ActionListener)

    • 将主分片上已经成功执行了的操做转发到各个副本所在的节点上执行。
  19. 完。

总结

这篇文章详细记录了ES写操做的流程。是从测试方法org.elasticsearch.indexing.IndexActionIT#testAutoGenerateIdNoDuplicates开始调试的。从github上git clone下来ES的源码,gradle 编译成 IDEA 工程后,有若干测试目录,其中不少测试类可很好的模拟ES集群的功能,从这些测试方法入手,提升阅读源码效率。
因为ES是先将index操做在primary shard执行成功后,再“同步”到各个replica,各个replica 将同步的结果返回给primary shard,而后 primary shard再给Client返回ACK。显然,primary shard执行失败了,那这个 index 操做确定执行失败了,返回给Client的ACK那应该是失败的。若是 index 操做在 primary shard 上执行成功了,在primary shard将 index 操做同步给各个replica时,在有些replica上执行失败,那么 primary shard 最终返回给Client的ACK 是成功的。在默认状况下,只要 primary shard 是活跃的,即只要 primary shard 成功执行了 index 操做,就算该 index 操做同步到全部的replica上都失败了时,也会给Client返回一个成功的确认。只不过,在返回的响应中,有一个_shards 的参数,其中的 total 标识了一共须要在多少个分片上执行、successful 标识了执行成功的分片有多少个,这样Client也能知道 一共有多少个分片(primary和相应的replica)成功执行了 index 操做。
为了保证数据的高可靠性,ES中有个配置参数 wait_for_active_shards,默认为1,也即前面提到的只要 primary shard 是活跃的,就能够执行 index 操做。这个参数在上面的第9步操做流程中起做用。在第9步ReplicationOperation#execute方法执行时,首先检查当前的 ReplicationGroup 中的活跃分片是否大于等于wait_for_active_shards,只有大于等于才会继续执行后续的 index 操做。若是将 wait_for_active_shards 设置为2,那么当整个ES集群中只有 primary shard 可用时,index 操做是不能执行的,Client最终会收到一个 Client request timeout 的响应,由于还须要一个活跃的replica才知足 index 操做要求,这样就避免了 只有 primary shard 一个分片接收数据的状况(试想,若是primary shard 所在的节点宕机了会怎么样?)

原文:https://www.cnblogs.com/hapjin/p/10577427.html

相关文章
相关标签/搜索