在上一篇中咱们介绍了Logstash快速入门,本文主要介绍的是ELK日志系统中的Logstash的实战使用。实战使用我打算从如下的几个场景来进行讲解。html
在咱们使用logstash将采集的数据传输到ES中的时候,会发现采集的时间@timestamp
的时间和咱们本地的不一致,这个主要是由于时区的问题致使的,咱们在计算时间的时候须要将这个时间增长8小时,可是这样会很不方便。为了永久解决这个问题,咱们能够在logstash中的filter中对该字段进行转换,增长8小时。java
添加的配置以下:json
ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('timestamp'))" } mutate { remove_field => ["timestamp"] }
本来示例图:
ruby
添加配置以后的示例图:
能够看到添加配置以后@timestamp
时间已经和本地时间基本一致了。app
咱们在进行采集日志到ES中的时候,有时须要对日志内容进行切割。好比获得日志内容的时间以及日志级别等等。这时咱们就能够经过grok来对日志内容进行切分,好比将制定好的日志内容切割为日志时间、线程名称、日志级别、类名以及详细内容等等。咱们只须要在logstash的filter中使用grok语法便可完成日志内容切割。
这里咱们使用JAVA的Logback来制定日志输出格式,而后经过日志的格式编写grok语法,最后将grok配置添加到logstash的filter中。elasticsearch
Logback输出配置:测试
|%d{yyyy-MM-dd HH:mm:ss.SSS}|[%thread]|%-5level|%logger{50}|-%msg%n.net
日志样例数据:插件
|2020-07-24 17:08:33.159|[Thread-5]|INFO|com.pancm.Application|-测试示例三: All things in their being are good for something. 天生我才必有用3线程
grok模式:
|%{DATA:log_time}|%{DATA:thread}|%{DATA:log_level}|%{DATA:class_name}|-%{GREEDYDATA:content}
使用grok分析
能够看到以及分析匹配成功了。
而后咱们在filter中添加以下配置:
grok {
match => { "message" =>"|%{DATA:log_time}|%{DATA:thread}|%{DATA:log_level}|%{DATA:class_name}|-%{GREEDYDATA:content}"
}
}
最终输出的日志到ES的示例图:
咱们在使用Logstash采集日志的时候,若是没有指定索引库或模板,则会使用ElasticSearch默认自带的名字为”logstash”的模板,默认应用于Logstash写入数据到ElasticSearch使用。可是咱们但愿使用自定义的索引模板,将采集的日志按照咱们自身的想法来写入,此时咱们就须要用到自定义模板了。
主要有两种方式,一种是在logstash的output插件中使用template指定本机器上的一个模板json路径, 例如 template => "/home/logstash.json"
,json里面的内容为咱们自定的索引mapping,虽然这种方式简单,可是分散在Logstash机器上,维护起来比较麻烦。还有一种是在elasticsearc服务端自定义配置模板,事先将模板设置好,而后在logstash的output输出中指定该模板便可,这种方式比较灵活方便,可动态更改,全局生效。
这里咱们仍是经过一个示例来进行说明,咱们首先建立一个template_mylog的模板,配置这几个字段:
log_time、thread、log_level、class_name、content。
语句以下:
PUT _template/template_mylog { "index_patterns" : [ "mylog-*" ], "order" : 10, "settings": { "index.number_of_shards": 3, "number_of_replicas": 1 }, "mappings" : { "properties" : { "log_level" : { "type" : "keyword" }, "thread" : { "type" : "keyword" }, "class_name" : { "type" : "keyword" }, "content" : { "type" : "keyword" }, "log_time" : { "type" : "date","format" : "yyyy-MM-dd HH:mm:ss.SSS"} } } }
示例图:
注:上述的配置比其余mapping而言多了两个新配置,一个是index_patterns,该配置代表自动建立的索引开头以mylog-
的索引库都会采用该模板;而order表示顺序级别,在有相同的索引模板中,该值越大,优先级越高。
建立成功以后,咱们只需在output中的添加以下配置便可。
elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM.dd}" }
而后咱们启动logstash进行日志的采集。
效果图:
咱们在使用logstash采集日志的时候,有时有多种不一样的日志而且须要采集到不一样的索引库中,这时咱们就能够经过标记来进行写入。好比采集/home/logs目录下的日志我定义一个标记为java,采集/home/logs2目录下的日志我定义一个标记为java2,那么在写入ElasticSearch的时候只须要根据该标记区分写入便可。
logstash input配置示例:
file { path => ["/home/logs/mylog-2020-08-13.0.txt"] type => "java" start_position => "beginning" sincedb_path => "/dev/null" } file { path => ["/home/logs2/*.txt"] type => "java2" start_position => "beginning" sincedb_path => "/dev/null" }
logstash output配置示例:
if [type] == "java"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM.dd}" } } if [type] == "java2"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM}" } }
示例图在多行内容合并场景中。
咱们在采集日志的时候,常常会遇到异常日志,而且异常日志并不是为一行内容,若是咱们按照原有的方式采集,在ElasticSearch中显示的是一行一行的内容,这样的话咱们排查问题会很头疼。幸亏Logstash中支持多行日志合并,使用multiline.pattern、multiline.negate和multiline.what来实现配置实现。
下面的配置中,咱们经过制定匹配规则将以空格开头的全部行合并到上一行,并把以Caused by开头的也追加到上一行。
在Logstash的input配置中添加以下配置:
codec => multiline { pattern => "\s*\[" negate => "true" what => "previous" }
异常日志:
原异常日志在ElasticSearch中示例图:
多行合并以后的效果图:
logstash-test.conf 配置
input{ file { path => ["/home/logs/mylog-2020-08-13.0.txt"] type => "java" start_position => "beginning" sincedb_path => "/dev/null" } file { path => ["/home/logs2/*.txt"] type => "java2" codec => multiline { pattern => "\s*\[" negate => "true" what => "previous" } start_position => "beginning" sincedb_path => "/dev/null" } } filter { grok { match => { "message" =>"\|%{DATA:log_time}\|%{DATA:thread}\|%{DATA:log_level}\|%{DATA:class_name}\|-%{GREEDYDATA:content}" } } ruby { code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" } ruby { code => "event.set('@timestamp',event.get('timestamp'))" } mutate { remove_field => ["timestamp"] } } output { stdout { codec => rubydebug } if [type] == "java"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM.dd}" } } if [type] == "java2"{ elasticsearch { hosts => ["127.0.0.1:9200"] index => "mylog-%{+YYYY.MM}" } } }
1.logstash: Could not execute action: PipelineAction::Create
解决办法: 斜杆采用“/”
2, logstash: object mapping for [host] tried to parse field [host] as object, but found a concrete value
解决办法: 在filter里面添加以下配置:
mutate { rename => { "host" => "host.name" } }
原创不易,若是感受不错,但愿给个推荐!您的支持是我写做的最大动力!
版权声明:
做者:虚无境
博客园出处:http://www.cnblogs.com/xuwujing
CSDN出处:http://blog.csdn.net/qazwsxpcm
我的博客出处:http://www.panchengming.com