elasticsearch学习笔记(十三)——Elasticsearch乐观锁并发控制实战

一、elasticsearch基于_version(新版本更新为if_seq_no和if_primary_term)进行乐观锁的并发控制

(1)先构造一条数据html

PUT /test_index/_doc/1
{
  "test_field":"test test"
}

(2)模拟两个客户端都获取到同一条数据数据库

GET /test_index/_doc/1

(3)其中一个客户端先更新了一下这个数据多线程

PUT /test_index/_doc/1?version=1
{
    "test_field": "test client 1"
}

(4)另一个客户端,尝试基于version=1的数据去修改,一样带上version版本号,进行乐观锁的并发控制并发

PUT /test_index/_doc/1?version=1
{
    "test_field": "test client 2"
}

此时就会报错version conflict
(5)在乐观锁成功阻止并发问题以后,尝试正确的完成更新elasticsearch

GET /test_index/_doc/1

对于更新失败的客户端,查询出最新的版本号以后,基于最新的数据和版本号去再次进行修改,修改时,带上最新的版本号,可能这个步骤会须要反复执行好几回,才能成功,特别是在多线程并发更新同一条数据很频繁的状况下ide

注意特别说明,要完成上面的实验须要es版本低于6.7。
version Deprecated in 6.7.0. Please use if_seq_no & if_primary_term instead. See Optimistic concurrency control for more details.
这段说得很清楚,高版本是使用if_seg_no和if_primary_term这两个参数实现的。
下面把上面的实现用高版本作一下。我用的版本是7.0的
(1)先构造一条数据高并发

PUT /test_index/_doc/1
{
  "test_field":"test test"
}
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 3,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}

(2)模拟两个客户端,都获取到了同一条数据性能

GET /test_index/_doc/1
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 3,
  "_seq_no" : 2,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "test_field" : "test test"
  }
}

(3)其中一个客户端,先更新了一下这个数据ui

PUT /test_index/_doc/1?if_seq_no=2&if_primary_term=1
{
  "test_field":"test client 1"
}
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 4,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 3,
  "_primary_term" : 1
}

(4)另一个客户端,尝试基于if_seq_no和if_primary_term的数据进行修改,进行乐观锁的并发控制线程

PUT /test_index/_doc/1?if_seq_no=2&if_primary_term=1
{
  "test_field":"test client 2"
}
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[1]: version conflict, required seqNo [2], primary term [1]. current document has seqNo [3] and primary term [1]",
        "index_uuid": "0jAS2GP1TPG5J8PlqGdYIQ",
        "shard": "4",
        "index": "test_index"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[1]: version conflict, required seqNo [2], primary term [1]. current document has seqNo [3] and primary term [1]",
    "index_uuid": "0jAS2GP1TPG5J8PlqGdYIQ",
    "shard": "4",
    "index": "test_index"
  },
  "status": 409
}

(5)在乐观锁成功阻止并发问题以后,尝试正确的完成更新

GET /test_index/_doc/1
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 4,
  "_seq_no" : 3,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "test_field" : "test client 1"
  }
}

基于最新的数据和if_seq_no,if_primary_term进行修改,可能这个过程会须要反复执行好几回,才能成功,特别是在多线程并发更新同一条数据很频繁的状况下

PUT /test_index/_doc/1?if_seq_no=3&if_primary_term=1
{
  "test_field":"test client 2"
}
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 5,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 4,
  "_primary_term" : 1
}
GET /test_index/_doc/1
{
  "_index" : "test_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 5,
  "_seq_no" : 4,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "test_field" : "test client 2"
  }
}

对于if_seq_no和if_primary_term,官方文档已经有比较详细的叙述,https://www.elastic.co/guide/... 。这里我说下简单的理解方式,对于if_primary_term记录的就是具体的哪一个主分片,而if_seq_no这个参数起的做用和旧版本中的_version是同样的,之因此加上if_primary_term这个参数主要是提升并发的性能以及更天然,由于每一个document都只会在某一个主分片中,因此由所在主分片分配序列号比由以前经过一个参数_version,至关于由整个ES集群分配版本号要来的更好。
To ensure an older version of a document doesn’t overwrite a newer version, every operation performed to a document is assigned a sequence number by the primary shard that coordinates that change. The sequence number is increased with each operation and thus newer operations are guaranteed to have a higher sequence number than older operations. Elasticsearch can then use the sequence number of operations to make sure a newer document version is never overridden by a change that has a smaller sequence number assigned to it.
简单翻译就是为确保较旧版本的文档不会覆盖较新版本,对文档执行的每一个操做都会由协调该更改的主分片分配序列号。每次操做都会增长序列号,所以保证较新的操做具备比旧操做更高的序列号。而后,Elasticsearch可使用序列号操做来确保更新的文档版本永远不会被分配给它的序列号更小的更改覆盖。

二、基于external version进行乐观锁的并发控制(6.7版本已移除)

具体的实战就不作了,本质思想也很简单,就是版本号是存储在本身的数据库中的,能够由开发人员本身控制。可是在6.7版本以后,就移除这个功能,主要是由于:The update API does not support versioning other than internalExternal (version types external and external_gte) or forced (version type force) versioning is not supported by the update API as it would result in Elasticsearch version numbers being out of sync with the external system. 更新API不支持外部(版本类型external和external_gte)或强制(版本类型force)版本控制,由于它会致使Elasticsearch版本号与外部系统不一样步

相关文章
相关标签/搜索