原文地址:Logstash 基础入门
博客地址:http://www.extlight.comhtml
Logstash 是一个开源的数据收集引擎,它具备备实时数据传输能力。它能够统一过滤来自不一样源的数据,并按照开发者的制定的规范输出到目的地。java
顾名思义,Logstash 收集数据对象就是日志文件。因为日志文件来源多(如:系统日志、服务器 日志等),且内容杂乱,不便于人类进行观察。所以,咱们可使用 Logstash 对日志文件进行收集和统一过滤,变成可读性高的内容,方便开发者或运维人员观察,从而有效的分析系统/项目运行的性能,作好监控和预警的准备工做等。正则表达式
Logstash 依赖 JDK1.8 ,所以在安装以前请确保机器已经安装和配置好 JDK1.8。apache
Logstash 下载api
tar -zxvf logstash-5.6.3.tar.gz -C /usr cd logstash-5.6.3
Logstash 经过管道进行运做,管道有两个必需的元素,输入和输出,还有一个可选的元素,过滤器。浏览器
输入插件从数据源获取数据,过滤器插件根据用户指定的数据格式修改数据,输出插件则将数据写入到目的地。以下图:tomcat
咱们先来一个简单的案例:ruby
bin/logstash -e 'input { stdin { } } output { stdout {} }'
启动 Logstash 后,再键入 Hello World,结果以下:服务器
[root@localhost logstash-5.6.3]# bin/logstash -e 'input { stdin { } } output { stdout {} }' Sending Logstash's logs to /usr/logstash-5.6.3/logs which is now configured via log4j2.properties [2017-10-27T00:17:43,438][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/logstash-5.6.3/modules/fb_apache/configuration"} [2017-10-27T00:17:43,440][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/logstash-5.6.3/modules/netflow/configuration"} [2017-10-27T00:17:43,701][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>125} [2017-10-27T00:17:43,744][INFO ][logstash.pipeline ] Pipeline main started The stdin plugin is now waiting for input: [2017-10-27T00:17:43,805][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} Hello World 2017-10-27T07:17:51.034Z localhost.localdomain Hello World
Hello World(输入)通过 Logstash 管道(过滤)变成:2017-10-27T07:17:51.034Z localhost.localdomain Hello World (输出)。app
在生产环境中,Logstash 的管道要复杂不少,可能须要配置多个输入、过滤器和输出插件。
所以,须要一个配置文件管理输入、过滤器和输出相关的配置。配置文件内容格式以下:
# 输入 input { ... } # 过滤器 filter { ... } # 输出 output { ... }
根据本身的需求在对应的位置配置 输入插件、过滤器插件、输出插件 和 编码解码插件 便可。
在使用插件以前,咱们先了解一个概念:事件。
Logstash 每读取一次数据的行为叫作事件。
在 Logstach 目录中建立一个配置文件,名为 logstash.conf(名字任意)。
输入插件容许一个特定的事件源能够读取到 Logstash 管道中,配置在 input {} 中,且能够设置多个。
修改配置文件:
input { # 从文件读取日志信息 file { path => "/var/log/messages" type => "system" start_position => "beginning" } } # filter { # # } output { # 标准输出 stdout { codec => rubydebug } }
其中,messages 为系统日志。
保存文件。键入:
bin/logstash -f logstash.conf
在控制台结果以下:
{ "@version" => "1", "host" => "localhost.localdomain", "path" => "/var/log/messages", "@timestamp" => 2017-10-29T07:30:02.601Z, "message" => "Oct 29 00:30:01 localhost systemd: Starting Session 16 of user root.", "type" => "system" } ......
输出插件将事件数据发送到特定的目的地,配置在 output {} 中,且能够设置多个。
修改配置文件:
input { # 从文件读取日志信息 file { path => "/var/log/error.log" type => "error" start_position => "beginning" } } # filter { # # } output { # 输出到 elasticsearch elasticsearch { hosts => ["192.168.2.41:9200"] index => "error-%{+YYYY.MM.dd}" } }
其中,error.log 的内容格式以下:
2017-08-04 13:57:30.378 [http-nio-8080-exec-1] ERROR c.g.a.global.ResponseResultAdvice -设备数据为空 com.light.pay.common.exceptions.ValidationException: 设备数据为空 at com.light.pay.common.validate.Check.isTrue(Check.java:31) at com.light.attendance.controllers.cloudApi.DevicePushController.deviceInfoPush(DevicePushController.java:44) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:745) 2017-08-04 13:57:44.495 [http-nio-8080-exec-2] ERROR c.g.a.global.ResponseResultAdvice -Failed to invoke remote method: pushData, provider: dubbo://192.168.2.100:20880/com.light.attendance.api.DevicePushApi?application=salary-custom&default.check=false&default.timeout=30000&dubbo=2.8.4&interface=com.light.attendance.api.DevicePushApi&methods=getAllDevices,getDeviceById,pushData&organization=com.light.attendance&ow ......
配置文件中使用 elasticsearch 输出插件。输出的日志信息将被保存到 Elasticsearch 中,索引名称为 index 参数设置的格式。
若是读者不了解 Elasticsearch 基础内容,能够查看本站 《Elasticsearch 基础入门》 文章或自行百度进行知识的补缺。
保存文件。键入:
bin/logstash -f logstash.conf
打开浏览器访问 http://192.168.2.41:9100 使用 head 插件查看 Elasticsearch 数据,结果以下图:
踩坑提醒:
file 输入插件默认使用 “\n” 判断日志中每行的边界位置。error.log 是笔者本身编辑的错误日志,以前因为在复制粘贴日志内容时,忘记在内容末尾换行,致使日志数据始终没法导入到 Elasticsearch 中。 在此,提醒各位读者这个关键点。
编码解码插件本质是一种流过滤器,配合输入插件或输出插件使用。
从上图中,咱们发现一个问题:Java 异常日志被拆分红单行事件记录到 Elasticsearch 中,这不符合开发者或运维人员的查看习惯。所以,咱们须要对日志信息进行编码将多行事件转成单行事件记录起来。
咱们须要配置 Multiline codec 插件,这个插件能够将多行日志信息合并成一行,做为一个事件处理。
Logstash 默认没有安装该插件,须要开发者自行安装。键入:
bin/logstash-plugin install logstash-codec-multiline
修改配置文件:
input { # 从文件读取日志信息 file { path => "/var/log/error.log" type => "error" start_position => "beginning" # 使用 multiline 插件 codec => multiline { # 经过正则表达式匹配,具体配置根据自身实际状况而定 pattern => "^\d" negate => true what => "previous" } } } # filter { # # } output { # 输出到 elasticsearch elasticsearch { hosts => ["192.168.2.41:9200"] index => "error-%{+YYYY.MM.dd}" } }
保存文件。键入:
bin/logstash -f logstash.conf
使用 head 插件查看 Elasticsearch 数据,结果以下图:
过滤器插件位于 Logstash 管道的中间位置,对事件执行过滤处理,配置在 filter {},且能够配置多个。
本次测试使用 grok 插件演示,grok 插件用于过滤杂乱的内容,将其结构化,增长可读性。
安装:
bin/logstash-plugin install logstash-filter-grok
修改配置文件:
input { stdin {} } filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER :duration}" } } } output { stdout { codec => "rubydebug" } }
保存文件。键入:
bin/logstash -f logstash.conf
启动成功后,咱们输入:
55.3.244.1 GET /index.html 15824 0.043
控制台返回:
[root@localhost logstash-5.6.3]# bin/logstash -f logstash.conf Sending Logstash's logs to /root/logstash-5.6.3/logs which is now configured via log4j2.properties [2017-10-30T08:23:20,456][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.3/modules/fb_apache/configuration"} [2017-10-30T08:23:20,459][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.3/modules/netflow/configuration"} [2017-10-30T08:23:21,447][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>125} The stdin plugin is now waiting for input: [2017-10-30T08:23:21,516][INFO ][logstash.pipeline ] Pipeline main started [2017-10-30T08:23:21,573][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600} 55.3.244.1 GET /index.html 15824 0.043 { "duration" => "0.043", "request" => "/index.html", "@timestamp" => 2017-10-30T15:23:23.912Z, "method" => "GET", "bytes" => "15824", "@version" => "1", "host" => "localhost.localdomain", "client" => "55.3.244.1", "message" => "55.3.244.1 GET /index.html 15824 0.043" }
输入的内容被匹配到相应的名字中。