从本篇开始,就进入了Index的核心代码部分。这里首先分析一下索引的建立过程。elasticsearch中的索引是多个分片的集合,它只是逻辑上的索引,并不具有实际的索引功能,全部对数据的操做最终仍是由每一个分片完成。建立索引的过程,从elasticsearch集群上来讲就是写入索引元数据的过程,这一操做只能在master节点上完成。这是一个阻塞式动做,在加上分配在集群上均衡的过程也很是耗时,所以在一次建立大量索引的过程master节点会出现单点性能瓶颈,可以看到响应过程很慢。html
在开始具体源码分析以前,首先回顾一下Action部分的内容(参考index action分析),elasticsearch的每个功能都对应两个Action,*action和Transport*action。*action中定义了每一个功能对应的路径,同时Action的instance绑定对应的Transport*Action。全部功能请求都须要在集群上转发,这大概也是每一个功能都有Transport*Action的缘由吧。对于create固然也不例外,它的开始点也是TransportCreateAction。另外,在action support分析中分析过,不一样的action须要通过和须要操做的节点也不一样。create index只能由master节点进行,并且也只在master节点上进行,保证集群数据的一致性。所以TransportCreateAction继承了TransportMasterNodeOperationAction,并实现了materOperation方法。它的方法以下所示:api
protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) throws ElasticsearchException { String cause = request.cause(); if (cause.length() == 0) { cause = "api"; } final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index()) .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) .settings(request.settings()).mappings(request.mappings()) .aliases(request.aliases()).customs(request.customs()); createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() { @Override public void onResponse(ClusterStateUpdateResponse response) { listener.onResponse(new CreateIndexResponse(response.isAcknowledged())); } @Override public void onFailure(Throwable t) { if (t instanceof IndexAlreadyExistsException) { logger.trace("[{}] failed to create", t, request.index()); } else { logger.debug("[{}] failed to create", t, request.index()); } listener.onFailure(t); } }); }
这里看上很简单,只是调用了createIndexService(它实际上是MetaDataCreateIndexService)的方法,就是修改集群matedata过程。修改前首先获取到index名称对应的lock,这样保证操做数据一致性,而后生成updatetask,交给clusterservice处理。代码以下所示:app
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) { // 获取锁,只对该索引的操做加锁,而不是整个cluster final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index()); // 若是可以获取锁离开建立索引,不然在下面启动新的线程进行 if (mdLock.tryAcquire()) { createIndex(request, listener, mdLock); return; } threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) { @Override public void doRun() throws InterruptedException { if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) { listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock")); return; } createIndex(request, listener, mdLock); } }); }
createIndex方法,会封装create请求,而后向cluster发送一个updatetask。代码以下所示:elasticsearch
private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) { ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder(); updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX); request.settings(updatedSettingsBuilder.build()); clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener)
创建索引,修改配置,增长或者修改mapping都是对集群状态修改,它们的过程都很类似,都是经过clusterService提交一个更新操做,同时附带有优先级。clusterservice会根据优先级和更新状态task的类型来进行对应的操做。以下所示:ide
public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) { if (!lifecycle.started()) { return; } try { final UpdateTask task = new UpdateTask(source, priority, updateTask);//根据优先级新建不一样的task if (updateTask instanceof TimeoutClusterStateUpdateTask) {//超时任务,这类任务须要即时返回,所以马上执行。 final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask; updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() { @Override public void run() { threadPool.generic().execute(new Runnable() { @Override public void run() { timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source())); } }); } }); } else {//其它类型,能够延迟执行,则交给线程池来执行。 updateTasksExecutor.execute(task); } } catch (EsRejectedExecutionException e) { // ignore cases where we are shutting down..., there is really nothing interesting // to be done here... if (!lifecycle.stoppedOrClosed()) { throw e; } } }
说完它们的执行过程,再来看一下create index的具体逻辑。这个逻辑在matedataservice所提交的AckedClusterStateUpdateTask中的execute方法中。整体来讲,这一过程就是将request中关于索引的配置mapping等取出来加入到当前的clustermatedata中,构造一个新的matedata的过程。这一过程仍是比较复杂,限于篇幅将在下次中进行分析。源码分析
总结:建立索引的过程就是master节点更新集群matedata的过程,为了保证数据一致性,须要获取锁。所以存在单点瓶颈。对于外部调用来讲,跟其它功能同样,外部接口调用CreateIndexAction的相关方法,而后经过TransPortCreateIndexAction讲请求发送到集群上,进行索引建立。post