本文主要分析es lucene写入流程,lucene segment的产生,flush, commit与es的refresh,flush。html
1 segment的产生api
当索引一个文档时,若是存在空闲的segment(未被其余线程锁定),则取出空闲segment list中的最后一个segment(LIFO),并锁定,将文档索引至该segment,less
找达到flush条件的segment,而后解锁,归还至空闲segment list,若是有达到flush条件的segment,flush该segment(同步执行)。异步
若是不存在,则建立新的segment,重复上述步骤。elasticsearch
总结1:若是并行的执行向一个索引,索引文档,则须要不一样的segment。post
相关代码:spa
//索引一个文档。 IndexWriter.updateDocument //索引一个文档。 DocumentsWriter.updateDocument //一个线程索引时锁定一个ThreadState对象,索引后归还至free list。 ThreadState //ThreadState的属性,一个DocumentsWriterPerThread对应一个segment,flush后,该ThreadState的dwpt为null, //下次使用该ThreadState,建立新的dwpt,新的segment。 DocumentsWriterPerThread
2 flush条件线程
索引一个文档后,找出是否有达到flush条件的segment。code
1:若是maxBufferedDocs(默认-1,es未设置)不等于-1,且当前segment在内存中的doc数量大于等于maxBufferedDocs,则标记该segment的flushPending。orm
2:若是不知足1,且ramBufferSizeMB(默认16.0,es设置为es.index.memory.max_index_buffer_size)不等于-1,当内存中当前IndexWriter全部segment之和(包括deleted docs)大于ramBufferSizeMB时,找出内存中最大的且未标记flushPending的segment,标记该segment的flushPending。
3:若是当前1,2以后,当前segment还未标记flushPending,则当前segment大于perThreadHardLimitMB(默认1945,es未设置),标记该segment的flushPending。
123以后,若是当前segment被标记,则flush当前segment。不然从flushQueue中poll一个segment,若是flushQueue(调用flush时,将全部segment加入queue)为空,则遍历segment取第一个标记flushPending的segment进行flush。
相关代码:
//查找符合flush的segment。 DocumentsWriterFlushControl.doAfterDocument //flush当前segment前,reset当前dwpt,下次使用当前ThreadState须要新的dwpt,新的segment。 DocumentsWriterFlushControl.internalTryCheckOutForFlush //flush当前segment,或者其余segment。 DocumentsWriter.postUpdate
注意:除了达到flush条件的自动flush,还能够经过调用api flush,如:
1:es refresh
2:es flush
3:es syncedFlush
3 flush
flush将内存中的segment写到文件(在调用线程中同步执行),但不执行fileChannel.force(nio,bio则fileOutputStream.flush),一部分数据可能在buffer中。
相关代码:
//flush一个segment。 DocumentsWriter.doFlush DocumentsWriterPerThread.flush DefaultIndexingChain.flush //写nvd, nvm文件。 writeNorms //写dvd, dvm文件。 writeDocValues //写dii, dim文件。 writePoints //写fdt, fdx文件(该文件在首次indexing时建立,flush时写入值)。 storedFieldsConsumer.flush //写doc, pos, tim, tip文件。 termsHash.flush //写fnm文件。 docWriter.codec.fieldInfosFormat().write //写cfs, cfe, si, liv(若是有删除)文件。 DocumentsWriterPerThread.sealFlushedSegment //删除cfs, cfe, si, liv(若是有删除)以外的文件。 IndexWriter.doAfterFlush
4 commit
commit执行fileChannel.force,将buffer中的数据写到磁盘。具体步骤为:
1:flush all segments 将内存中全部的segments写到文件。
2:依次sync pending_segments_n,segment files(fileChannel.force)将这写文件同步到磁盘。
3:将pending_segments_n重命名为segments_n,删除旧的segments_n-1。
4:若是步骤1 flush了segment,执行maybeMerge,若是达到merge条件,将会merge。
相关代码:
//commit。 IndexWriter.commit IndexWriter.commitInternal IndexWriter.prepareCommitInternal //flush segments。 DocumentsWriter.flushAllThreads //sync file。 IndexWriter.startCommit Directory.sync IOUtils.fsync FileChannel.force FileChannelImpl.force //更新commit信息segments_n,删除旧的segments_n-1。 IndexWriter.finishCommit //若是达到merge条件,将会merge。 IndexWriter.maybeMerge
5 maybeMerge
flush或者commit后,若是flush了segment,执行maybeMerge,若是达到merge条件,将执行merge(异步执行)。具体步骤为:
1:将segments按size降序排列。
2:计算total segments size 和 minimum segment size。
3:total segments size过滤掉tooBigSegment(大于max_merged_segment/2.0)的segment,并记录tooBigCount;minSegmentBytes若是小于floor_segment(默认2mb),取2mb。
4:计算allowedSegCountInt,当segments(不包含tooBigSegment)数量大于此数,将触发merge。
5:从大到小(以前的降序排列),贪心找出不大于maxMergeAtOnce个, 且size总和不大于maxMergedSegmentBytes个segments进行merge。
相关代码:
//maybeMerge。 IndexWriter.maybeMerge IndexWriter.updatePendingMerges //查找可merge的segments。 TieredMergePolicy.findMerges //执行merge。 ConcurrentMergeScheduler.merge
//控制merge线程数量 ConcurrentMergeScheduler.maybeStall
//用来异步执行merge的线程。
MergeThread
6 es refresh
主要执行lucene的flushAllThreads和maybeMerge。refresh的两个条件:
1:达到refresh_interval设置的时间间隔。
2:节点全部shard的segments占用内存(调用lucene api获取)之和达到indices.memory.index_buffer_size,找出占用最大的shard执行refresh。
相关代码:
//refresh_interval refresh。
IndexService.AsyncRefreshTask
//indices.memory.index_buffer_size refresh。
IndexingMemoryController.runUnlocked
IndexingMemoryController.writeIndexingBufferAsync
//es refresh。 InternalEngine.refresh //lucene refresh。 ReferenceManager.maybeRefreshBlocking DirectoryReader.openIfChanged StandardDirectoryReader.doOpenIfChanged IndexWriter.getReader //flush segments。 DocumentsWriter.flushAllThreads //若是flush了segment,则执行maybeMerge。 IndexWriter.maybeMerge
7 es flush
主要执行步骤为:
1:prepareCommit translog:
1.1 备份 translog.ckp到translog-1.ckp。
1.2 fsync translog-1.ckp以及translog 文件夹。
1.3 建立新的translog数据文件translog-n.tlog,更新translog.ckp(写入checkPoint)。
2:commit indexWriter(见4 commit)。
3:refresh(见6 es refresh)。
4:commit translog:删除备份的translog-1.ckp以及旧的translog数据文件translog-n-1.tlog。
相关代码:
//es flush。 InternalEngine.flush //prepareCommit translog。 Translog.prepareCommit //es commit index writer。 InternalEngine.commitIndexWriter //lucene commit。 IndexWriter.commit //es refresh。 InternalEngine.refresh //commit translog。 Translog.commit
总结2:lucene的flush是指将内存中的segment,写到磁盘但不执行fileChannel.force,一部分数据会在buffer中;commit会调用force,将buffer中的数据写到磁盘。
es的refresh调用lucene的flush;flush调用lucene的commit。
参考:
elasticsearch5.6.12,lucene6.6.1 源码
https://www.outcoldman.com/en/archive/2017/07/13/elasticsearch-explaining-merge-settings/
http://blog.mikemccandless.com/2011/02/visualizing-lucenes-segment-merges.html