能够使用Logstash的grok模块对任意文本解析并结构化输出。Logstash默认带有120中匹配模式。
php
能够参见源代码html
logstash/patterns/grok-patternslinux
logstash/lib/logstash/filters/grok.rbgit
grok的语法格式为 %{SYNTAX:SEMANTIC}github
SYNTAX是文本要匹配的模式,例如3.14匹配 NUMBER 模式,127.0.0.1 匹配 IP 模式。
正则表达式
SEMANTIC 是匹配到的文本片断的标识。例如 “3.14” 能够是一个时间的持续时间,因此能够简单地叫作"duration" ,字符串"55.3.244.1"能够被标识为“client”apache
因此,grok过滤器表达式能够写成:
ruby
%{NUMBER:duration} %{IP:client}ssh
默认状况下,全部的SEMANTIC是以字符串的方式保存,若是想要转换一个SEMANTIC的数据类型,例如转换一个字符串为×××,能够写成以下的方式:curl
%{NUMBER:num:int}
例如日志
55.3.244.1 GET /index.html 15824 0.043
能够写成以下的grok过滤表达式
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
再举一个实际的案例
常规的Apache日志
127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] "GET /router.php HTTP/1.1" 404 285 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2" 127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] "GET /router.php HTTP/1.1" 404 285 "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2"
使用Logstash收集
input{ file { type => "apache" path => "/var/log/httpd/access_log" exclude => ["*.gz"] sincedb_path => "/dev/null" } } output { stdout { codec => rubydebug } }
显示:
{ "message" => "127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] \"GET /router.php HTTP/1.1\" 404 285 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"", "@version" => "1", "@timestamp" => "2015-04-13T09:22:03.844Z", "type" => "apache", "host" => "xxxxxx", "path" => "/var/log/httpd/access_log" } { "message" => "127.0.0.1 - - [13/Apr/2015:17:22:03 +0800] \"GET /router.php HTTP/1.1\" 404 285 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"", "@version" => "1", "@timestamp" => "2015-04-13T09:22:03.844Z", "type" => "apache", "host" => "xxxxxx", "path" => "/var/log/httpd/access_log" }
修改配置以下:
input { file { type => "apache" path => "/var/log/httpd/access_log" exclude => ["*.gz"] sincedb_path => "/dev/null" } } filter { if [type] == "apache" { grok { match => ["message", "%{COMBINEDAPACHELOG}"] } } } output { stdout { codec => rubydebug } }
显示:
{ "message" => "127.0.0.1 - - [14/Apr/2015:09:53:40 +0800] \"GET /router.php HTTP/1.1\" 404 285 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"", "@version" => "1", "@timestamp" => "2015-04-14T01:53:57.182Z", "type" => "apache", "host" => "xxxxxxxx", "path" => "/var/log/httpd/access_log", "clientip" => "127.0.0.1", "ident" => "-", "auth" => "-", "timestamp" => "14/Apr/2015:09:53:40 +0800", "verb" => "GET", "request" => "/router.php", "httpversion" => "1.1", "response" => "404", "bytes" => "285", "referrer" => "\"-\"", "agent" => "\"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"" } { "message" => "127.0.0.1 - - [14/Apr/2015:09:53:40 +0800] \"GET /router.php HTTP/1.1\" 404 285 \"-\" \"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"", "@version" => "1", "@timestamp" => "2015-04-14T01:53:57.187Z", "type" => "apache", "host" => "xxxxxxx", "path" => "/var/log/httpd/access_log", "clientip" => "127.0.0.1", "ident" => "-", "auth" => "-", "timestamp" => "14/Apr/2015:09:53:40 +0800", "verb" => "GET", "request" => "/router.php", "httpversion" => "1.1", "response" => "404", "bytes" => "285", "referrer" => "\"-\"", "agent" => "\"curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.15.3 zlib/1.2.3 libidn/1.18 libssh2/1.4.2\"" }
这里的%{COMBINEDAPACHELOG} 是logstash自带的匹配模式
patterns/grok-patterns
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:req uest}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
grok能够支持任意正则表达式
因此支持的正则表达式的语法能够参见
http://www.geocities.jp/kosako3/oniguruma/doc/RE.txt
在有些状况下自带的匹配模式没法知足需求,能够自定义一些匹配模式
首先能够根据正则表达式匹配文本片断
(?<field_name>the pattern here)
例如,postfix日志有一个字段表示 queue id,能够使用如下表达式进行匹配:
(?<queue_id>[0-9A-F]{10,11}
能够手动建立一个匹配文件
# # contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11}
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com> filter { grok { patterns_dir => "./patterns" match => [ "message", "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" ] } } The above will match and result in the following fields: * timestamp: Jan 1 06:25:43 * logsource: mailserver14 * program: postfix/cleanup * pid: 21403 * queue_id: BEF25A72965 * syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com> The `timestamp`, `logsource`, `program`, and `pid` fields come from the SYSLOGBASE pattern which itself is defined by other patterns.
能够使用重写
The fields to overwrite. This allows you to overwrite a value in a field that already exists. For example, if you have a syslog line in the 'message' field, you can overwrite the 'message' field with part of the match like so: filter { grok { match => [ "message", "%{SYSLOGBASE} %{DATA:message}" ] overwrite => [ "message" ] } } In this case, a line like "May 29 16:37:11 sadness logger: hello world" will be parsed and 'hello world' will overwrite the original message.
参考文档: