在 Elasticsearch 中对时序型数据进行查询和聚合
做者 Sonja Krause-Harderhtml
最初,Elasticsearch 是一个搜索引擎,将搜索索引保存在 Lucene 数据库中。然而,Elasticsearch 从那时起已经历了巨大演变,现已成为一个高性能、集群式的可扩展数据存储。尽管从索引格式上仍能看出它的渊源,但 Elasticsearch 如今已被各种用户普遍用于各类用途。docker
它的用途之一即是存储、处理和检索时序型数据。时序型数据的特色是每一个数据点都有一个相关的准确时间戳。最多见的情形是,一个数据点表明在具体时间点获得的某种测量结果,能够是股价、科学观察值或者服务器负载。数据库
尽管已有多个专门处理时序型数据的数据库实施方案,可是在存储和查询时序型数据时仍没有通用格式,并且理论上全部数据库引擎都可用来处理时序型数据。编程
从抽象意义上讲,一个时序型数据条目包括下列内容:ubuntu
在服务器监测用例中,确定会有一个键值对用以指明时序型数据属于哪一个主机,但还能够添加任何其余信息,并且添加的这些信息以后可用来请求只获取有关特定主机集合的指标。举一些例子,运行特定服务的主机,仅属于生产环境的主机,或者在特定云服务提供商平台上运行的实例。服务器
为了更接地气,咱们使用 Metricbeat 数据做为示例来看一下如何使用 Elasticsearch 查询来从数据中筛选出特定的时序型信息。网络
每一个 Metricbeat 文档都包括下列信息:less
真正的时序型数据elasticsearch
system
模块的 cpu
指标集,请参见指标集文档。文档中所包含的有关指标自己的元数据。Metricbeat 使用 ECS 字段 event.module
和 event.dataset
来指定建立文档时使用的是哪一个 Metricbeat 模块,以及文档中包含哪一个指标集。ionic
实例的相关元数据,其为实体主机、虚拟机,仍是诸如 Kubernetes Pod 或 Docker 容器等小型实体
举个例子,system.cpu
指标集中的 Metricbeat 文档就是下面这个样子。_source
对象的内联注释表示您能够从哪里获取有关该字段的更多信息:
ECS 文档
Metricbeat 文档
system.cpu 指标集文档
注意:为便于理解,文档在 JSON 文件中添加了 # 注释。
{ "_index" : "metricbeat-8.0.0-2019.08.12-000001", "_type" : "_doc", "_id" : "vWm5hWwB6vlM0zxdF3Q5", "_score" :0.0, "_source" : { "@timestamp" :"2019-08-12T12:06:34.572Z", "ecs" : { # ECS metadata "version" :"1.0.1" }, "host" : { # ECS metadata "name" : "noether", "hostname" : "noether", "architecture" : "x86_64", "os" : { "kernel" :"4.15.0-55-generic", "codename" : "bionic", "platform" : "ubuntu", "version" :"18.04.3 LTS (Bionic Beaver)", "family" : "debian", "name" :"Ubuntu" }, "id" :"4e3eb308e7f24789b4ee0b6b873e5414", "containerized" : false }, "agent" : { # ECS metadata "ephemeral_id" :"7c725f8a-ac03-4f2d-a40c-3695a3591699", "hostname" : "noether", "id" : "e8839acc-7f5e-40be-a3ab-1cc891bcb3ce", "version" :"8.0.0", "type" : "metricbeat" }, "event" : { # ECS metadata "dataset" : "system.cpu", "module" : "system", "duration" :725494 }, "metricset" : { # metricbeat metadata "name" : "cpu" }, "service" : { # metricbeat metadata "type" : "system" }, "system" : { # metricbeat time series data "cpu" : { "softirq" : { "pct" :0.0112 }, "steal" : { "pct" :0 }, "cores" :8, "irq" : { "pct" :0 }, "idle" : { "pct" :6.9141 }, "nice" : { "pct" :0 }, "user" : { "pct" :0.7672 }, "system" : { "pct" :0.3024 }, "iowait" : { "pct" :0.0051 }, "total" : { "pct" :1.0808 } } } } }
总结一下,在 Metricbeat 文档中,时序型数据和元数据混合在一块儿,因此您需拥有文档格式的具体相关知识才能准确检索出所需的内容。
然而,若是您想处理、分析或可视化时序型数据,这些数据一般应该为相似表格的形式,以下所示:
<series name> <timestamp> <value> <key-value pairs> system.cpu.user.pct 1565610800000 0.843 host.name=”noether” system.cpu.user.pct 1565610800000 0.951 host.name=”hilbert” system.cpu.user.pct 1565610810000 0.865 host.name=”noether” system.cpu.user.pct 1565610810000 0.793 host.name=”hilbert” system.cpu.user.pct 1565610820000 0.802 host.name=”noether” system.cpu.user.pct 1565610820000 0.679 host.name=”hilbert”
Elasticsearch 查询能帮助您经过编程方式以极其接近此类表格的形式检索时序性数据,下面的例子为您展现了操做过程。如想亲自体验查询过程,您须要一个 Elasticsearch 实例,还须要安装并运行 Metricbeat 以让其为 system.cpu
和 system.network
指标集传输数据。如需 Metricbeat 的简短介绍,请参看ES网站中的入门文档。
您能够从 Kibana 中的开发工具 (Dev Tools) 控制台运行全部查询。如以前并未用过,您能够查看ES网站中 Kibana 控制台文档简单了解一下。请注意,您须要更改示例查询中的主机名。
咱们假设您已按照默认配置将 Metricbeat 设置完毕。这即表示它天天会建立一个索引,并且这些索引的命名规则为“metricbeat-版本号-日期-计数”,例如 metricbeat-7.3.0-2019.08.06-000009
。要一次性查询全部这些索引,咱们须要使用通配符:
示例查询:
GET metricbeat-*/_search
示例响应以下:
{ "took" :2, "timed_out" : false, "_shards" : { "total" :1, "successful" :1, "skipped" :0, "failed" :0 }, "hits" : { "total" : { "value" :10000, "relation" : "gte" }, "max_score" :1.0, "hits" : [...] } }
很明显,该查询超出了 Elasticsearch 在单次查询中可返回文档的数量限制。这里省略了真实命中信息,但您可能但愿滚动查询结果并与上面已加注释的文档进行比较。
根据所监测基础设施的规模,虽然可能会有海量的 Metricbeat 文档,但您不多须要从最开始(记录)的时间点查询时序型数据,因此咱们开始时用一个日期范围,在本案例中是过去 5 分钟:
示例查询:
GET metricbeat-*/_search { "query": { "range": { "@timestamp": { "gte": "now-5m" } } } }
示例响应以下:
{ "took" :4, "timed_out" : false, "_shards" : { "total" :1, "successful" :1, "skipped" :0, "failed" :0 }, "hits" : { "total" : { "value" :30, "relation" : "eq" }, "max_score" :0.0, "hits" : [...] } }
这个规模管理起来要容易得多。然而,运行此查询时所基于的系统仅有一个主机向其报告,因此在生产环境中,命中数仍然会很高。
如要检索特定主机的全部 CPU 数据,进行 Elasticsearch 查询时第一步原生尝试多是针对 host.name 和指标集 system.cpu
添加筛选:
示例查询:
GET metricbeat-*/_search { "query": { "bool": { "filter": [ { "range": { "@timestamp": { "gte": "now-5m" } } }, { "bool": { "should": [ { "match_phrase": { "host.name": "noether" } }, { "match_phrase": { "event.dataset": "system.cpu" } } ] } } ] } } }
示例响应以下:
{ "took" :8, "timed_out" : false, "_shards" : { "total" :1, "successful" :1, "skipped" :0, "failed" :0 }, "hits" : { "total" : { "value" :30, "relation" : "eq" }, "max_score" :0.0, "hits" : [...] } }
此查询仍会返回大量文档,全部都包含 Metricbeat 发送的有关 system.cpu
指标集的完整数据。这个结果的用处并不大,缘由以下。
首先,咱们须要检索整个时间范围内的全部文档。一旦咱们达到所配置的上限,Elasticsearch 将不会一次性返回这些结果;它会尝试对文档排序,这对咱们的查询根本不适用;同时 Elasticsearch 返回结果时不会按时间戳进行排序。
第二,咱们仅对每一个文档中的一小部份内容感兴趣:时间戳、几个指标值,可能还有一些其余元数据字段。从 Elasticsearch 中返回所有的 _source
,而后再从查询结果中挑选数据,这种方法的效率很低。
解决这种问题的方法之一就是利用 Elasticsearch _聚合_。
**
**
咱们首先看一下日期直方图。日期直方图聚合将会为每一个时间间隔返回一个值。返回的桶已按时间进行排序,并且用户能够指定间隔(又称桶大小)来匹配数据。在这个示例中,咱们将间隔时长选为 10 秒,由于 Metricbeat 默认每 10 秒从系统模块发送一次数据。顶层的 size: 0
参数表示咱们对实际命中结果再也不感兴趣,而只对聚合感兴趣,因此不会返回任何文档。
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "fixed_interval":"10s" } } } }
示例响应以下:
{ ..., "hits" : { "total" : { "value" :30, "relation" : "eq" }, "max_score" : null, "hits" : [ ] }, "aggregations" : { "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:03:20.000Z", "key" :1565615000000, "doc_count" :1 }, { "key_as_string" :"2019-08-12T13:03:30.000Z", "key" :1565615010000, "doc_count" :1 }, { "key_as_string" :"2019-08-12T13:03:40.000Z", "key" :1565615020000, "doc_count" :1 }, ... ] } } }
对于每一个桶,这会在 key
中返回时间戳,还会返回颇有帮助的 key_as_string
(其中包含用户可读的日期时间字符串),以及桶中包含的文档数量。
这个案例的 doc_count
是 1,由于桶大小与 Metricbeat 的报告期间相匹配。若是没有其余信息,这一结果并没什么用处,因此为了看到真正的指标值,咱们须要再添加一个聚合。在这一步,咱们须要决定聚合类型——对数值而言,avg
、min
和 max
都是不错的选择——但因为咱们每一个桶只有一个文档,因此不管选择哪一个都没什么影响。下面的示例便很好地展现了这一点,由于它针对指标 system.cpu.user.pct
在桶的 10 秒期间内的值返回了 avg
、min
和 max
聚合:
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "fixed_interval":"10s" }, "aggregations": { "myActualCpuUserMax": { "max": { "field": "system.cpu.user.pct" } }, "myActualCpuUserAvg": { "avg": { "field": "system.cpu.user.pct" } }, "myActualCpuUserMin": { "min": { "field": "system.cpu.user.pct" } } } } } }
示例响应以下:
{ ..., "hits" : {...}, "aggregations" : { "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:12:40.000Z", "key" :1565615560000, "doc_count" :1, "myActualCpuUserMin" : { "value" :1.002 }, "myActualCpuUserAvg" : { "value" :1.002 }, "myActualCpuUserMax" : { "value" :1.002 } }, { "key_as_string" :"2019-08-12T13:12:50.000Z", "key" :1565615570000, "doc_count" :1, "myActualCpuUserMin" : { "value" :0.866 }, "myActualCpuUserAvg" : { "value" :0.866 }, "myActualCpuUserMax" : { "value" :0.866 } }, ... ] } } }
您能够看到,在每一个桶中 myActualCpuUserMin
、myActualCpuUserAvg
和 myActualCpuUserMax
是同样的,因此若是须要检索按固定间隔报告的时序型数据的原始值,您可使用日期直方图来实现。
然而,您多数状况下不会对每一个单独的数据点感兴趣,若是每隔几秒便取一个测量值的话,则更是如此。出于不少目的,实际上更好的一种方法是拥有更粗粒度的数据:举个例子,若是一个可视化仅有限定数量的像素来展现时序型数据的变化,那么其在呈现时便会舍弃较细粒度的数据。
咱们一般会缩小时序型数据的采样,直至其粒度与任何后续处理步骤的要求相符。在缩小采样的过程当中,给定时间段内的多个数据点会缩减为一个点。在咱们的服务器监测示例中,数据的测量频率为每 10 秒一次,但多数状况下,一分钟内全部值的平均值应该就能够。偶尔状况下,缩小采样的过程与日期直方图聚合的过程如出一辙,前提是日期直方图聚合过程为每一个桶找到多于一份文档,而且应用了正确的嵌套聚合。
下面的示例展现了在完整的 1 分钟桶内采用嵌套 avg
、min
和 max
聚合的日期直方图结果,给出了缩小采样的第一个示例。因为使用了 calendar_interval
,而未使用 fixed_interval
,因此此参数会将桶边界调整为整分钟。
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "calendar_interval":"1m" }, "aggregations": { "myDownsampledCpuUserMax": { "max": { "field": "system.cpu.user.pct" } }, "myDownsampledCpuUserAvg": { "avg": { "field": "system.cpu.user.pct" } }, "myDownsampledCpuUserMin": { "min": { "field": "system.cpu.user.pct" } } } } } }
示例响应以下:
{ ..., "hits" : {...}, "aggregations" : { "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:27:00.000Z", "key" :1565616420000, "doc_count" :4, "myDownsampledCpuUserMax" : { "value" :0.927 }, "myDownsampledCpuUserMin" : { "value" :0.6980000000000001 }, "myDownsampledCpuUserAvg" : { "value" :0.8512500000000001 } }, { "key_as_string" :"2019-08-12T13:28:00.000Z", "key" :1565616480000, "doc_count" :6, "myDownsampledCpuUserMax" : { "value" :0.838 }, "myDownsampledCpuUserMin" : { "value" :0.5670000000000001 }, "myDownsampledCpuUserAvg" : { "value" :0.7040000000000001 } }, ... ] } } }
如您所见,myActualCpuUserMin
、myActualCpuUserAvg
和 myActualCpuUserMax
如今的值不一样,具体取决于所用的聚合。
缩小采样时采用哪一种方法与指标息息相关。对于 CPU 百分比,一分钟内的 avg
聚合就能够,针对诸如队列时长和系统负载等指标,max
聚合则可能更为合适。
如今,还可使用 Elasticsearch 来执行一些简单的代数运算,并计算原始数据中没有的时序型数据。假设咱们针对 CPU 进行 avg
聚合,则咱们的示例可加以优化以返回用户 CPU、系统 CPU,以及用户和系统总和除以 CPU 核数,命令以下:
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "calendar_interval":"1m" }, "aggregations": { "myDownsampledCpuUserAvg": { "avg": { "field": "system.cpu.user.pct" } }, "myDownsampledCpuSystemAvg": { "avg": { "field": "system.cpu.system.pct" } }, "myCpuCoresMax": { "max": { "field": "system.cpu.cores" } }, "myCalculatedCpu": { "bucket_script": { "buckets_path": { "user": "myDownsampledCpuUserAvg", "system": "myDownsampledCpuSystemAvg", "cores": "myCpuCoresMax" }, "script": { "source": "(params.user + params.system) / params.cores", "lang": "painless" } } } } } } }
示例响应以下:
{ ..., "hits" : {...}, "aggregations" : { "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:32:00.000Z", "key" :1565616720000, "doc_count" :2, "myDownsampledCpuSystemAvg" : { "value" :0.344 }, "myCpuCoresMax" : { "value" :8.0 }, "myDownsampledCpuUserAvg" : { "value" :0.8860000000000001 }, "myCalculatedCpu" : { "value" :0.15375 } }, { "key_as_string" :"2019-08-12T13:33:00.000Z", "key" :1565616780000, "doc_count" :6, "myDownsampledCpuSystemAvg" : { "value" :0.33416666666666667 }, "myCpuCoresMax" : { "value" :8.0 }, "myDownsampledCpuUserAvg" : { "value" :0.8895 }, "myCalculatedCpu" : { "value" :0.15295833333333334 } }, ... ] } } }
**
**
关于 Elasticsearch 聚合在处理时序型数据时的巨大做用,咱们还能够举一个更加详细的关于 system.network
指标集的示例。system.network
指标集文档中的相关部分以下所示:
{ ... "system": { "network": { "in": { "bytes":37904869172, "dropped":32, "errors":0, "packets":32143403 }, "name": "wlp4s0", "out": { "bytes":6299331926, "dropped":0, "errors":0, "packets":13362703 } } } ... }
Metricbeat 会为系统中存在的每一个网络接口发送一份文档。这些文档的时间戳相同,可是 system.network.name
字段的值不一样,每一个网络接口都有一个值。
任何进一步的聚合都须要按照接口完成,因此咱们针对 system.network.name
字段将上个例子中顶层的日期直方图聚合更改成多值聚合。
请注意如要此方法奏效,须要将所聚合的字段映射为关键字字段。若是您使用 Metricbeat 提供的默认索引模板,则此映射可能已为您设置完毕。如未设置,Metricbeat 模板文档页面就您须要完成的操做给出了简短描述。
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myNetworkInterfaces": { "terms": { "field": "system.network.name", "size":50 }, "aggs": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "calendar_interval":"1m" } } } } } }
示例响应以下:
{ ..., "hits" : {...}, "aggregations" : { "myNetworkInterfaces" : { "doc_count_error_upper_bound" :0, "sum_other_doc_count" :0, "buckets" : [ { "key" : "docker0", "doc_count" :29, "myDateHistogram" : { "buckets" : [...] } }, { "key" : "enp0s31f6", "doc_count" :29, "myDateHistogram" : { "buckets" : [...] } }, { "key" : "lo", "doc_count" :29, "myDateHistogram" : { "buckets" : [...] } }, { "key" : "wlp61s0", "doc_count" :29, "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:39:00.000Z", "key" :1565617140000, "doc_count" :1 }, { "key_as_string" :"2019-08-12T13:40:00.000Z", "key" :1565617200000, "doc_count" :6 }, { "key_as_string" :"2019-08-12T13:41:00.000Z", "key" :1565617260000, "doc_count" :6 }, { "key_as_string" :"2019-08-12T13:42:00.000Z", "key" :1565617320000, "doc_count" :6 }, { "key_as_string" :"2019-08-12T13:43:00.000Z", "key" :1565617380000, "doc_count" :6 }, { "key_as_string" :"2019-08-12T13:44:00.000Z", "key" :1565617440000, "doc_count" :4 } ] } }, ... ] } } }
和 CPU 示例同样,不使用嵌套聚合的话,日期直方图聚合仅会返回用处不太大的 doc_count
。
字节字段包含单调递增值。这些字段的值包括自机器上次启动以来所发送或接收的字节数,因此每次测量时该值都会增大。在这个案例中,正确的嵌套聚合是 max
,因此缩小采样后的值要包括最高值,也就是桶间隔期间所获得的最新测量值。
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myNetworkInterfaces": { "terms": { "field": "system.network.name", "size":50 }, "aggs": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "calendar_interval":"1m" }, "aggregations": { "myNetworkInBytesMax": { "max": { "field": "system.network.in.bytes" } }, "myNetworkOutBytesMax": { "max": { "field": "system.network.out.bytes" } } } } } } } }
示例响应以下:
{ ..., "hits" : {...}, "aggregations" : { "myNetworkInterfaces" : { "doc_count_error_upper_bound" :0, "sum_other_doc_count" :0, "buckets" : [ { "key" : "docker0", ... }, { "key" : "enp0s31f6", ... }, { "key" : "lo", ... }, { "key" : "wlp61s0", "doc_count" :30, "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:50:00.000Z", "key" :1565617800000, "doc_count" :2, "myNetworkInBytesMax" : { "value" :2.991659837E9 }, "myNetworkOutBytesMax" : { "value" :5.46578365E8 } }, { "key_as_string" :"2019-08-12T13:51:00.000Z", "key" :1565617860000, "doc_count" :6, "myNetworkInBytesMax" : { "value" :2.992027006E9 }, "myNetworkOutBytesMax" : { "value" :5.46791988E8 }, "myNetworkInBytesPerSecond" : { "value" :367169.0, "normalized_value" :6119.483333333334 }, "myNetworkoutBytesPerSecond" : { "value" :213623.0, "normalized_value" :3560.383333333333 } }, ... ] } }, ... ] } } }
如要从单调递增计数中得到每秒的字节速率,须要使用导数聚合。当此聚合收到所传递的可选参数 unit
时,会在 normalized_value
字段中返回理想的每单位的数值:
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myNetworkInterfaces": { "terms": { "field": "system.network.name", "size":50 }, "aggs": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "calendar_interval":"1m" }, "aggregations": { "myNetworkInBytesMax": { "max": { "field": "system.network.in.bytes" } }, "myNetworkInBytesPerSecond": { "derivative": { "buckets_path": "myNetworkInBytesMax", "unit":"1s" } }, "myNetworkOutBytesMax": { "max": { "field": "system.network.out.bytes" } }, "myNetworkoutBytesPerSecond": { "derivative": { "buckets_path": "myNetworkOutBytesMax", "unit":"1s" } } } } } } } }
示例响应以下:
{ ..., "hits" : {...}, "aggregations" : { "myNetworkInterfaces" : { "doc_count_error_upper_bound" :0, "sum_other_doc_count" :0, "buckets" : [ { "key" : "docker0", ... }, { "key" : "enp0s31f6", ... }, { "key" : "lo", ... }, { "key" : "wlp61s0", "doc_count" :30, "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T14:07:00.000Z", "key" :1565618820000, "doc_count" :4, "myNetworkInBytesMax" : { "value" :3.030494669E9 }, "myNetworkOutBytesMax" : { "value" :5.56084749E8 } }, { "key_as_string" :"2019-08-12T14:08:00.000Z", "key" :1565618880000, "doc_count" :6, "myNetworkInBytesMax" : { "value" :3.033793744E9 }, "myNetworkOutBytesMax" : { "value" :5.56323416E8 }, "myNetworkInBytesPerSecond" : { "value" :3299075.0, "normalized_value" :54984.583333333336 }, "myNetworkoutBytesPerSecond" : { "value" :238667.0, "normalized_value" :3977.7833333333333 } }, { "key_as_string" :"2019-08-12T14:09:00.000Z", "key" :1565618940000, "doc_count" :6, "myNetworkInBytesMax" : { "value" :3.037045046E9 }, "myNetworkOutBytesMax" : { "value" :5.56566282E8 }, "myNetworkInBytesPerSecond" : { "value" :3251302.0, "normalized_value" :54188.36666666667 }, "myNetworkoutBytesPerSecond" : { "value" :242866.0, "normalized_value" :4047.766666666667 } }, ... ] } }, ... ] } } }
您可在本身的集群上尝试全部命令,若是您尚未集群,能够免费试用基于 Elastic Cloud 的 Elasticsearch Service 并快速部署一个集群,您还能够下载 Elastic Stack 的默认分发包。使用 Metricbeat 开始从您的系统发送数据吧,畅享查询的乐趣!