第二十一讲!node
一、上机动手实战演练基于_version进行乐观锁并发控制mysql
(1)先构造一条数据出来算法
PUT /test_index/test_type/7
{
"test_field": "test test"
}sql
(2)模拟两个客户端,都获取到了同一条数据数据库
GET test_index/test_type/7json
{
"_index": "test_index",
"_type": "test_type",
"_id": "7",
"_version": 1,
"found": true,
"_source": {
"test_field": "test test"
}
}api
(3)其中一个客户端,先更新了一下这个数据网络
同时带上数据的版本号,确保说,es中的数据的版本号,跟客户端中的数据的版本号是相同的,才能修改数据结构
PUT /test_index/test_type/7?version=1
{
"test_field": "test client 1"
}多线程
{
"_index": "test_index",
"_type": "test_type",
"_id": "7",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
(4)另一个客户端,尝试基于version=1的数据去进行修改,一样带上version版本号,进行乐观锁的并发控制
PUT /test_index/test_type/7?version=1
{
"test_field": "test client 2"
}
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "3",
"index": "test_index"
}
],
"type": "version_conflict_engine_exception",
"reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "3",
"index": "test_index"
},
"status": 409
}
(5)在乐观锁成功阻止并发问题以后,尝试正确的完成更新
GET /test_index/test_type/7
{
"_index": "test_index",
"_type": "test_type",
"_id": "7",
"_version": 2,
"found": true,
"_source": {
"test_field": "test client 1"
}
}
基于最新的数据和版本号,去进行修改,修改后,带上最新的版本号,可能这个步骤会须要反复执行好几回,才能成功,特别是在多线程并发更新同一条数据很频繁的状况下
PUT /test_index/test_type/7?version=2
{
"test_field": "test client 2"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "7",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
带上版本号更新
乐观锁生效
基于最新的数据和版本号去更新修改
第二十二讲!
课程大纲
一、上机动手实战演练基于external version进行乐观锁并发控制
external version
es提供了一个feature,就是说,你能够不用它提供的内部_version版本号来进行并发控制,能够基于你本身维护的一个版本号来进行并发控制。举个列子,加入你的数据在mysql里也有一份,而后你的应用系统自己就维护了一个版本号,不管是什么本身生成的,程序控制的。这个时候,你进行乐观锁并发控制的时候,可能并非想要用es内部的_version来进行控制,而是用你本身维护的那个version来进行控制。
?version=1
?version=1&version_type=external
version_type=external,惟一的区别在于,_version,只有当你提供的version与es中的_version如出一辙的时候,才能够进行修改,只要不同,就报错;当version_type=external的时候,只有当你提供的version比es中的_version大的时候,才能完成修改
es,_version=1,?version=1,才能更新成功
es,_version=1,?version>1&version_type=external,才能成功,好比说?version=2&version_type=external
(1)先构造一条数据
PUT /test_index/test_type/8
{
"test_field": "test"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
(2)模拟两个客户端同时查询到这条数据
GET /test_index/test_type/8
{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 1,
"found": true,
"_source": {
"test_field": "test"
}
}
(3)第一个客户端先进行修改,此时客户端程序是在本身的数据库中获取到了这条数据的最新版本号,好比说是2
PUT /test_index/test_type/8?version=2&version_type=external
{
"test_field": "test client 1"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
(4)模拟第二个客户端,同时拿到了本身数据库中维护的那个版本号,也是2,同时基于version=2发起了修改
PUT /test_index/test_type/8?version=2&version_type=external
{
"test_field": "test client 2"
}
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "1",
"index": "test_index"
}
],
"type": "version_conflict_engine_exception",
"reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "1",
"index": "test_index"
},
"status": 409
}
(5)在并发控制成功后,从新基于最新的版本号发起更新
GET /test_index/test_type/8
{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 2,
"found": true,
"_source": {
"test_field": "test client 1"
}
}
PUT /test_index/test_type/8?version=3&version_type=external
{
"test_field": "test client 2"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
第二十三讲!
课程大纲
一、什么是partial update?
PUT /index/type/id,建立文档&替换文档,就是同样的语法
通常对应到应用程序中,每次的执行流程基本是这样的:
(1)应用程序先发起一个get请求,获取到document,展现到前台界面,供用户查看和修改
(2)用户在前台界面修改数据,发送到后台
(3)后台代码,会将用户修改的数据在内存中进行执行,而后封装好修改后的全量数据
(4)而后发送PUT请求,到es中,进行全量替换
(5)es将老的document标记为deleted,而后从新建立一个新的document
partial update
post /index/type/id/_update
{
"doc": {
"要修改的少数几个field便可,不须要全量的数据"
}
}
看起来,好像就比较方便了,每次就传递少数几个发生修改的field便可,不须要将全量的document数据发送过去
二、图解partial update实现原理以及其优势
partial update,看起来很方便的操做,实际内部的原理是什么样子的,而后它的优势是什么
三、上机动手实战演练partial update
PUT /test_index/test_type/10
{
"test_field1": "test1",
"test_field2": "test2"
}
POST /test_index/test_type/10/_update
{
"doc": {
"test_field2": "updated test2"
}
}
查询 修改和写回都发生在shard内部
第二十四讲!
课程大纲
es,实际上是有个内置的脚本支持的,能够基于groovy脚本实现各类各样的复杂操做
基于groovy脚本,如何执行partial update
es scripting module,咱们会在高手进阶篇去讲解,这里就只是初步讲解一下
PUT /test_index/test_type/11
{
"num": 0,
"tags": []
}
(1)内置脚本
POST /test_index/test_type/11/_update
{
"script" : "ctx._source.num+=1"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "11",
"_version": 2,
"found": true,
"_source": {
"num": 1,
"tags": []
}
}
(2)外部脚本
ctx._source.tags+=new_tag
POST /test_index/test_type/11/_update
{
"script": {
"lang": "groovy",
"file": "test-add-tags",
"params": {
"new_tag": "tag1"
}
}
}
(3)用脚本删除文档
ctx.op = ctx._source.num == count ? 'delete' : 'none'
POST /test_index/test_type/11/_update
{
"script": {
"lang": "groovy",
"file": "test-delete-document",
"params": {
"count": 1
}
}
}
(4)upsert操做
POST /test_index/test_type/11/_update
{
"doc": {
"num": 1
}
}
{
"error": {
"root_cause": [
{
"type": "document_missing_exception",
"reason": "[test_type][11]: document missing",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "4",
"index": "test_index"
}
],
"type": "document_missing_exception",
"reason": "[test_type][11]: document missing",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "4",
"index": "test_index"
},
"status": 404
}
若是指定的document不存在,就执行upsert中的初始化操做;若是指定的document存在,就执行doc或者script指定的partial update操做
POST /test_index/test_type/11/_update
{
"script" : "ctx._source.num+=1",
"upsert": {
"num": 0,
"tags": []
}
}
第二十五讲!
课程大纲
(1)partial update内置乐观锁并发控制
(2)retry_on_conflict
(3)_version
post /index/type/id/_update?retry_on_conflict=5&version=6
retry策略:
再次获取 document数据和最新版本号
第二十六讲!
课程大纲
一、批量查询的好处
就是一条一条的查询,好比说要查询100条数据,那么就要发送100次网络请求,这个开销仍是很大的
若是进行批量查询的话,查询100条数据,就只要发送1次网络请求,网络请求的性能开销缩减100倍
二、mget的语法
(1)一条一条的查询
GET /test_index/test_type/1
GET /test_index/test_type/2
(2)mget批量查询
GET /_mget
{
"docs" : [
{
"_index" : "test_index",
"_type" : "test_type",
"_id" : 1
},
{
"_index" : "test_index",
"_type" : "test_type",
"_id" : 2
}
]
}
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"test_field1": "test field1",
"test_field2": "test field2"
}
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 1,
"found": true,
"_source": {
"test_content": "my test"
}
}
]
}
(3)若是查询的document是一个index下的不一样type种的话
GET /test_index/_mget
{
"docs" : [
{
"_type" : "test_type",
"_id" : 1
},
{
"_type" : "test_type",
"_id" : 2
}
]
}
(4)若是查询的数据都在同一个index下的同一个type下,最简单了
GET /test_index/test_type/_mget
{
"ids": [1, 2]
}
三、mget的重要性
能够说mget是很重要的,通常来讲,在进行查询的时候,若是一次性要查询多条数据的话,那么必定要用batch批量操做的api
尽量减小网络开销次数,可能能够将性能提高数倍,甚至数十倍,很是很是之重要
第二十七讲!
课程大纲
一、bulk语法
POST /_bulk
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }}
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}
{ "test_field": "test12" }
{ "index": { "_index": "test_index", "_type": "test_type", "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
每个操做要两个json串,语法以下:
{"action": {"metadata"}}
{"data"}
举例,好比你如今要建立一个文档,放bulk里面,看起来会是这样子的:
{"index": {"_index": "test_index", "_type", "test_type", "_id": "1"}}
{"test_field1": "test1", "test_field2": "test2"}
有哪些类型的操做能够执行呢?
(1)delete:删除一个文档,只要1个json串就能够了
(2)create:PUT /index/type/id/_create,强制建立
(3)index:普通的put操做,能够是建立文档,也能够是全量替换文档
(4)update:执行的partial update操做
bulk api对json的语法,有严格的要求,每一个json串不能换行,只能放一行,同时一个json串和一个json串之间,必须有一个换行
{
"error": {
"root_cause": [
{
"type": "json_e_o_f_exception",
"reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 3]"
}
],
"type": "json_e_o_f_exception",
"reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@5a5932cd; line: 1, column: 3]"
},
"status": 500
}
{
"took": 41,
"errors": true,
"items": [
{
"delete": {
"found": true,
"_index": "test_index",
"_type": "test_type",
"_id": "10",
"_version": 3,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true,
"status": 201
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[test_type][2]: version conflict, document already exists (current version [1])",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "2",
"index": "test_index"
}
}
},
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true,
"status": 201
}
},
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false,
"status": 200
}
},
{
"update": {
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200
}
}
]
}
bulk操做中,任意一个操做失败,是不会影响其余的操做的,可是在返回结果里,会告诉你异常日志
POST /test_index/_bulk
{ "delete": { "_type": "test_type", "_id": "3" }}
{ "create": { "_type": "test_type", "_id": "12" }}
{ "test_field": "test12" }
{ "index": { "_type": "test_type" }}
{ "test_field": "auto-generate id test" }
{ "index": { "_type": "test_type", "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
POST /test_index/test_type/_bulk
{ "delete": { "_id": "3" }}
{ "create": { "_id": "12" }}
{ "test_field": "test12" }
{ "index": { }}
{ "test_field": "auto-generate id test" }
{ "index": { "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
二、bulk size最佳大小
bulk request会加载到内存里,若是太大的话,性能反而会降低,所以须要反复尝试一个最佳的bulk size。通常从1000~5000条数据开始,尝试逐渐增长。另外,若是看大小的话,最好是在5~15MB之间。
第二十八讲!
课程大纲
一、阶段性总结
1~8讲:快速入门了一下,最基本的原理,最基本的操做
9~13讲:在入门以后,对ES的分布式的基本原理,进行了相对深刻一些的剖析
14~27讲:围绕着document这个东西,进行操做,进行讲解和分析
二、什么是distributed document store
到目前为止,你以为你在学什么东西,给你们一个直观的感受,好像已经知道了es是分布式的,包括一些基本的原理,而后花了很多时间在学习document自己相关的操做,增删改查。一句话点出来,给你们概括总结一下,其实咱们应该思考一下,es的一个最最核心的功能,已经被咱们相对完整的讲完了。
Elasticsearch在跑起来之后,其实起到的第一个最核心的功能,就是一个分布式的文档数据存储系统。ES是分布式的。文档数据存储系统。文档数据,存储系统。
文档数据:es能够存储和操做json文档类型的数据,并且这也是es的核心数据结构。
存储系统:es能够对json文档类型的数据进行存储,查询,建立,更新,删除,等等操做。其实已经起到了一个什么样的效果呢?其实ES知足了这些功能,就能够说已是一个NoSQL的存储系统了。
围绕着document在操做,其实就是把es当成了一个NoSQL存储引擎,一个能够存储文档类型数据的存储系统,在操做里面的document。
es能够做为一个分布式的文档存储系统,因此说,咱们的应用系统,是否是就能够基于这个概念,去进行相关的应用程序的开发了。
什么类型的应用程序呢?
(1)数据量较大,es的分布式本质,能够帮助你快速进行扩容,承载大量数据
(2)数据结构灵活多变,随时可能会变化,并且数据结构之间的关系,很是复杂,若是咱们用传统数据库,那是否是很坑,由于要面临大量的表
(3)对数据的相关操做,较为简单,好比就是一些简单的增删改查,用咱们以前讲解的那些document操做就能够搞定
(4)NoSQL数据库,适用的也是相似于上面的这种场景
举个例子,好比说像一些网站系统,或者是普通的电商系统,博客系统,面向对象概念比较复杂,可是做为终端网站来讲,没什么太复杂的功能,就是一些简单的CRUD操做,并且数据量可能还比较大。这个时候选用ES这种NoSQL型的数据存储,比传统的复杂的功能务必强大的支持SQL的关系型数据库,更加合适一些。不管是性能,仍是吞吐量,可能都会更好。
第二十九讲!
课程大纲
(1)document路由到shard上是什么意思?
数据路由:当客户端建立document的时候,es此时须要决定将这个document放到index的哪一个shard上面,这个过程就叫作document routing,数据路由
(2)路由算法:shard = hash(routing) % number_of_primary_shards
举个例子,一个index有3个primary shard,P0,P1,P2
每次增删改查一个document的时候,都会带过来一个routing number,默认就是这个document的_id(多是手动指定,也多是自动生成)
routing = _id,假设_id=1
会将这个routing值,传入一个hash函数中,产出一个routing值的hash值,hash(routing) = 21
而后将hash函数产出的值对这个index的primary shard的数量求余数,21 % 3 = 0
就决定了,这个document就放在P0上。
决定一个document在哪一个shard上,最重要的一个值就是routing值,默认是_id,也能够手动指定,相同的routing值,每次过来,从hash函数中,产出的hash值必定是相同的
不管hash值是几,不管是什么数字,对number_of_primary_shards求余数,结果必定是在0~number_of_primary_shards-1之间这个范围内的。0,1,2。
(3)_id or custom routing value
默认的routing就是_id
也能够在发送请求的时候,手动指定一个routing value,好比说put /index/type/id?routing=user_id
手动指定routing value是颇有用的,能够保证说,某一类document必定被路由到一个shard上去,那么在后续进行应用级别的负载均衡,以及提高批量读取的性能的时候,是颇有帮助的
(4)primary shard数量不可变的谜底
shard不可变是由于hash 路由算法都固定了 要是primary shard 的数量多了document的获取结果可能出错 找不到对应的document
第三十讲!
document的crud内部实现原理
课程大纲
(增删改操做只能有primary shard处理 不能有replica shard 处理 先由primary shard 处理 而后将操做同步到对应的replica shard)
分清楚 node 、shard的区别
(1)客户端选择一个node发送请求过去,这个node就是coordinating node(协调节点)
(2)coordinating node,对document进行路由,将请求转发给对应的node(有primary shard)
(3)实际的node上的primary shard处理请求,而后将数据同步到replica node
(4)coordinating node,若是发现primary node和全部replica node都搞定以后,就返回响应结果给客户端