众所周知,Elasticsearch是⼀个实时的分布式搜索引擎,为⽤户提供搜索服务。当咱们决定存储某种数据,在建立索引的时候就须要将数据结构,即Mapping肯定下来,于此同时索引的设定和不少固定配置将不能改变。node
那若是后续业务发生变化,须要改变数据结构或者更换ES更换分词器怎么办呢?为此,Elastic团队提供了不少经过辅助⼯具来帮助开发⼈员进⾏重建索引的方案。
若是对 reindex
API 不熟悉,那么在遇到重构的时候,必然事倍功半,效率低下。反之,就能够方便地进行索引重构,省时省力。程序员
假设以前咱们已经存在一个blog索引,由于更换分词器须要对该索引中的数据进行重建索引,以便支持业务使用新的分词规则搜索数据,而且尽量使这个变化对外服务没有感知,大概分为如下几个步骤:微信
blog_lastest
,Mapping数据结构与blog
索引一致blog
数据同步至blog_lastest
blog
索引blog_lastest
添加别名blog
在这里推荐一个ES管理工具Kibana,主要针对数据的探索、可视化和分析。数据结构
put /blog_lastest/ { "mappings":{ "properties":{ "title":{ "type":"text", "analyzer":"ik_max_word" }, "author":{ "type":"keyword", "fields":{ "seg":{ "type":"text", "analyzer":"ik_max_word" } } } } } }
接⼝将会在 reindex 结束后返回app
POST /_reindex { "source": { "index": "blog" }, "dest": { "index": "blog_lastest" } }
在 kibana
中的使用以下所示
异步
固然高版本(7.1.1)中,ES都有提供对应的Java REST Client
,好比分布式
ReindexRequest reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest"); TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);
为了防止赘述,接下来举例所有以kibana
中请求介绍,若是有须要用Java REST Client
,能够自行去ES官网查看。工具
若是 reindex 时间过⻓,建议加上 wait_for_completion=false
的参数条件,这样 reindex 将直接返回 taskId
。oop
POST /_reindex?wait_for_completion=false { "source": { "index": "blog" }, "dest": { "index": "blog_lastest" } }
返回:测试
{ "task" : "dpBihNSMQfSlboMGlTgCBA:4728038" }
op_type
参数控制着写入数据的冲突处理方式,若是把 op_type
设置为 create
【默认值】,在 _reindex
API 中,表示写入时只在 dest
index
中添加不存在的 doucment,若是相同的 document 已经存在,则会报 version confilct
的错误,那么索引操做就会失败。【这种方式与使用 _create API 时效果一致】
POST _reindex { "source": { "index": "blog" }, "dest": { "index": "blog_lastest", "op_type": "create" } }
若是这样设置了,也就不存在更新数据的场景了【冲突数据没法写入】,咱们也能够把 op_type
设置为 index
,表示全部的数据所有从新索引建立。
默认状况下,当发生 version conflict
的时候,_reindex
会被 abort
,任务终止【此时数据尚未 reindex
完成】,在返回体中的 failures
指标中会包含冲突的数据【有时候数据会很是多】,除非把 conflicts
设置为 proceed
。
关于 abort
的说明,若是产生了 abort
,已经执行的数据【例如更新写入的】仍然存在于目标索引,此时任务终止,还会有数据没有被执行,也就是漏数了。换句话说,该执行过程不会回滚,只会终止。若是设置了 proceed
,任务在检测到数据冲突的状况下,不会终止,会跳过冲突数据继续执行,直到全部数据执行完成,此时不会漏掉正常的数据,只会漏掉有冲突的数据。
POST _reindex { "source": { "index": "blog" }, "dest": { "index": "blog_lastest", "op_type": "create" }, "conflicts": "proceed" }
咱们能够故意把 op_type
设置为 create
,人为制造数据冲突的场景,测试时更容易观察到冲突现象。
若是把 conflicts
设置为 proceed
,在返回体结果中不会再出现 failures
的信息,可是经过 version_conflicts
指标能够看到具体的数量。
当你发现reindex
的速度有些慢的时候,能够在 query
参数的同一层次【即 source
参数中】添加 size
参数,表示 scroll size
的大小【会影响批次的次数,进而影响总体的速度】,若是不显式设置,默认是一批 1000 条数据,在一开始的简单示例中也看到了。
以下,设置 scroll size
为 5000:
POST /_reindex?wait_for_completion=false { "source": { "index": "blog", "size":5000 }, "dest": { "index": "blog_lastest", "op_type": "create" }, "conflicts": "proceed" }
测试后,速度达到了 30 分钟 500 万左右,明显提高了不少。
通常来讲,若是咱们的 source index
很大【好比几百万数据量】,则可能须要比较长的时间来完成 _reindex
的工做,可能须要几十分钟。而在此期间不可能一直等待结果返回,能够去作其它事情,若是中途须要查看进度,能够经过 _tasks
API 进行查看。
GET /_tasks/{taskId}
返回:
{ "completed" : false, "task" : { "node" : "dpBihNSMQfSlboMGlTgCBA", "id" : 4704218, "type" : "transport", "action" : "indices:data/write/reindex", …… }
当执行完毕时,completed
为true
查看任务进度以及取消任务,除了根据taskId查看之外,咱们还能够经过查看全部的任务中筛选本次reindex
的任务。
GET _tasks?detailed=true&actions=*reindex
返回结果:
{ "nodes" : { "dpBihNSMQfSlboMGlTgCBA" : { "name" : "node-16111-9210", "transport_address" : "192.168.XXX.XXX:9310", "host" : "192.168.XXX.XXX", "ip" : "192.168.16.111:9310", "roles" : [ "ingest", "master" ], "attributes" : { "xpack.installed" : "true", "transform.node" : "false" }, "tasks" : { "dpBihNSMQfSlboMGlTgCBA:6629305" : { "node" : "dpBihNSMQfSlboMGlTgCBA", "id" : 6629305, "type" : "transport", "action" : "indices:data/write/reindex", "status" : { "total" : 8361421, "updated" : 0, "created" : 254006, "deleted" : 0, "batches" : 743, "version_conflicts" : 3455994, "noops" : 0, "retries" : { "bulk" : 0, "search" : 0 }, "throttled_millis" : 0, "requests_per_second" : -1.0, "throttled_until_millis" : 0 }, "description" : "reindex from [blog] to [blog_lastest][_doc]", "start_time_in_millis" : 1609338953464, "running_time_in_nanos" : 1276738396689, "cancellable" : true, "headers" : { } } } } } }
注意观察里面的几个重要指标,例如从 description
中能够看到任务描述,从 tasks 中能够找到任务的 id
【例如 dpBihNSMQfSlboMGlTgCBA:6629305
】,从 cancellable
能够判断任务是否支持取消操做。
这个 API 其实就是模糊匹配,同理也能够查询其它类型的任务信息,例如使用 GET _tasks?detailed=true&actions=*byquery
查看查询请求的状态。
当集群的任务太多时咱们就能够根据task_id
,也就是上面提到GET /_tasks/task_id
方式更加准确地查询指定任务的状态,避免集群的任务过多,不方便查看。
若是遇到操做失误的场景,想取消任务,有没有办法呢?
固然有啦,虽然覆水难收,经过调用
_tasks API
:
POST _tasks/task_id/_cancel
这里的 task_id
就是经过上面的查询任务接口获取的任务id(任务要支持取消操做,即【cancellable 为 true】时方能收效)。
当咱们经过 API 查询发现任务完成后,就能够进行后续操做,我这里是要删除旧索引,而后再给新索引发别名,用于替换旧索引,这样才能保证对外服务没有任何感知。
DELETE /blog
POST /_aliases { "actions":[ { "add":{ "index":"blog_lastest", "alias":"blog" } } ] }
进行过以上操做后,咱们可使用一个简单的搜索验证服务。
POST /blog/_search { "query": { "match": { "author": "james" } } }
若是搜索结果达到咱们的预期目标,至此,数据索引重建迁移完成。
本文可转载,但需声明原文出处。 程序员小明,一个不多加班的程序员。欢迎关注微信公众号“程序员小明”,获取更多优质文章。