Elasticsearch——mget及bulk

1. mget批量查询

1.1 批量查询的好处

若是查询100条数据,一条一条的查的话,就须要发送100条数据,若是进行批量查询的话,只须要发送一次网络请求。java

通常来讲,在进行查询的时候,若是一次性要查询多条数据的话,那么必定要用batch批量操做的api 尽量减小网络开销次数,可能能够将性能提高数倍,甚至数十倍,很是很是之重要node

1.2 语法

一条一条的查询json

GET test_index/test_type/1
GET test_index/test_type/2
返回
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "1",
  "_version": 1,
  "found": true,
  "_source": {
    "test_field": "create id by myself"
  }
}
复制代码

mget批量查询api

GET /_mget
{
  "docs": [
    {
      "_index": "test_index",
    "_type": "test_type",
    "_id": "1"
    },
    {
      "_index": "test_index",
    "_type": "test_type",
    "_id": "2"
    }
  ]
}
返回结果
{
  "docs": [
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "1",
      "_version": 1,
      "found": true,
      "_source": {
        "test_field": "create id by myself"
      }
    },
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "2",
      "_version": 1,
      "found": true,
      "_source": {
        "name": "Tom",
        "age": 12,
        "gender": "M"
      }
    }
  ]
}
复制代码

若是查询的document是一个index下的不一样type种的话数组

GET /test_index/_mget
{
   "docs" : [
      {
         "_type" :  "test_type",
         "_id" :    1
      },
      {
         "_type" :  "test_type",
         "_id" :    2
      }
   ]
}
复制代码

若是查询的数据都在同一个index下的同一个type下,最简单了性能优化

GET /test_index/test_type/_mget
{
   "ids": [1, 2]
}
复制代码

2. bulk批量增删改

2.1 语法

每一个操做须要两个 json 串,语法以下:网络

{"action": {"metadata"}}
{"data"}
复制代码

举例,好比你如今要建立一个文档,放bulk里面,看起来会是这样子的:数据结构

{"index": {"_index": "test_index", "_type", "test_type", "_id": "1"}}
{"test_field1": "test1", "test_field2": "test2"}
复制代码

bulk api 对 json 的语法,有严格的要求,每一个json串不能换行,只能放一行,同时一个json串和一个json串之间,必须有一个换行jvm

单个json串里面有换行的话,会报错:elasticsearch

{
  "error": {
    "root_cause": [
      {
        "type": "json_e_o_f_exception",
        "reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@79a526fa; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@79a526fa; line: 1, column: 3]"
      }
    ],
    "type": "json_e_o_f_exception",
    "reason": "Unexpected end-of-input: expected close marker for Object (start marker at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@79a526fa; line: 1, column: 1])\n at [Source: org.elasticsearch.transport.netty4.ByteBufStreamInput@79a526fa; line: 1, column: 3]"
  },
  "status": 500
}
复制代码

2.2 可执行的操做

  1. delete:删除一个文档,只要1个json串就能够了
  2. create:PUT /index/type/id/_create,强制建立
  3. index:普通的put操做,能够是建立文档,也能够是全量替换文档
  4. update:执行的 partial update 操做

2.3 示例

POST /_bulk
{"delete": {"_index": "test_index", "_type": "test_type", "_id": "2"}}
{"create": {"_index": "test_index", "_type": "test_type", "_id":6}}
{"test_field": "create id 6"}
{"index": {"_index": "test_index", "_type": "test_type", "_id": 7}}
{"test_field": "put id 7"}
{"update": {"_index": "test_index", "_type": "test_type", "_id": 1}}
{"doc": {"test_field": "update id 1"}}
返回结果:
{
  "took": 62,
  "errors": false,
  "items": [
    {
      "delete": {
        "found": true,
        "_index": "test_index",
        "_type": "test_type",
        "_id": "2",
        "_version": 2,
        "result": "deleted",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    },
    {
      "create": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "6",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": true,
        "status": 201
      }
    },
    {
      "index": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "7",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": true,
        "status": 201
      }
    },
    {
      "update": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "1",
        "_version": 2,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    }
  ]
}
复制代码

bulk操做中,任意一个操做失败,是不会影响其余的操做的,可是在返回结果里,会告诉你异常日志

上面咱们已经create了 _id 为6的数据,咱们再create一次,确定会报错,执行如下语句:

POST /_bulk
{"delete": {"_index": "test_index", "_type": "test_type", "_id": "2"}}
{"create": {"_index": "test_index", "_type": "test_type", "_id":6}}
{"test_field": "create id 6"}
{"index": {"_index": "test_index", "_type": "test_type", "_id": 9}}
{"test_field": "put id 9"}
{"update": {"_index": "test_index", "_type": "test_type", "_id": 1}}
{"doc": {"test_field": "update id 1"}}
返回结果:
{
  "took": 10,
  "errors": true,
  "items": [
    {
      "delete": {
        "found": false,
        "_index": "test_index",
        "_type": "test_type",
        "_id": "2",
        "_version": 1,
        "result": "not_found",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 404
      }
    },
    {
      "create": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "6",
        "status": 409,
        "error": {
          "type": "version_conflict_engine_exception",
          "reason": "[test_type][6]: version conflict, document already exists (current version [1])",
          "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
          "shard": "2",
          "index": "test_index"
        }
      }
    },
    {
      "index": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "9",
        "_version": 2,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": false,
        "status": 200
      }
    },
    {
      "update": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "1",
        "_version": 2,
        "result": "noop",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    }
  ]
}
复制代码

能够看到返回结果中 create 报错

若是修改的是同一个index, 同一个index和同一个type,下面的语法也能够:

POST /test_index/_bulk
{ "delete": { "_type": "test_type", "_id": "3" }} 
{ "create": { "_type": "test_type", "_id": "12" }}
{ "test_field":    "test12" }
{ "index":  { "_type": "test_type" }}
{ "test_field":    "auto-generate id test" }
{ "index":  { "_type": "test_type", "_id": "2" }}
{ "test_field":    "replaced test2" }
{ "update": { "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

POST /test_index/test_type/_bulk
{ "delete": { "_id": "3" }} 
{ "create": { "_id": "12" }}
{ "test_field":    "test12" }
{ "index":  { }}
{ "test_field":    "auto-generate id test" }
{ "index":  { "_id": "2" }}
{ "test_field":    "replaced test2" }
{ "update": { "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
复制代码

2.4 bulk size 最佳大小

bulk request会加载到内存里,若是太大的话,性能反而会降低,所以须要反复尝试一个最佳的bulk size。通常从1000~5000条数据开始,尝试逐渐增长。另外,若是看大小的话,最好是在5~15MB之间。

2.5 _bulk api的奇特json格式与底层性能优化关系

bulk api奇特的json格式

{"action": {"meta"}}
{"data"}
{"action": {"meta"}}
{"data"}
复制代码

为何不是下面这种格式

[{
  "action": {
 
  },
  "data": {

  }
}]
复制代码
  1. bulk中的每一个操做均可能要转发到不一样的node的shard去执行

  2. 若是采用比较良好的json数组格式

    容许任意的换行,整个可读性很是棒,读起来很爽,es拿到那种标准格式的json串之后,要按照下述流程去进行处理

    • 将json数组解析为JSONArray对象,这个时候,整个数据,就会在内存中出现一份如出一辙的拷贝,一份数据是json文本,一份数据是JSONArray对象
    • 解析json数组里的每一个json,对每一个请求中的document进行路由
    • 为路由到同一个shard上的多个请求,建立一个请求数组
    • 将这个请求数组序列化
    • 将序列化后的请求数组发送到对应的节点上去
  3. 耗费更多内存,更多的jvm gc开销

    咱们以前提到过bulk size最佳大小的那个问题,通常建议说在几千条那样,而后大小在10MB左右,因此说,可怕的事情来了。假设说如今100个bulk请求发送到了一个节点上去,而后每一个请求是10MB,100个请求,就是1000MB = 1GB,而后每一个请求的json都copy一份为jsonarray对象,此时内存中的占用就会翻倍,就会占用2GB的内存,甚至还不止。由于弄成jsonarray以后,还可能会多搞一些其余的数据结构,2GB+的内存占用。

    占用更多的内存可能就会积压其余请求的内存使用量,好比说最重要的搜索请求,分析请求,等等,此时就可能会致使其余请求的性能急速降低 另外的话,占用内存更多,就会致使java虚拟机的垃圾回收次数更多,跟频繁,每次要回收的垃圾对象更多,耗费的时间更多,致使es的java虚拟机中止工做线程的时间更多

  4. 如今的奇特格式

    {"action": {"meta"}}
    {"data"}
    {"action": {"meta"}}
    {"data"}
    复制代码
    • 不用将其转换为json对象,不会出现内存中的相同数据的拷贝,直接按照换行符切割json
    • 对每两个一组的json,读取meta,进行document路由
    • 直接将对应的json发送到node上去
  5. 最大的优点在于,不须要将json数组解析为一个JSONArray对象,造成一份大数据的拷贝,浪费内存空间,尽量地保证性能

相关文章
相关标签/搜索