在电商场景下,工做流程为:数据库
若是是多线程操做,就可能有多个线程并发的去执行上述的3步骤流程,假如此时有两我的都来读取商品数据,两个线程并发的服务于两我的,同时在进行商品库存数据的修改。正确的状况:线程A将库存-1,设置为99件,线程B接着读取99件,再-1,变为98件。若是A,B线程都读取的为100件,A处理完以后修改成99件,B处理完以后再次修改成99件,此时结果就出错了。多线程
在读取商品数据时,同时对这一行数据加锁,当此线程处理完数据以后,再解锁,另外一个线程开始处理。并发
悲观锁并发控制方案,就是在各类状况下,都上锁。上锁以后,就只有一个线程能够操做这一条数据,不一样的场景之下,上的锁不一样,行级锁,表级锁,读锁,写锁。异步
乐观锁不加锁,每一个线程均可以任意操做。es的每条文档中有一个version字段,新建文档后为1,修改一次累加,线程A,B同时读取到数据,version=1,A处理完以后库存为99,在写入es的时候会跟es中的版本号比较,都是1,则写入成功,version=2,B处理完以后也为99,存入es时与es中的数据的版本号version=2相比,明显不一样,此时不会用99去更新,而是从新读取最新的数据,再减一,变为98,执行上述操做,写入。ide
Elasticsearch的后台都是多线程异步的,多个请求之间是乱序的,可能后修改的先到,先修改的后到。ui
Elasticsearch的多线程异步并发修改是基于本身的_version版本号进行乐观锁并发控制的。spa
在后修改的先到时,修改完毕后,当先修改的后到时,会比较一下_version版本号,若是不相等就直接扔掉,不须要了。这样结果会就会保存为一个正确状态。线程
代码示例:code
PUT /test_index/test_type/3
{
"test_field": "test test"
}
结果:
{
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
修改
PUT /test_index/test_type/3
{
"test_field": "test1 test1"
}
结果
{
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 2,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
删除
DELETE /test_index/test_type/3
结果:
{
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 3,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
从新建立
PUT /test_index/test_type/3
{
"test_field": "test1 test1"
}
结果
{
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
复制代码
删除操做也会对这条数据的版本号加1cdn
在删除一个document以后,能够从一个侧面证实,它不是当即物理删除掉的,由于它的一些版本号等信息仍是保留着的。先删除一条document,再从新建立这条document,其实会在delete version基础之上,再把version号加1
先新建一条数据
PUT /test_index/test_type/4
{
"test_field": "test"
}
复制代码
模拟两个客户端,都获取到了同一条数据
GET /test_index/test_type/4
返回
{
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 1,
"found": true,
"_source": {
"test_field": "test"
}
}
复制代码
其中一个客户端,先更新了一下这个数据, 同时带上数据的版本号,确保说,es中的数据的版本号,跟客户端中的数据的版本号是相同的,才能修改
PUT test_index/test_type/4?version=1
{
"test_field": "client1 changed"
}
返回结果
{
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
复制代码
另一个客户端,尝试基于version=1的数据去进行修改,一样带上version版本号,进行乐观锁的并发控制
PUT test_index/test_type/4?version=1
{
"test_field": "client2 changed"
}
会出错,返回
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[test_type][4]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
"shard": "2",
"index": "test_index"
}
],
"type": "version_conflict_engine_exception",
"reason": "[test_type][4]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
"shard": "2",
"index": "test_index"
},
"status": 409
}
复制代码
乐观锁就成功阻止并发问题
在乐观锁成功阻止并发问题以后,尝试正确的完成更新
从新进行GET请求,获得 version
GET /test_index/test_type/4
{
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 2,
"found": true,
"_source": {
"test_field": "client1 changed"
}
}
复制代码
基于最新的数据和版本号,去进行修改,修改后,带上最新的版本号,可能这个步骤会须要反复执行好几回,才能成功,特别是在多线程并发更新同一条数据很频繁的状况下
PUT /test_index/test_type/4?version=2
{
"test_field": "client2 changed"
}
返回
{
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
复制代码
es提供了一个feature,就是说,你能够不用它提供的内部_version版本号来进行并发控制,能够基于你本身维护的一个版本号来进行并发控制。
?version=1&version_type=external
复制代码
version_type=external,惟一的区别在于,_version,只有当你提供的version与es中的_version如出一辙的时候,才能够进行修改,只要不同,就报错;当version_type=external的时候,只有当你提供的version比es中的_version大的时候,才能完成修改
es,_version=1,?version=1,才能更新成功
es,_version=1,?version>1&version_type=external,才能成功,好比说?version=2&version_type=external
代码示例:
先建立一条数据
PUT test_index/test_type/5
{
"test_field": "external test"
}
返回
{
"_index": "test_index",
"_type": "test_type",
"_id": "5",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
复制代码
模拟两个客户端同时查询到这条数据
GET /test_index/test_type/5
返回
{
"_index": "test_index",
"_type": "test_type",
"_id": "5",
"_version": 1,
"found": true,
"_source": {
"test_field": "external test"
}
}
复制代码
第一个客户端先进行修改,此时客户端程序是在本身的数据库中获取到了这条数据的最新版本号,好比说是2
PUT /test_index/test_type/5?version=2&version_type=external
{
"test_field": "external client1 changed"
}
返回
{
"_index": "test_index",
"_type": "test_type",
"_id": "5",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
复制代码
模拟第二个客户端,同时拿到了本身数据库中维护的那个版本号,也是2,同时基于version=2发起了修改
PUT /test_index/test_type/5?version=2&version_type=external
{
"test_field": "external client2 changed"
}
会出错,返回
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[test_type][5]: version conflict, current version [2] is higher or equal to the one provided [2]",
"index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
"shard": "1",
"index": "test_index"
}
],
"type": "version_conflict_engine_exception",
"reason": "[test_type][5]: version conflict, current version [2] is higher or equal to the one provided [2]",
"index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
"shard": "1",
"index": "test_index"
},
"status": 409
}
复制代码
在并发控制成功后,从新基于最新的版本号发起更新
GET /test_index/test_type/5
返回
{
"_index": "test_index",
"_type": "test_type",
"_id": "5",
"_version": 2,
"found": true,
"_source": {
"test_field": "external client1 changed"
}
}
PUT /test_index/test_type/5?version=3&version_type=external
{
"test_field": "external client2 changed"
}
返回
{
"_index": "test_index",
"_type": "test_type",
"_id": "5",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}复制代码