Logstash技术入门

Logstash简介

Logstash是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括全部能够抛出来的日志类型。怎么样听起来挺厉害的吧?php

依赖条件:JAVAcss

Logstash运行仅仅依赖java运行环境(jre)。各位能够在命令行下运行java -version命令java

启动和运行Logstash的两条命令示例

第一步咱们先下载Logstashweb

curl -O https://artifacts.elastic.co/downloads/logstash/logstash-6.4.0.tar.gz

如今你应该有了一个叫logstash-6.4.0.tar.gz的文件了。 咱们把它解压一下面试

tar zxvf logstash-6.4.0.tar.gz
cd logstash-6.4.0

如今咱们来运行一下:redis

bin/logstash -e 'input { stdin { } } output { stdout {} }'

咱们如今能够在命令行下输入一些字符,而后咱们将看到logstash的输出内容:shell

hello world
2013-11-21T01:22:14.405+0000 0.0.0.0 hello world

 以上例子咱们在运行logstash中,定义了一个叫"stdin"的input还有一个"stdout"的output,不管咱们输入什么字符,Logstash都会按照某种格式来返回咱们输入的字符。这里注意咱们在命令行中使用了 -e 参数,该参数容许Logstash直接经过命令行接受设置。这点尤为快速的帮助咱们反复的测试配置是否正确而不用写配置文件。数据库

让咱们再试个更有意思的例子。首先咱们在命令行下使用CTRL-C命令退出以前运行的Logstash。如今咱们从新运行Logstash使用下面的命令:apache

bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'

咱们再输入一些字符,此次咱们输入"goodnight moon":编程

goodnight moon
{
   "message" => 
   "goodnight moon",
   "@timestamp" => "2013-11-20T23:48:05.335Z",
   "@version" => "1",
   "host" => "my-laptop"
}

以上示例经过从新设置了叫"stdout"的output(添加了"codec"参数),咱们就能够改变Logstash的输出表现。相似的咱们能够经过在你的配置文件中添加或者修改inputs、outputs、filters,就可使随意的格式化日志数据成为可能,从而订制更合理的存储格式为查询提供便利。

使用Elasticsearch存储日志

如今,你也许会说:"它看起来还挺高大上的,不过手工输入字符,并把字符从控制台回显出来。实际状况并不实用"。说的好,那么接下来咱们将创建Elasticsearch来存储输入到Logstash的日志数据。若是你尚未安装Elasticsearch,你能够 下载RPM/DEB包 或者手动下载tar包,经过如下命令:

curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-6.4.0.tar.gz
tar zxvf elasticsearch-6.4.0.tar.gz
cd elasticsearch-6.4.0/
./bin/elasticsearch
注意

本篇文章实例使用Logstash 6.4.0和Elasticsearch 6.4.0。不一样的Logstash版本都有对应的建议Elasticsearch版本。请确认你使用的Logstash版本!

更多有关安装和设置Elasticsearch的信息能够参考 Elasticsearch官网 。由于咱们主要介绍Logstash的入门使用,Elasticsearch默认的安装和配置就已经知足咱们要求。

言归正专,如今Elasticsearch已经运行并监听9200端口了,经过简单的设置Logstash就可使用Elasticsearch做为它的后端。默认的配置对于Logstash和Elasticsearch已经足够,咱们忽略一些额外的选项来设置elasticsearch做为output:

bin/logstash -e 'input { stdin { } } output { elasticsearch { host => localhost } }'

随意的输入一些字符,Logstash会像以前同样处理日志(不过此次咱们将不会看到任何的输出,由于咱们没有设置stdout做为output选项)

you know, for logs

咱们可使用curl命令发送请求来查看ES是否接收到了数据:

curl 'http://localhost:9200/_search?pretty'

返回内容以下:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "logstash-2013.11.21",
      "_type" : "logs",
      "_id" : "2ijaoKqARqGvbMgP3BspJA",
      "_score" : 1.0, "_source" : {"message":"you know, for logs","@timestamp":"2013-11-21T18:45:09.862Z","@version":"1","host":"my-laptop"}
    } ]
  }}

恭喜,至此你已经成功利用Elasticsearch和Logstash来收集日志数据了。

Elasticsearch 插件(题外话)

这里介绍另一个对于查询你的Logstash数据(Elasticsearch中数据 )很是有用的工具叫Elasticsearch-kopf插件。更多的信息请见 Elasticsearch插件 。安装elasticsearch-kopf,只要在你安装Elasticsearch的目录中执行如下命令便可:

bin/plugin -install lmenezes/elasticsearch-kopf

接下来访问 http://localhost:9200/_plugin/kopf 来浏览保存在Elasticsearch中的数据,设置及映射!

多重输出

做为一个简单的例子来设置多重输出,让咱们同时设置stdout和elasticsearch做为output来从新运行一下Logstash,以下:

bin/logstash -e 'input { stdin { } } output { elasticsearch { host => localhost } stdout { } }'

当咱们输入了一些词组以后,这些输入的内容回回显到咱们的终端,同时还会保存到Elasticsearch!(可使用curl和kopf插件来验证)。

默认配置 - 按照每日日期创建索引

你将发现Logstash能够足够灵巧的在Elasticsearch上创建索引... 天天会按照默认格式是logstash-YYYY.MM.DD来创建索引。在午夜(GMT),Logstash自动按照时间戳更新索引。咱们能够根据追溯多长时间的数据做为依据来制定保持多少数据,固然你也能够把比较老的数据迁移到其余的地方(从新索引)来方便查询,此外若是仅仅是简单的删除一段时间数据咱们可使用 Elasticsearch Curator 。

接下来

接下来咱们开始了解更多高级的配置项。在下面的章节,咱们着重讨论logstash一些核心的特性,以及如何和logstash引擎交互的。

事件的生命周期

Inputs,Outputs,Codecs,Filters构成了Logstash的核心配置项。Logstash经过创建一条事件处理的管道,从你的日志提取出数据保存到Elasticsearch中,为高效的查询数据提供基础。为了让你快速的了解Logstash提供的多种选项,让咱们先讨论一下最经常使用的一些配置。更多的信息,请参考 Logstash事件管道 。

Inputs

input 及输入是指日志数据传输到Logstash中。其中常见的配置以下:

  • file:从文件系统中读取一个文件,很像UNIX命令 "tail -0a"
  • syslog:监听514端口,按照RFC3164标准解析日志数据
  • redis:从redis服务器读取数据,支持channel(发布订阅)和list模式。redis通常 在Logstash消费集群中 做为"broker"角色,保存events队列共Logstash消费。
  • lumberjack:使用lumberjack协议来接收数据,目前已经改成 logstash-forwarder。

Filters

Fillters 在Logstash处理链中担任中间处理组件。他们常常被组合起来实现一些特定的行为来,处理匹配特定规则的事件流。常见的filters以下:

  • grok:解析无规则的文字并转化为有结构的格式。Grok 是目前最好的方式来将无结构的数据转换为有结构可查询的数据。有120多种匹配规则,会有一种知足你的须要。
  • mutate:mutate filter 容许改变输入的文档,你能够从命名,删除,移动或者修改字段在处理事件的过程当中。
  • drop:丢弃一部分events不进行处理,例如:debug events。
  • clone:拷贝 event,这个过程当中也能够添加或移除字段。
  • geoip:添加地理信息(为前台kibana图形化展现使用)

Outputs

outputs是logstash处理管道的最末端组件。一个event能够在处理过程当中通过多重输出,可是一旦全部的outputs都执行结束,这个event也就完成生命周期。一些经常使用的outputs包括:

  • elasticsearch:若是你计划将高效的保存数据,而且可以方便和简单的进行查询...Elasticsearch是一个好的方式。是的,此处有作广告的嫌疑,呵呵。
  • file:将event数据保存到文件中。
  • graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展现的组件。 http://graphite.wikidot.com/ 。
  • statsd:statsd是一个统计服务,好比技术和时间统计,经过udp通信,聚合一个或者多个后台服务,若是你已经开始使用statsd,该选项对你应该颇有用。

Codecs

codecs 是基于数据流的过滤器,它能够做为input,output的一部分配置。Codecs能够帮助你轻松的分割发送过来已经被序列化的数据。流行的codecs包括 json,msgpack,plain(text)。

  • json:使用json格式对数据进行编码/解码
  • multiline:将汇多个事件中数据汇总为一个单一的行。好比:java异常信息和堆栈信息

获取完整的配置信息,请参考 Logstash文档 中 "plugin configuration"部分。


更多有趣Logstash内容

使用配置文件

使用-e参数 在命令行中指定配置是很经常使用的方式,不过若是须要配置更多设置则须要很长的内容。这种状况,咱们首先建立一个简单的配置文件,而且指定logstash使用这个配置文件。如咱们建立一个文件名是"logstash-simple.conf"的配置文件而且保存在和Logstash相同的目录中。内容以下:

input { stdin { } }
output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}

接下来,执行命令:

bin/logstash -f logstash-simple.conf

咱们看到logstash按照你刚刚建立的配置文件来运行例子,这样更加的方便。注意,咱们使用-f参数来从文件获取而代替以前使用-e参数从命令行中获取配置。以上演示很是简单的例子,固然解析来咱们继续写一些复杂一些的例子。

过滤器

filters是一个行处理机制将提供的为格式化的数据整理成你须要的数据,让咱们看看下面的一个例子,叫grok filter的过滤器。

input { stdin { } }

filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}

output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}

执行Logstash按照以下参数:

bin/logstash -f logstash-filter.conf

如今粘贴下面一行信息到你的终端(固然Logstash就会处理这个标准的输入内容):

127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"

你将看到相似以下内容的反馈信息:

{
      "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
    "@timestamp" => "2013-12-11T08:01:45.000Z",
     "@version" => "1",
        "host" => "cadenza",
    "clientip" => "127.0.0.1",
       "ident" => "-",
        "auth" => "-",
    "timestamp" => "11/Dec/2013:00:01:45 -0800",
        "verb" => "GET",
      "request" => "/xampp/status.php",
   "httpversion" => "1.1",
     "response" => "200",
       "bytes" => "3891",
     "referrer" => "\"http://cadenza/xampp/navi.php\"",
       "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""

正像你看到的那样,Logstash(使用了grok过滤器)可以将一行的日志数据(Apache的"combined log"格式)分割设置为不一样的数据字段。这一点对于往后解析和查询咱们本身的日志数据很是有用。好比:HTTP的返回状态码,IP地址相关等等,很是的容易。不多有匹配规则没有被grok包含,因此若是你正尝试的解析一些常见的日志格式,或许已经有人为了作了这样的工做。若是查看详细匹配规则,参考 logstash grok patterns 。

另一个过滤器是date filter。这个过滤器来负责解析出来日志中的时间戳并将值赋给timestame字段(无论这个数据是何时收集到logstash的)。你也许注意到在这个例子中@timestamp字段是设置成December 11, 2013, 说明logstash在日志产生以后一段时间进行处理的。这个字段在处理日志中回添到数据中的,举例来讲... 这个值就是logstash处理event的时间戳。


实用的例子

Apache 日志(从文件获取)

如今,让咱们使用一些很是实用的配置... apache2访问日志!咱们将从本地读取日志文件,而且经过条件设置处理知足咱们须要的event。首先,咱们建立一个文件名是logstash-apache.conf的配置文件,内容以下(你能够根据实际状况修改你的文件名和路径):

input {
file {
path => "/tmp/access_log"
start_position => beginning
}
}

filter {
if [path] =~ "access" {
mutate { replace => { "type" => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}

output {
elasticsearch {
host => localhost
}

stdout { codec => rubydebug }
}

接下来,咱们按照上面的配置建立一个文件(在例子中是"/tmp/access.log"),能够将下面日志信息做为文件内容(也能够用你本身的webserver产生的日志):

71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"

如今使用-f参数来执行一下上面的例子:

bin/logstash -f logstash-apache.conf

你能够看到apache的日志数据已经导入到ES中了。这里logstash会按照你的配置读取,处理指定的文件,任何后添加到文件的内容也会被捕获处理最后保存到ES中。此外,数据中type的字段值会被替换成"apache_access"(这个功能在配置中已经指定)。

这个配置只是让Logstash监控了apache access_log,可是在实际中每每并不够用可能还须要监控error_log,只要在上面的配置中改变一行既能够实现,以下:

input {
file {
path => "/tmp/*_log"...

如今你能够看到logstash处理了error日志和access日志。然而,若是你检查了你的数据(也许用elasticsearch-kopf ),你将发现access_log日志被分红不一样的字段,可是error_log确没有这样。这是由于咱们使用了“grok”filter并仅仅配置匹配combinedapachelog日志格式,这样知足条件的日志就会自动的被分割成不一样的字段。咱们能够经过控制日志按照它本身的某种格式来解析日志,不是很好的吗?对吧。

此外,你也许还会发现Logstash不会重复处理文件中已经处理过得events。由于Logstash已经记录了文件处理的位置,这样就只处理文件中新加入的行数。漂亮!

条件判断

咱们利用上一个例子来介绍一下条件判断的概念。这个概念通常状况下应该被大多数的Logstash用户熟悉掌握。你能够像其余普通的编程语言同样来使用if,else if和else语句。让咱们把每一个event依赖的日志文件类型都标记出来(access_log,error_log其余以log结尾的日志文件)。

input {
file {
path => "/tmp/*_log"
}
}


filter {
if [path] =~ "access" {
mutate { replace => { type => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}

} else if [path] =~ "error" {
mutate { replace => { type => "apache_error" } }
} else {
mutate { replace => { type => "random_logs" } }
}
}


output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }

}

我想你已经注意到了,咱们使用"type"字段来标记每一个event,可是咱们实际上没有解析"error"和”random"类型的日志... 而实际状况下可能会有不少不少类型的错误日志,如何解析就做为练习留给各位读者吧,你能够依赖已经存在的日志。

Syslog

Ok,如今咱们继续了解一个很实用的例子:syslog。Syslog对于Logstash是一个很长用的配置,而且它有很好的表现(协议格式符合RFC3164)。Syslog其实是UNIX的一个网络日志标准,由客户端发送日志数据到本地文件或者日志服务器。在这个例子中,你根本不用创建syslog实例;咱们经过命令行就能够实现一个syslog服务,经过这个例子你将会看到发生什么。

首先,让咱们建立一个简单的配置文件来实现logstash+syslog,文件名是 logstash-syslog.conf

input {
tcp {
port => 5000
type => syslog
}

udp {
port => 5000
type => syslog
}
}


filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}

syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}


output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}

执行logstash:

bin/logstash -f logstash-syslog.conf

一般,须要一个客户端连接到Logstash服务器上的5000端口而后发送日志数据。在这个简单的演示中咱们简单的使用telnet连接到logstash服务器发送日志数据(与以前例子中咱们在命令行标准输入状态下发送日志数据相似)。首先咱们打开一个新的shell窗口,而后输入下面的命令:

telnet localhost 5000

你能够复制粘贴下面的样例信息(固然也可使用其余字符,不过这样可能会被grok filter不能正确的解析):

Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' deniedDec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2\>/var/log/cacti/poller-error.log)
Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.

以后你能够在你以前运行Logstash的窗口中看到输出结果,信息被处理和解析!

{

         "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",

        "@timestamp" => "2013-12-23T22:30:01.000Z",

        "@version" => "1",

          "type" => "syslog",

          "host" => "0:0:0:0:0:0:0:1:52617",

    "syslog_timestamp" => "Dec 23 14:30:01",

     "syslog_hostname" => "louis",

      "syslog_program" => "CRON",

        "syslog_pid" => "619",

      "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",

       "received_at" => "2013-12-23 22:49:22 UTC",

       "received_from" => "0:0:0:0:0:0:0:1:52617",

  "syslog_severity_code" => 5,

  "syslog_facility_code" => 1,

     "syslog_facility" => "user-level",

     "syslog_severity" => "notice"

}

看到这里你已经成为一个合格的Logstash用户了。你将能够轻松的配置,运行Logstash,还能够发送event给Logstash,可是这个过程随着使用还会有不少值得深挖的地方。

参考:Logstash官方网站文档,多数细节来源于官方文档描述,小编加以翻译与我的理解整理而成。

欢迎你们关注个人微信公众号【民工哥技术之路】,最新整理的 2TB 技术干货:包括架构师实战教程、大数据、Docker容器、系统运维、数据库、redis、MogoDB、电子书、Java基础课程、Java实战项目、ELK Stack、机器学习、BAT面试精讲视频等。只需在「 民工哥技术之路」微信公众号对话框回复关键字:1024便可获取所有资料。

相关文章
相关标签/搜索