ElasticSearch 写操做 剖析

ElasticSearch 写操做 剖析

在看ElasticSearch权威指南基础入门中关于:分片内部原理这一小节内容后,大体对ElasticSearch的索引、搜索底层实现有了一个初步的认识。记录一下在看文档的过程当中碰到的问题以及个人理解。此外,在文章的末尾,还讨论分布式系统中的主从复制原理,以及采用这种副本复制方案带来的数据一致性问题。html

ElasticSearch index 操做背后发生了什么?

更具体地,就是执行PUT操做向ElasticSearch添加一篇文档时,底层发生的一系列操做。java

PUT user/profile/10
{
  "content":"向user索引中添加一篇id为10的文档"
}

经过PUT请求发起了索引新文档的操做,该操做可以执行的前提是:集群中有“必定数量”的活跃 shards。这个配置由wait_for_active_shards指定。ElasticSearch关于分片有2个重要的概念:primary shard 和 replica。在定义索引的时候指定索引有几个主分片,以及每一个主分片有多少个副本。好比:node

PUT user
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 2
  },

介绍一下集群的环境:ElasticSearch6.3.2三节点集群。定义了一个user索引,该索引有三个主分片,每一个主分片2个副本。如图,每一个节点上有三个shards:一个 primary shard,二个replicagit

wait_for_active_shardsgithub

To improve the resiliency of writes to the system, indexing operations can be configured to wait for a certain number of active shard copies before proceeding with the operation. If the requisite number of active shard copies are not available, then the write operation must wait and retry, until either the requisite shard copies have started or a timeout occurs.redis

在索引一篇文档时,经过wait_for_active_shards指定有多少个活跃的shards时,才能执行索引文档的操做。默认状况下,只要primary shard 是活跃的就能够索引文档。即wait_for_active_shards值为1算法

By default, write operations only wait for the primary shards to be active before proceeding (i.e. wait_for_active_shards=1)sql

来验证一下:在只有一台节点的ElasticSearch上:三个primary shard 所有分配在一台节点中,而且存在着未分配的replica缓存

执行:数据结构

PUT user/profile/10
{
  "content":"向user索引中添加一篇id为10的文档"
}

返回结果:

{
  "_index": "user",
  "_type": "profile",
  "_id": "10",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 3,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}

在_shards 中,total 为3,说明该索引操做应该在3个(一个primary shard,二个replica)分片中执行成功;可是successful为1 说明 PUT操做 在其中一个分片中执行成功了,就返回给client索引成功的确认。这个分片就是primary shard,由于只有一台节点,另外2个replica 属于 unassigned shards,不可能在2个replica 中执行成功。总之,默认状况下,只要primary shard 是活跃的,就能索引文档(index操做)

如今在单节点的集群上,修改索引配置为:wait_for_active_shards=2,这意味着一个索引操做至少要在2个分片上执行成功,才能返回给client acknowledge。

"settings": {
        "index.write.wait_for_active_shards": "2"
    }

再次向user索引中PUT 一篇文档:

PUT user/profile/10
{
  "content":"向user索引中添加一篇id为10的文档"
}

返回结果:

{
  "statusCode": 504,
  "error": "Gateway Time-out",
  "message": "Client request timeout"
}

因为是单节点的ElasticSearch,另外的2个replica没法分配,所以不多是活跃的。而咱们指定的wait_for_active_shards为2,但如今只有primary shard是活跃的,还差一个replica,所以没法进行索引操做了。

The primary shard assigned to perform the index operation might not be available when the index operation is executed. Some reasons for this might be that the primary shard is currently recovering from a gateway or undergoing relocation. By default, the index operation will wait on the primary shard to become available for up to 1 minute before failing and responding with an error.

索引操做会在1分钟以后超时。再看建立索引的org.elasticsearch.cluster.metadata.MetaDataCreateIndexService源代码涉及到两个参数:一个是wait_for_active_shards,另外一个是ackTimeout,就好理解了。

public void createIndex(final CreateIndexClusterStateUpdateRequest request,
                            final ActionListener<CreateIndexClusterStateUpdateResponse> listener) {
        onlyCreateIndex(request, ActionListener.wrap(response -> {
            if (response.isAcknowledged()) {
                activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
                    shardsAcknowledged -> {
                        if (shardsAcknowledged == false) {
                            logger.debug("[{}] index created, but the operation timed out while waiting for " +
                                             "enough shards to be started.", request.index());
                        }

总结一下:因为文档最终是存在在某个ElasticSearch shard下面的,而每一个shard又设置了副本数。默认状况下,在进行索引文档操做时,ElasticSearch会检查活跃的分片数量是否达到wait_for_active_shards设置的值。若未达到,则索引操做会超时,超时时间为1分钟。另外,值得注意的是:检查活跃分片数量只是在开始索引数据的时候检查,若检查经过后,在索引文档的过程当中,集群中又有分片由于某些缘由挂掉了,那么并不能保证这个文档必定写入到 wait_for_active_shards 个分片中去了 。

由于索引文档操做(也即写操做)发生在 检查活跃分片数量 操做以后。试想如下几个问题:

  • 问题1:检查活跃分片数量知足 wait_for_active_shards 设置的值以后,在持续 bulk index 文档过程当中有 shard 失效了(这里的shard是replica),那 难道不能继续索引文档了?
  • 问题2:在何时检查集群中的活跃分片数量?难道要 每次client发送索引文档请求时就要检查一次吗?仍是说周期性地隔多久检查一次?
  • 问题3:这里的 check-then-act 并非原子操做,所以wait_for_active_shards这个配置参数又有多大的意义?

所以,官方文档中是这么说的:

It is important to note that this setting greatly reduces the chances of the write operation not writing to the requisite number of shard copies, but it does not completely eliminate the possibility, because this check occurs before the write operation commences. Once the write operation is underway, it is still possible for replication to fail on any number of shard copies but still succeed on the primary.

  • 该参数只是尽量地保证新文档可以写入到咱们所要求的shard数量中(reduce the chance of ....)。好比:wait_for_active_shards设置为2,那也只是尽量地保证将新文档写入了2个shard中,固然一个是primary shard,另一个是某个replica
  • check 操做发生在 write操做以前,某个doc的写操做check actives shard发现符合要求,但check完以后,某个replica挂了,只要不是primary shard,那该doc的写操做仍是会继续进行。可是在返回给用户响应中,会标识出有多少个分片失败了。实际上,ES索引一篇文档时,是要求primary shard写入该文档,而后primary shard将文件并行转发给全部的replica,当全部的replica都"写入"以后,给primary shard响应,primary shard才返回ACK给client。从这里可看出,须要等到primary shard以及全部的replica都写入文档以后,client才能收到响应。那么,wait_for_active_shards这个参数的意义何在呢?个人理解是:这个参数只是"尽量"保证doc写入到全部的分片,若是活跃的分片数量未达到wait_for_active_shards,那么写是不容许的,而达到了以后,才容许写,而又因为check-then-act并非原子操做,并不能保证doc必定是成功地写入到wait_for_active_shards个分片中去了。(总之,在正常状况下,ES client 写请求一篇doc时,该doc在primary shard上写入,而后并行转发给各个replica,在各个replica上也执行写完成(这里的完成有多是成功,也有多是失败),primary shard收到各个replica写完成的响应,才返回响应给ES client。参考这篇文章

最后,说一下wait_for_active_shards参数的取值:能够设置成 all 或者是 1到 number_of_replicas+1 之间的任何一个整数。

Valid values are all or any positive integer up to the total number of configured copies per shard in the index (which is number_of_replicas+1)

number_of_replicas 为索引指定的副本的数量,加1是指:再算上primary shard。好比前面user索引的副本数量为2,那么wait_for_active_shards最多设置为3。

好,前面讨论完了ElasticSearch可以执行索引操做(写操做)了,接下来是在写操做过程当中发生了什么?好比说ElasticSearch是如何作到近实时搜索的?在将文档写入ElasticSearch时候发生了故障,那文档会不会丢失?

因为ElasticSearch底层是Lucene,在将一篇文档写入ElasticSearch,并最终能被Client查询到,涉及到如下几个概念:倒排索引、Lucene段、提交点、translog、ElasticSearch分片。这里概念都是参考《ElasticSearch definitive guide》中相关的描述。

In Elasticsearch, the most basic unit of storage of data is a shard. But, looking through the Lucene lens makes things a bit different. Here, each Elasticsearch shard is a Lucene index, and each Lucene index consists of several Lucene segments. A segment is an inverted index of the mapping of terms to the documents containing those terms.

它们之间的关系示意图以下:

一个ElasticSearch 索引可由多个 primary shard组成,每一个primary shard至关于一个Lucene Index;一个Lucene index 由多个Segment组成,每一个Segment是一个倒排索引结构表

从文档的角度来看:文章会被analyze(好比分词),而后放到倒排索引(posting list)中。倒排索引之于ElasticSearch就至关于B+树之于Mysql,是存储引擎底层的存储结构。

当文档写入ElasticSearch时,文档首先被保存在内存索引缓存中(in-memeory indexing buffer)。而in-memory buffer是每隔1秒钟刷新一次,刷新成一个个的可搜索的段(file system cache)--下图中的绿色圆柱表示(segment),而后这些段是每隔30分钟同步到磁盘中持久化存储,段同步到磁盘的过程称为 提交 commit。(这里要注意区份内存中2个不一样的区域:一个是 indexing buffer,另外一个是file system cache。写入indexing buffer中的文档 通过 refresh 变成 file system cache中的segments,从而搜索可见)

But the new segment is written to the filesystem cache first—which is cheap—and only later is it flushed to disk—which is expensive. But once a file is in the cache, it can be opened and read, just like any other file.

在这里涉及到了两个过程:① In-memory buffer中的文档被刷新成段;②段提交 同步到磁盘 持久化存储。

过程①默认是1秒钟1次,而咱们所说的ElasticSearch是提供了近实时搜索,指的是:文档的变化并非当即对搜索可见,但会在一秒以后变为可见,一秒钟以后,咱们写入的文档就能够被搜索到了,就是由于这个缘由。另外ElasticSearch提供了 refresh API 来控制过程①。refresh操做强制把In-memory buffer中的内容刷新成段。refresh示意图以下:

好比说,你能够在每次index一篇文档以后就调用一次refresh API,也即:每索引一篇文档就强制刷新生成一个段,这会致使系统中存在大量的小段,因为一次搜索须要查找全部的segments,所以大量的小段会影响搜索性能;此外,大量的小段也意味着OS打开了大量的文件描述符,在必定程度上影响系统资源消耗。这也是为何ElasticSearch/Lucene提供了段合并操做的缘由,由于无论是1s一次refresh,仍是每次索引一篇文档时手动执行refresh,均可能致使大量的小段(small segment)产生,大量的小段是会影响性能的。
此外,当咱们讨论segments时,该segment既能够是已提交的,也能够是未提交的segment。所谓已提交的segment,就是这些segment是已经fsync到磁盘上持久化存储了的,因为已提交的segments已经持久化了,那么它们对应的translog日志也能够删除了。而未提交的segments中包含的文档是搜索可见的,可是若是宕机,就可能致使未提交的segments包含的文档丢失了,此时能够从translog恢复。

对于过程②,就是将段刷新到磁盘中去,默认是每隔30分钟一次,这个刷新过程称为提交。若是还将来得及提交时,发生了故障,那岂不是会丢失大量的文档数据?这个时候,就引入了translog

每篇文档写入到In-memroy buffer中时,同时也会向 translog中写一条记录。In-memory buffer 每秒刷新一次,刷新后生成新段,in-memory被清空,文档能够被搜索。

而translog 默认是每5秒钟刷新一次到磁盘,或者是在每次请求(index、delete、update、bulk)以后就刷新到磁盘。每5秒钟刷新一次就是异步刷新,能够经过以下方式开启:

PUT /my_index/_settings
{
    "index.translog.durability": "async",
    "index.translog.sync_interval": "5s"
}

这种方式的话,仍是有可能会丢失文档数据,好比Client发起index操做以后,ElasticSearch返回了200响应,可是因为translog要等5秒钟以后才刷新到磁盘,若是在5秒内系统宕机了,那么这几秒钟内写入的文档数据就丢失了。

而在每次请求操做(index、delete、update、bulk)执行后就刷新translog到磁盘,则是translog同步刷新,好比说:当Client PUT一个文档:

PUT user/profile/10
{
  "content":"向user索引中添加一篇id为10的文档"
}

在前面提到的三节点ElasticSearch集群中,该user索引有三个primary shard,每一个primary shard2个replica,那么translog须要在某个primary shard中刷新成功,而且在该primary shard的两个replica中也刷新成功,才会给Client返回 200 PUT成功响应。这种方式就保证了,只要Client接收到的响应是200,就意味着该文档必定是成功索引到ElasticSearch中去了。由于translog是成功持久化到磁盘以后,再给Client响应的,系统宕机后在下一次重启ElasticSearch时,就会读取translog进行恢复。

By default, Elasticsearch fsyncs and commits the translog every 5 seconds if index.translog.durability is set to async or if set to request (default) at the end of every index, delete, update, or bulk request. More precisely, if set to request, Elasticsearch will only report success of an index, delete, update, or bulk request to the client after the translog has been successfully fsynced and committed on the primary and on every allocated replica.

这也是为何,在咱们关闭ElasticSearch时最好进行一次flush操做,将段刷新到磁盘中。由于这样会清空translog,那么在重启ElasticSearch就会很快(不须要恢复大量的translog了)

translog 也被用来提供实时 CRUD 。当你试着经过ID查询、更新、删除一个文档,它会在尝试从相应的段中检索以前, 首先检查 translog 任何最近的变动。这意味着它老是可以实时地获取到文档的最新版本。

放一张总结性的图,以下:

有个问题是:为何translog能够在每次请求以后刷新到磁盘?难道不会影响性能吗?相比于将 段(segment)刷新到磁盘,刷新translog的代价是要小得多的,由于translog是通过精心设计的数据结构,而段(segment)是用于搜索的"倒排索引",咱们没法作到每次将段刷新到磁盘;而刷新translog相比于段要轻量级得多(translog 可作到顺序写disk,而且数据结构比segment要简单),所以经过translog机制来保证数据不丢失又不太影响写入性能。

Changes to Lucene are only persisted to disk during a Lucene commit, which is a relatively expensive operation and so cannot be performed after every index or delete operation.
......
All index and delete operations are written to the translog after being processed by the internal Lucene index but before they are acknowledged. In the event of a crash, recent transactions that have been acknowledged but not yet included in the last Lucene commit can instead be recovered from the translog when the shard recovers.

若是宕机,那些 已经返回给client确认但还没有 lucene commit 持久化到disk的transactions,能够从translog中恢复。

总结一下:

这里一共有三个地方有“刷新操做”,其中 refresh 的应用场景是每一个Index/Update/Delete 单个操做以后,要不要refresh一下?而flush 是针对索引而言的:要不要对 twitter 这个索引 flush 一下,使得在内存中的数据是否已经持久化到磁盘上了,这里会引起Lucene的commit,当这些数据持久化磁盘上后,相应的translog就能够删除了(由于这些数据已经持久化到磁盘,那就是可靠的了,若是发生意外宕机,须要借助translog恢复的是那些还没有来得及flush到磁盘上的索引数据)

  1. in-memory buffer 刷新 生成segment

    每秒一次,文档刷新成segment就能够被搜索到了,ElasticSearch提供了refresh API 来控制这个过程

  2. translog 刷新到磁盘

    index.translog.durability来设置,或者由index.translog.flush_threshold_size来设置当translog达到必定大小以后刷新到磁盘(默认512MB)

  3. 段(segment) 刷新到磁盘(flush)

    每30分钟一次,ElasticSearch提供了flush API 来控制这个过程。在段被刷新到磁盘(就是一般所说的commit操做)中时,也会清空刷新translog。

写操做的可靠性如何保证?

数据的可靠性保证是理解存储系统差别的重要方面。好比说,要对比MySQL与ES的异同、对比ES与redis的区别,就能够从数据的可靠性这个点入手。在MySQL里面有redo log、基于 bin log的主从复制、有double write等机制,那ES的数据可靠性保证又是如何实现的呢?
我认为在分布式系统中讨论数据可靠性须要从二个角度出发:单个副本数据的本地刷盘策略 和 写多个副本的以后什么时候向client返回响应 也即 同步复制、异步复制的问题。

  • 1 单个副本数据的本地刷盘策略
    在ES中,这个策略叫translog机制。当向ES索引一篇doc时,doc先写入in-memory buffer,同时写入translog。translog可配置成每次写入以后,就flush disk。这与MySQL写redo log日志的的原理是相似的。

  • 2 写多个副本的以后什么时候向client返回响应
    这里涉及到参数wait_for_active_shards,前面提到这个参数虽有一些"缺陷"(check-then-act),但它仍是尽量地保证一篇doc在写入primary shard而且也"写入"(写入并不表明落盘)了若干个replica以后,才返回ack给client。ES中的 wait_for_active_shards参数与Kafka中 broker配置 min.insync.replicas 原理是相似的。

存在的一些问题

这个 issue 和 这个 issue 讨论了index.write.wait_for_active_shards参数的前因后果。

以三节点ElasticSearch6.3.2集群,索引设置为3个primary shard,每一个primary shard 有2个replica 来讨论:

  • client向其中一个节点发起Index操做索引文档,这个写操做请求固然是发送到primary shard上,可是当Client收到200响应时,该文档是否已经复制到另外2个replica上?
  • Client将一篇文档成功写入到ElasticSearch了(收到了200响应),它能在replica所在的节点上 GET 到这篇文档吗?Client发起查询请求,又能查询到这篇文档吗?(注意:GET 和 Query 是不同的)
  • 前面提到,当 index 一篇文档时,primary shard 和2个replica 上的translog 要 都刷新 到磁盘,才返回 200 响应,那它是否与参数 index.write.wait_for_active_shards默认值 矛盾?由于index.write.wait_for_active_shards默认值为1,即:只要primary shard 是活跃的,就能够进行 index 操做。也就是说:当Client收到200的index成功响应,此时primary shard 已经将文档 复制 到2个replica 上了吗?这两个 replica 已经将文档刷新成 segment了吗?仍是说这两个 replica 仅仅只是 将索引该文档的 translog 刷新到磁盘上了?

ElasticSearch副本复制方式讨论

ElasticSearch索引是一个逻辑概念,囊括现实世界中的数据。好比 定义一个 user 索引存储全部的用户资料信息。索引由若干个primary shard组成,就至关于把用户资料信息 分开成 若干个部分存储,每一个primary shard存储user索引中的一部分数据。为了保证数据可靠性不丢失,能够为每一个primary shard配置副本(replica)。显然,primary shard 和它对应的replica 是不会存储在同一台机器(node)上的,由于若是该机器宕机了,那么primary shard 和 副本(replica) 都会丢失,那整个系统就丢失一部分数据了。

primary shard 和 replica 这种副本备份方案,称为主从备份。primary shard是主(single leader),replica 是 从 (multiple replica)。因为是分布式环境,可能存在多个Client同时向ElasticSearch发起索引文档的请求,这篇文档会根据 文档id 哈希到某个 primary shard,primary shard写入该文档 并分发给 replica 进行存储。因为采用了哈希,这也是为何 在定义索引的时候,须要指定primary shard个数,而且 primary shard个数一经指定后不可修改的缘由。由于primary shard个数一旦改变,哈希映射 结果就变了。而采用这种主从副本备份方案,这也是为何 索引操做(写操做、update操做) 只能由 primary shard处理,而读操做既能够从 primary shard读取,也能够从 replica 读取的缘由。相对于文档而言,primary shard是single leader,全部的文档修改操做都统一由primary shard处理,能避免一些 并发修改 冲突。可是默认状况下,ElasticSearch 副本复制方式 是异步的,也正如前面 index.write.wait_for_active_shards讨论,只要primary shard 是活跃的就能够进行索引操做,primary shard 将文档 “ 存储 ” 以后,就返回给client 响应,而后primary shard 再将该文档同步给replicas,而这就是异步副本复制方式。在ElasticSearch官方讨论论坛里面,也有关于副本复制方式的讨论:这篇文章提出了一个问题:Client向primary shard写入文档成功,primary shard 是经过何种方式将该文档同步到 replica的?

其实以为这里的 异步复制 提法 有点不许确,不过放到这里,供你们讨论参考吧

关于primary shard 将文档同步给各个replica,涉及到 in-sync replica概念,在master节点中维护了一个 in-sync 副本列表。

Since replicas can be offline, the primary is not required to replicate to all replicas. Instead, Elasticsearch maintains a list of shard copies that should receive the operation. This list is called the in-sync copies and is maintained by the master node.

当index操做发送到 primary shard时,primary shard 并行转发给 in-sync副本,等待各个in-sync副本给primary 响应。primary shard收到全部in-sync副本响应后,再给Client响应说:Index操做成功。因为副本可能会出故障,当没有in-sync副本,只有primary shard在正常工做时,此时的index操做只在primary shard上执行成功就返回给Client了,这里就存在了所谓的“单点故障”,由于primary shard所在的节点挂了,那 就会丢失 index 操做的文档了。这个时候, index.write.wait_for_active_shards 参数就起做用了,若是将 index.write.wait_for_active_shards 设置为2,那么 当没有in-sync副本,只有primary shard在正常工做时,index 操做就会被拒绝。因此,在这里,index.write.wait_for_active_shards参数起到一个避免单点故障的功能。更具体的细节可参考data-replication model

采用异步副本复制方式带来的一个问题是:读操做能读取最新写入的文档吗?若是咱们指定读请求去读primary shard(经过ElasticSearch 的路由机制),那么是能读到最新数据的。可是若是读请求是由某个 replica 接收处理,那也许就不能读取到刚才最新写入的文档了。所以,从刚才Client 读请求的角度来看,ElasticSearch能提供 哪一种程度的 一致性呢?而出现这种一致性问题的缘由在于:为了保证数据可靠性,采用了副本备份,引入了副本,致使副本和primary shard上的数据不一致,即:存在 replication lag 问题。因为这种副本复制延迟带来的问题,系统须要给Client 某种数据一致性的 保证,好比说:

  • read your own write

    Client可以读取到它本身最新写入的数据。好比用户修改了昵称,那TA访问本身的主页时,能看到本身修改了的昵称,可是TA的好友 可能 并不能当即看到 TA 修改后的昵称。好友请求的是某个 replica 上的数据,而 primary shard还将来得及把刚才修改的昵称 同步 到 replica上。

  • Monotonic reads

    单调读。每次Client读取的值,是愈来愈新的值(站在Client角度来看的)。好比说NBA篮球比赛,Client每10分钟读一次比赛结果。第10分钟读取到的是 1:1,第20分钟读到的是2:2,第30分钟读到的是3:3,假设在第40分钟时,实际比赛结果是4:4,Cleint在第40分钟读取的时候,读到的值能够是3:3 这意味着未读取到最新结果而已,读到的值也能够是4:4, 可是不能是2:2 。

  • consistent prefix reads

    符合因果关系的一种读操做。好比说,用户1 和 用户2 对话:

    用户1:你如今干吗?
    用户2:写代码

    对于Client读:应该是先读取到“你如今干吗?”,而后再读取到 “写代码。若是读取结果顺序乱了,Client就会莫名其妙。

正是因为Client 有了系统给予的这种 一致性 保证,那么Client(或者说应用程序)就能基于这种保证 来开发功能,为用户提供服务。

那系统又是如何提供这种一致性保证的呢?或者说ElasticSearch集群又提供了何种一致性保证?常常听到的有:强一致性(linearizability)、弱一致性、最终一致性。对于强一致性,通俗的理解就是:实际上数据有多份(primary shard 以及多个 replica),但在Client看来,表现得就只有一份数据。在多个 client 并发读写情形下,某个Client在修改数据A,而又有多个Client在同时读数据A,linearizability 就要保证:若是某个Client读取到了数据A,那在该Client以后的读取请求返回的结果都不能比数据A要 旧,至少是数据A的当前值(不能是数据A的旧值)。不说了,再说,我本身都不明白了。

至于系统如何提供这种一致性,会用到一些分布式共识算法,我也没有深刻地去研究过。关于副本复制方式的讨论,也可参考这篇文章:分布式系统理论之Quorum机制

参考资料

原文:http://www.javashuo.com/article/p-kwtrleat-mr.html

相关文章
相关标签/搜索