ES & Filebeat 使用 Pipeline 处理日志中的 @timestamp

使用 Pipeline 处理日志中的 @timestamp

Filebeat 收集的日志发送到 ElasticSearch 后,会默认添加一个 @timestamp 字段做为时间戳用于检索,而日志中的信息会所有添加到 message 字段中,可是这个时间是 Filebeat 采集日志的时间,不是日志生成的实际时间,因此为了便于检索日志,须要将 @timestamp 替换为 message 字段中的时间。json

这里使用的是 elasticseatch 提供的 pipeline 来进行替换。首先日志格式以下:bash

20-09-22 07:01:25.109 INFO - {"traceId":"65e97e88a61d7cd4558b8f3a203458fd"}
20-09-22 06:51:12.117 INFO - {"traceId":"4e0542c994919065f71536872ccb9677"}

在 Kibana 中的 Devtools 界面中编写以下 pipeline 并执行:elasticsearch

PUT _ingest/pipeline/test-news-server-online      # test-news-server-online 为流水线的名称
{
  "description": "test-news-server-online",            # 对 pipeline 进行描述
  "processors": [
    {
      "grok": {                                                              # 使用 grok 对日志内容进行提取
        "field": "message",					       # 选择要提取信息的字段
        "patterns": [
          "%{TIMESTAMP_ISO8601:logatime}"          # 使用 TIMESTAMP_ISO8601 的标准匹配时间,将匹配的值赋值给新增的字段 logatime
        ],
        "ignore_failure": true		                       # 若是日志中有不存在时间戳的行,能够添加这个配置来忽略匹配错误产生的 error 信息
      },
      "date": {					                       # 使用 data 时间戳插件来格式化时间输出,替代默认的 @timestamp
        "field": "logatime",				               # 指定使用新增的 logatime 字段
        "timezone": "Asia/Shanghai", 	                       # 指定输出时间的时区,不指定的话可能会比正确的时间晚 8 个小时
        "formats": [
          "yy-MM-dd HH:mm:ss.SSS"	              # 指定时间输出的格式
        ],
        "ignore_failure": true		                      # 若是遇到错误则忽略
      }
    }
  ]
}

pipeline 编写完成后,在 Devtools 中可使用以下命令进行查询:插件

GET _ingest/pipeline/test-news-server-online

在 filebeat 中引用这个 pipeline:日志

filebeat.idle_timeout: 2s
filebeat.inputs:
- backoff: 1s
  backoff_factor: 2
  close_inactive: 1h
  enabled: true
  encoding: plain
  harvester_buffer_size: 262144
  max_backoff: 10s
  max_bytes: 10485760
  paths:
  - /opt/trace.log
  scan_frequency: 10s
  tail_lines: true
  type: log
  fields:
    type: test-news-server
filebeat.name: filebeat-shiper
filebeat.spool_zie: 50000
output.elasticsearch:
  bulk_max_size: 8192
  hosts:
  - 10.11.16.211:30187
  - 10.11.16.212:30187
  - 10.11.16.213:30187
  - 10.11.16.214:30187
  - 10.11.16.215:30187
  index: test-news-timestamp
  workers: 4
  pipeline: "test-news-server-online"				# 在此处指定 pipeline 的名称
processors:
- drop_fields:
    fields:
    - agent.ephemeral_id
    - agent.hostname
    - agent.id
    - agent.type
    - agent.version
    - ecs.version
    - input.type
    - log.offset
    - version
- decode_json_fields:
    fields:
    - message
    max_depth: 1
    overwrite_keys: true
setup.ilm.enabled: false
setup.template.name: test-news-timestamp-reverse
setup.template.pattern: test-news-timestamp-reverse-*

运行 filebeat,在 kibana 中查看日志信息,能够看到收集的日志信息中新增了 logatime 字段,@timestamp 字段的时间也与 logatime 字段保持了一致。code

若是在 filebeat 运行的日志中发现了以下报错信息,有多是日志中存在不含有时间戳的行(通常是因为日志被截断致使的,能够参考处理多行日志的文档):orm

ERROR   pipeline/output.go:121  Failed to publish events: temporary bulk send failure

若是不但愿将 logatime 字段在日志中展现的话,能够将 pipeline 修改成以下内容:server

PUT _ingest/pipeline/test-news-server-online
{
  "description": "test-news-server-online",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:logatime}"
        ],
        "ignore_failure": true
      },
      "date": {
        "field": "logatime",
        "timezone": "Asia/Shanghai", 
        "formats": [
          "yy-MM-dd HH:mm:ss"
        ],
        "ignore_failure": true
      },
      "remove": {
        "field": "logatime"
      }
    }
  ]
}

若是但愿将 logatime 的值同时赋值给其余的新增字段,例如 realtime ,pipeline 修改以下:ip

PUT _ingest/pipeline/test-news-server-online
{
  "description": "test-news-server-online",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{TIMESTAMP_ISO8601:logatime}"
        ],
        "ignore_failure": true
      },
      "date": {
        "field": "logatime",
        "timezone": "Asia/Shanghai", 
        "target_field": "realtime"
        "formats": [
          "yy-MM-dd HH:mm:ss"
        ],
        "ignore_failure": true
      },
      "remove": {
        "field": "logatime"
      }
    }
  ]
}

target_field 字段用于将一个值赋值给指定的字段,默认是给 @timestamp ,若是未提供该选项,则会默认更新 @timestamp 字段rem