#Elasticsearch深刻:Reindex API

从本地重建索引

Reindex不会尝试设置目标索引。它不会复制源索引的设置信息。您应该在运行_reindex操做以前设置目标索引,包括设置映射,分片数,副本等。html

_reindex的最基本形式只是将文档从一个索引复制到另外一个索引。下面将文档从twitter索引复制到new_twitter索引中:node

POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

这将会返回相似如下的信息:
{
  "took" : 147,
  "timed_out": false,
  "created": 120,
  "updated": 0,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "total": 120,
  "failures" : [ ]
}

_update_by_query同样,_reindex获取源索引的快照,但其目标索引必须是不一样的索引,所以不会发生版本冲突。 dest元素能够像索引API同样进行配置,以控制乐观并发控制。只需将version_type 空着(像上面同样)或将version_type设置为internal则Elasticsearch强制性的将文档转储到目标中,覆盖具备相同类型和ID的任何内容:git

POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "internal"
  }
}
将version_type设置为external将致使Elasticsearch从源文件中保留版本,建立缺失的全部文档,并更新在目标索引中比源索引中版本更老的全部文档:

 
POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  }
}
设置op_type为create将致使_reindex仅在目标索引中建立缺乏的文档。全部存在的文档将致使版本冲突:

POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
} 

默认状况下,版本冲突将停止_reindex进程,但您能够经过请求体设置"conflict":"proceed"来在冲突时进行计数:

POST _reindex
{
  "conflicts": "proceed",
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "op_type": "create"
  }
}
您能够经过向source添加type或添加query来限制文档。下面会将kimchy发布的tweet复制到new_twitter中:

POST _reindex
{
  "source": {
    "index": "twitter",
    "type": "tweet",
    "query": {
      "term": {
        "user": "kimchy"
      }
    }
  },
  "dest": {
    "index": "new_twitter"
  }
}

source中的indextype均可以是一个列表,容许您在一个请求中从大量的来源进行复制。下面将从twitterblog索引中的tweetpost类型中复制文档。它也包含twitter索引中post类型以及blog索引中的tweet类型。若是你想更具体,你将须要使用query。它也没有努力处理ID冲突。目标索引将保持有效,但因为迭代顺序定义不正确,预测哪一个文档能够保存下来是不容易的。json

POST _reindex
{
  "source": {
    "index": ["twitter", "blog"],
    "type": ["tweet", "post"]
  },
  "dest": {
    "index": "all_together"
  }
}

还能够经过设置大小限制处理的文档的数量。下面只会将单个文档从twitter复制到new_twitterapi

POST _reindex
{
  "size": 1,
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
}

若是你想要从twitter索引得到一个特定的文档集合你须要排序。排序使滚动效率更低,但在某些状况下它是值得的。若是可能,更喜欢更多的选择性查询sizesort。这将从twitter复10000个文档到new_twitter数组

POST _reindex
{
  "size": 10000,
  "source": {
    "index": "twitter",
    "sort": { "date": "desc" }
  },
  "dest": {
    "index": "new_twitter"
  }
} 

source部分支持搜索请求中支持的全部元素。例如,只使用原始文档的一部分字段,使用源过滤以下所示:

POST _reindex
{
  "source": {
    "index": "twitter",
    "_source": ["user", "tweet"]
  },
  "dest": {
    "index": "new_twitter"
  }
}

update_by_query同样,_reindex支持修改文档的脚本。与_update_by_query不一样,脚本容许修改文档的元数据。此示例修改了源文档的版本:服务器

POST _reindex
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter",
    "version_type": "external"
  },
  "script": {
    "inline": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}",
    "lang": "painless"
  }
} 

就像在_update_by_query中同样,您能够设置ctx.op来更改在目标索引上执行的操做:

noop
若是您的脚本决定没必要进行任何更改,请设置 ctx.op ="noop" 。这将致使_update_by_query 从其更新中忽略该文档。这个没有操做将被报告在响应体的 noop 计数器上。

delete
若是您的脚本决定必须删除该文档,请设置ctx.op="delete"。删除将在响应体的 deleted 计数器中报告。

将ctx.op设置为其余任何内容都是错误。在ctx中设置任何其余字段是一个错误。

想一想可能性!只要当心点,有很大的力量...你能够改变:

_id
_type
_index
_version
_routing
_parent

将_version设置为null或从ctx映射清除就像在索引请求中不发送版本同样。这将致使目标索引中的文档被覆盖,不管目标版本或_reindex请求中使用的版本类型如何。

默认状况下,若是_reindex看到具备路由的文档,则路由将被保留,除非脚本被更改。您能够根据dest请求设置routing来更改:并发

  • keep:将批量请求的每一个匹配项的路由设置为匹配上的路由。默认值。
  • discard:将批量请求的每一个匹配项的路由设置为null
  • =<某些文本>:将批量请求的每一个匹配项的路由设置为`=`以后的文本。

例如,您可使用如下请求将source索引的全部公司名称为cat的文档复制到路由设置为catdest索引。less

POST _reindex
{
  "source": {
    "index": "source",
    "query": {
      "match": {
        "company": "cat"
      }
    }
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}

默认状况下,_reindex批量滚动处理大小为1000.您能够在source元素中指定size字段来更改批量处理大小:dom

POST _reindex
{
  "source": {
    "index": "source",
    "size": 100
  },
  "dest": {
    "index": "dest",
    "routing": "=cat"
  }
}

Reindex也可使用[Ingest Node]功能来指定pipeline, 就像这样:

POST _reindex
{
  "source": {
    "index": "source"
  },
  "dest": {
    "index": "dest",
    "pipeline": "some_ingest_pipeline"
  }
}

从远程重建索引

Reindex支持从远程Elasticsearch群集重建索引:

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "username": "user",
      "password": "pass"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

host参数必须包含schemehostport(例如 https:// otherhost:9200)。用户名和密码参数是可选的,当它们存在时,索引将使用基本认证链接到远程Elasticsearch节点。使用基本认证时请务必使用https,密码将以纯文本格式发送。

必须在elasticsearch.yaml中使用reindex.remote.whitelist属性将远程主机明确列入白名单。它能够设置为容许的远程hostport组合的逗号分隔列表(例如otherhost:9200,another:9200,127.0.10.*:9200,localhost:*)。白名单忽略了scheme ——仅使用主机和端口。

此功能应适用于您可能找到的任何版本的Elasticsearch的远程群集。这应该容许您从任何版本的Elasticsearch升级到当前版本,经过从旧版本的集群从新创建索引。

要启用发送到旧版本Elasticsearch的查询,query参数将直接发送到远程主机,无需验证或修改。

来自远程服务器的从新索引使用默认为最大大小为100mb的堆栈缓冲区。若是远程索引包含很是大的文档,则须要使用较小的批量大小。下面的示例设置很是很是小的批量大小10

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200"
    },
    "index": "source",
    "size": 10,
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

也可使用socket_timeout字段在远程链接上设置socket的读取超时,并使用connect_timeout字段设置链接超时。二者默认为三十秒。此示例将套接字读取超时设置为一分钟,并将链接超时设置为十秒:

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200",
      "socket_timeout": "1m",
      "connect_timeout": "10s"
    },
    "index": "source",
    "query": {
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest"
  }
}

Reindex API

1. URL参数

除了标准参数像pretty以外,“Reindex API”还支持refreshwait_for_completionwait_for_active_shardstimeout以及requests_per_second

发送refresh将在更新请求完成时更新索引中的全部分片。这不一样于 Index API 的refresh参数,只会致使接收到新数据的分片被索引。

若是请求包含wait_for_completion=false,那么Elasticsearch将执行一些预检检查、启动请求、而后返回一个任务,能够与Tasks API一块儿使用来取消或获取任务的状态。Elasticsearch还将以.tasks/task/${taskId}做为文档建立此任务的记录。这是你能够根据是否合适来保留或删除它。当你完成它时,删除它可让Elasticsearch回收它使用的空间。

wait_for_active_shards控制在继续请求以前必须有多少个分片必须处于活动状态,详见这里timeout控制每一个写入请求等待不可用分片变成可用的时间。二者都能正确地在Bulk API中工做。

requests_per_second能够设置为任何正数(1.4,6,1000等),来做为“delete-by-query”每秒请求数的节流阀数字,或者将其设置为-1以禁用限制。节流是在批量批次之间等待,以便它能够操纵滚动超时。等待时间是批次完成的时间与request_per_second * requests_in_the_batch的时间之间的差别。因为分批处理没有被分解成多个批量请求,因此会致使Elasticsearch建立许多请求,而后等待一段时间再开始下一组。这是“突发”而不是“平滑”。默认值为-1。

2. 响应体

JSON响应相似以下:

{
  "took" : 639,
  "updated": 0,
  "created": 123,
  "batches": 1,
  "version_conflicts": 2,
  "retries": {
    "bulk": 0,
    "search": 0
  }
  "throttled_millis": 0,
  "failures" : [ ]
}
  • took:从整个操做的开始到结束的毫秒数。
  • updated:成功更新的文档数。
  • upcreateddated:成功建立的文档数。
  • batches:经过查询更新的滚动响应数量。
  • version_conflicts:根据查询更新时,版本冲突的数量。
  • retries:根据查询更新的重试次数。bluk 是重试的批量操做的数量,search 是重试的搜索操做的数量。
  • throttled_millis:请求休眠的毫秒数,与`requests_per_second`一致。
  • failures:失败的索引数组。若是这是非空的,那么请求由于这些失败而停止。请参阅 conflicts 来如何防止版本冲突停止操做。

3. 配合Task API使用

您可使用Task API获取任何正在运行的重建索引请求的状态:

GET _tasks?detailed=true&actions=*/update/byquery 

响应会相似以下:
{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/reindex",
          "status" : {    //①
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            },
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}

① 此对象包含实际状态。它就像是响应json,重要的添加total字段。 total是重建索引但愿执行的操做总数。您能够经过添加的updatedcreateddeleted的字段来估计进度。当它们的总和等于total字段时,请求将完成。

使用任务id能够直接查找任务:

GET /_tasks/taskId:1

这个API的优势是它与wait_for_completion=false集成,以透明地返回已完成任务的状态。若是任务完成而且wait_for_completion=false被设置,那么它将返回resultserror字段。此功能的成本是wait_for_completion=false.tasks/task/${taskId}建立的文档,由你本身删除该文件。

4. 配合取消任务API使用

全部重建索引都能使用Task Cancel API取消:

POST _tasks/task_id:1/_cancel 

可使用上面的任务API找到task_id。

取消应尽快发生,但可能须要几秒钟。上面的任务状态API将继续列出任务,直到它被唤醒取消自身。

5. 重置节流阀

request_per_second的值能够在经过查询删除时使用_rethrottle API更改:

POST _update_by_query/task_id:1/_rethrottle?requests_per_second=-1 

可使用上面的任务API找到task_id。

就像在_update_by_query API中设置它同样,request_per_second能够是-1来禁用限制,或者任何十进制数字,如1.7或12,以节制到该级别。加速查询的会当即生效,可是在完成当前批处理以后,减慢查询的才会生效。这样能够防止滚动超时。

6. 修改字段名

_reindex可用于使用重命名的字段构建索引的副本。假设您建立一个包含以下所示的文档的索引:

POST test/test/1?refresh
{
  "text": "words words",
  "flag": "foo"
} 

可是你不喜欢这个flag名称,而是要用tag替换它。 _reindex能够为您建立其余索引:

POST _reindex
{
  "source": {
    "index": "test"
  },
  "dest": {
    "index": "test2"
  },
  "script": {
    "inline": "ctx._source.tag = ctx._source.remove(\"flag\")"
  }
} 

如今你能够获得新的文件:

GET test2/test/1 

{
  "found": true,
  "_id": "1",
  "_index": "test2",
  "_type": "test",
  "_version": 1,
  "_source": {
    "text": "words words",
    "tag": "foo"
  }
} 

或者你能够经过tag进行任何你想要的搜索。

7. 手动切片

重建索引支持滚动切片,您能够相对轻松地手动并行化处理:

POST _reindex
{
  "source": {
    "index": "twitter",
    "slice": {
      "id": 0,
      "max": 2
    }
  },
  "dest": {
    "index": "new_twitter"
  }
}
POST _reindex
{
  "source": {
    "index": "twitter",
    "slice": {
      "id": 1,
      "max": 2
    }
  },
  "dest": {
    "index": "new_twitter"
  }
} 

您能够经过如下方式验证:

GET _refresh
POST new_twitter/_search?size=0&filter_path=hits.total 

其结果一个合理的total像这样:

{
  "hits": {
    "total": 120
  }
}

8. 自动切片

你还可让重建索引使用切片的_uid来自动并行的滚动切片

POST _reindex?slices=5&refresh
{
  "source": {
    "index": "twitter"
  },
  "dest": {
    "index": "new_twitter"
  }
} 

您能够经过如下方式验证:

POST new_twitter/_search?size=0&filter_path=hits.total 

其结果一个合理的total像这样:

{
  "hits": {
    "total": 120
  }
}

slices添加到_reindex中能够自动执行上述部分中使用的手动过程,建立子请求,这意味着它有一些怪癖:

  • 您能够在Task API中看到这些请求。这些子请求是具备slices请求任务的“子”任务。
  • 获取slices请求任务的状态只包含已完成切片的状态。
  • 这些子请求能够单独寻址,例如取消和重置节流阀。
  • slices的重置节流阀请求将按相应的从新计算未完成的子请求。
  • slices的取消请求将取消每一个子请求。
  • 因为slices的性质,每一个子请求将不会得到彻底均匀的文档部分。全部文件都将被处理,但有些片可能比其余片大。预期更大的切片能够有更均匀的分布。
  • 带有slices请求的request_per_secondsize的参数相应的分配给每一个子请求。结合上述关于分布的不均匀性,您应该得出结论,使用切片大小可能不会致使正确的大小文档为_reindex
  • 每一个子请求都会得到源索引的略有不一样的快照,尽管这些都是大体相同的时间。

9. 挑选切片数量

在这一点上,咱们围绕要使用的slices数量提供了一些建议(好比手动并行化时,切片API中的max参数):

  • 不要使用大的数字,500就能形成至关大的CPU抖动。
  • 从查询性能的角度来看,在源索引中使用分片数量的一些倍数更为有效。
  • 在源索引中使用彻底相同的分片是从查询性能的角度来看效率最高的。
  • 索引性能应在可用资源之间以slices数量线性扩展。
  • 索引或查询性能是否支配该流程取决于许多因素,如正在重建索引的文档和进行reindexing的集群。

10. 索引的平常重建

您可使用_reindexPainless组合来从新每日编制索引,以将新模板应用于现有文档。 假设您有由如下文件组成的索引:

PUT metricbeat-2016.05.30/beat/1?refresh
{"system.cpu.idle.pct": 0.908}

PUT metricbeat-2016.05.31/beat/1?refresh
{"system.cpu.idle.pct": 0.105}

metricbeat-*索引的新模板已经加载到Elaticsearch中,但它仅适用于新建立的索引。Painless可用于从新索引现有文档并应用新模板。

下面的脚本从索引名称中提取日期,并建立一个附带有-1的新索引。来自metricbeat-2016.05.31的全部数据将从新索引到metricbeat-2016.05.31-1

POST _reindex
{
  "source": {
    "index": "metricbeat-*"
  },
  "dest": {
    "index": "metricbeat"
  },
  "script": {
    "lang": "painless",
    "inline": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"
  }
}

来自上一个度量索引的全部文档如今能够在*-1索引中找到。

GET metricbeat-2016.05.30-1/beat/1
GET metricbeat-2016.05.31-1/beat/1

之前的方法也能够与更改字段的名称一块儿使用,以便将现有数据加载到新索引中,但若是须要,还能够重命名字段。

11. 提取索引的随机子集

Reindex可用于提取用于测试的索引的随机子集:

POST _reindex
{
  "size": 10,
  "source": {
    "index": "twitter",
    "query": {
      "function_score" : {
        "query" : { "match_all": {} },
        "random_score" : {}
      }
    },
    "sort": "_score"    //①
  },
  "dest": {
    "index": "random_twitter"
  }
}

① Reindex默认按_doc排序,因此random_score不会有任何效果,除非您将排序重写为_score