logstash高速入口

原文地址:http://logstash.net/docs/1.4.2/tutorials/getting-started-with-logstash

英语水平有限,假设有错误请各位指正


简单介绍

Logstash是一个接收,处理,转发日志的工具。支持系统日志,webserver日志。错误日志。应用日志,总之包含所有可以抛出来的日志类型。

怎么样听起来挺厉害的吧?
在一个典型的使用场景下(ELK):用Elasticsearch做为后台数据的存储,kibana用来前端的报表展现。php

Logstash在其过程当中担任搬运工的角色,它为数据存储。报表查询和日志解析建立了一个功能强大的管道链。css

Logstash提供了多种多样的 input,filters,codecs和output组件,让使用者轻松实现强大的功能。好了让咱们開始吧
html

依赖条件:JAVA

Logstash执行只依赖java执行环境(jre)。各位可以在命令行下执行java -version命令
显示相似例如如下结果:
java -version
java version "1.7.0_45"
Java(TM) SE Runtime Environment (build 1.7.0_45-b18)
Java HotSpot(TM) 64-Bit Server VM (build 24.45-b08, mixed mode)
为了确保成功执行Logstash建议你们使用较最近的jre版本号。
可以获取开源版本号的jre在:http://openjdk.java.net
或者你可以在官网下载Oracle jdk版本号:http://www.oracle.com/technetwork/java/index.html
一旦jre已经成功在你的系统中安装完毕,咱们就可以继续了

启动和执行Logstash的两条命令演示样例

第一步咱们先下载Logstash
curl -O https://download.elasticsearch.org/logstash/logstash/logstash-1.4.2.tar.gz
现在你应该有了一个叫logstash-1.4.2.tar.gz的文件了。 咱们把它解压一下
tar zxvf logstash-1.4.2.tar.gz
cd logstash-1.4.2
现在咱们来执行一下:
bin/logstash -e 'input { stdin { } } output { stdout {} }'
咱们现在可以在命令行下输入一些字符。而后咱们将看到logstash的输出内容:
hello world
2013-11-21T01:22:14.405+0000 0.0.0.0 hello world
Ok,还挺有意思的吧... 以上样例咱们在执行logstash中,定义了一个叫"stdin"的input另外一个"stdout"的output,无论咱们输入什么字符。Logstash都会依照某种格式来返回咱们输入的字符。

这里注意咱们在命令行中使用了-e參数,该參数赞成Logstash直接经过命令行接受设置。这点尤为高速的帮助咱们重复的測试配置是否正确而不用写配置文件。前端


让咱们再试个更有意思的样例。首先咱们在命令行下使用CTRL-C命令退出以前执行的Logstash。现在咱们又一次执行Logstash使用如下的命令:
bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'
咱们再输入一些字符,此次咱们输入"goodnight moon":
goodnight moon
{
  "message" => "goodnight moon",
  "@timestamp" => "2013-11-20T23:48:05.335Z",
  "@version" => "1",
  "host" => "my-laptop"
}
以上演示样例经过又一次设置了叫"stdout"的output(加入了"codec"參数),咱们就可以改变Logstash的输出表现。相似的咱们可以经过在你的配置文件里加入或者改动inputs、outputs、filters,就可以使任意的格式化日志数据成为可能,从而订制更合理的存储格式为查询提供便利。


使用Elasticsearch存储日志

现在,你或许会说:"它看起来还挺高大上的,只是手工输入字符。并把字符从控制台回显出来。实际状况并不有用"。

说的好,那么接下来咱们将创建Elasticsearch来存储输入到Logstash的日志数据。假设你尚未安装Elasticsearch。你可以下载RPM/DEB包或者手动下载tar包。经过下面命令:java

curl -O https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.1.1.tar.gz
tar zxvf elasticsearch-1.1.1.tar.gz
cd elasticsearch-1.1.1/
./bin/elasticsearch

注意
本篇文章实例使用Logstash 1.4.2和Elasticsearch 1.1.1。

不一样的Logstash版本号都有相应的建议Elasticsearch版本号。请确认你使用的Logstash版本号!git


不少其它有关安装和设置Elasticsearch的信息可以參考Elasticsearch官网。因为咱们主要介绍Logstash的入门使用,Elasticsearch默认的安装和配置就已经知足咱们要求。

言归正专。现在Elasticsearch已经执行并监听9200port了(你们都搞定了,对吗?),经过简单的设置Logstash就可以使用Elasticsearch做为它的后端。

默认的配置对于Logstash和Elasticsearch已经足够,咱们忽略一些额外的选项来设置elasticsearch做为output:github

bin/logstash -e 'input { stdin { } } output { elasticsearch { host => localhost } }'
任意的输入一些字符。Logstash会像以前同样处理日志(只是此次咱们将不会看到不论什么的输出,因为咱们没有设置stdout做为output选项)
you know, for logs
咱们可以使用curl命令发送请求来查看ES是否接收到了数据:
curl 'http://localhost:9200/_search?pretty'
返回内容例如如下:
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "logstash-2013.11.21",
      "_type" : "logs",
      "_id" : "2ijaoKqARqGvbMgP3BspJA",
      "_score" : 1.0, "_source" : {"message":"you know, for logs","@timestamp":"2013-11-21T18:45:09.862Z","@version":"1","host":"my-laptop"}
    } ]
  }
}
恭喜,至此你已经成功利用Elasticsearch和Logstash来收集日志数据了。


Elasticsearch 插件(题外话)

这里介绍另一个对于查询你的Logstash数据(Elasticsearch中数据 )很实用的工具叫Elasticsearch-kopf插件。不少其它的信息请见Elasticsearch插件

安装elasticsearch-kopf,仅仅要在你安装Elasticsearch的文件夹中运行下面命令就能够:web

bin/plugin -install lmenezes/elasticsearch-kopf
接下来訪问 http://localhost:9200/_plugin/kopf 来浏览保存在Elasticsearch中的数据,设置及映射!

多重输出

做为一个简单的样例来设置多重输出,让咱们同一时候设置stdout和elasticsearch做为output来又一次执行一下Logstash。例如如下:
bin/logstash -e 'input { stdin { } } output { elasticsearch { host => localhost } stdout { } }'
当咱们输入了一些词组以后。这些输入的内容回回显到咱们的终端,同一时候还会保存到Elasticsearch!

(可以使用curl和kopf插件来验证)。redis

默认配置 - 依照每日日期创建索引

你将发现Logstash可以足够机灵的在Elasticsearch上创建索引... 天天会依照默认格式是logstash-YYYY.MM.DD来创建索引。

在午夜(GMT),Logstash本身主动依照时间戳更新索引。咱们可以根据追溯多长时间的数据做为根据来制定保持多少数据。固然你也可以把比較老的数据迁移到其它的地方(又一次索引)来方便查询,此外假设不过简单的删除一段时间数据咱们可以使用Elasticsearch Curatorshell

接下来

接下来咱们開始了解不少其它高级的配置项。在如下的章节,咱们着重讨论logstash一些核心的特性,以及怎样和logstash引擎交互的。

事件的生命周期

Inputs,Outputs,Codecs,Filters构成了Logstash的核心配置项。

Logstash经过创建一条事件处理的管道。从你的日志提取出数据保存到Elasticsearch中。为高效的查询数据提供基础。

为了让你高速的了解Logstash提供的多种选项,让咱们先讨论一下最常用的一些配置。

不少其它的信息,请參考Logstash事件管道


Inputs

input 及输入是指日志传输数据到Logstash中。当中常见的配置例如如下:
  • file:从文件系统中读取一个文件,很是像UNIX命令 "tail -0a"
  • syslog:监听514port,依照RFC3164标准解析日志数据
  • redis:从redisserver读取数据。支持channel(公布订阅)和list模式。

    redis通常在Logstash消费集群中做为"broker"角色,保存events队列共Logstash消费。

  • lumberjack:使用lumberjack协议来接收数据,眼下已经改成 logstash-forwarder
Filters

Fillters 在Logstash处理链中担任中间处理组件。他们经常被组合起来实现一些特定的行为来,处理匹配特定规则的事件流。常见的filters例如如下:
  • grok:解析无规则的文字并转化为有结构的格式。

    Grok 是眼下最好的方式来将无结构的数据转换为有结构可查询的数据。

    有120多种匹配规则,会有一种知足你的需要。

  • mutate:mutate filter 赞成改变输入的文档,你可以从命名,删除,移动或者改动字段在处理事件的过程当中。
  • drop:丢弃一部分events不进行处理,好比:debug events。
  • clone:拷贝 event。这个过程当中也可以加入或移除字段。
  • geoip:加入地理信息(为前台kibana图形化展现使用)
Outputs

outputs是logstash处理管道的最末端组件。一个event可以在处理过程当中通过多重输出,但是一旦所有的outputs都运行结束,这个event也就完毕生命周期。一些常用的outputs包含:
  • elasticsearch:假设你计划将高效的保存数据,并且能够方便和简单的进行查询...Elasticsearch是一个好的方式。是的,此处有作广告的嫌疑,呵呵。
  • file:将event数据保存到文件里。
  • graphite:将event数据发送到图形化组件中,一个很是流行的开源存储图形化展现的组件。http://graphite.wikidot.com/
  • statsd:statsd是一个统计服务,比方技术和时间统计。经过udp通信,聚合一个或者多个后台服务,假设你已经開始使用statsd,该选项对你应该很是实用。
Codecs

codecs 是基于数据流的过滤器,它可以做为input。output的一部分配置。

Codecs可以帮助你轻松的切割发送过来已经被序列化的数据。流行的codecs包含 json,msgpack,plain(text)。

  • json:使用json格式对数据进行编码/解码
  • multiline:将汇多个事件中数据汇总为一个单一的行。

    比方:java异常信息和堆栈信息

获取完整的配置信息。请參考 Logstash文档中 "plugin configuration"部分。

不少其它有趣Logstash内容

使用配置文件

使用-e參数在命令行中指定配置是非常常常使用的方式,只是假设需要配置不少其它设置则需要很是长的内容。这样的状况。咱们首先建立一个简单的配置文件,并且指定logstash使用这个配置文件。如咱们建立一个文件名称是"logstash-simple.conf"的配置文件并且保存在和Logstash一样的文件夹中。

内容例如如下:

input { stdin { } }
output {
  elasticsearch { host => localhost }
  stdout { codec => rubydebug }
}
接下来,运行命令:
bin/logstash -f logstash-simple.conf
咱们看到logstash依照你刚刚建立的配置文件来执行样例。这样更加的方便。注意,咱们使用-f參数来从文件获取而取代以前使用-e參数从命令行中获取配置。以上演示很easy的样例。固然解析来咱们继续写一些复杂一些的样例。


过滤器

filters是一个行处理机制将提供的为格式化的数据整理成你需要的数据,让咱们看看如下的一个样例,叫grok filter的过滤器。

input { stdin { } }

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch { host => localhost }
  stdout { codec => rubydebug }
}
运行Logstash依照例如如下參数:
bin/logstash -f logstash-filter.conf
现在粘贴如下一行信息到你的终端(固然Logstash就会处理这个标准的输入内容):
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
你将看到相似例如如下内容的反馈信息:
{
        "message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
     "@timestamp" => "2013-12-11T08:01:45.000Z",
       "@version" => "1",
           "host" => "cadenza",
       "clientip" => "127.0.0.1",
          "ident" => "-",
           "auth" => "-",
      "timestamp" => "11/Dec/2013:00:01:45 -0800",
           "verb" => "GET",
        "request" => "/xampp/status.php",
    "httpversion" => "1.1",
       "response" => "200",
          "bytes" => "3891",
       "referrer" => "\"http://cadenza/xampp/navi.php\"",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""
}
正像你看到的那样,Logstash(使用了grok过滤器)能够将一行的日志数据(Apache的"combined log"格式)切割设置为不一样的数据字段。这一点对于往后解析和查询咱们本身的日志数据很实用。比方:HTTP的返回状态码。IP地址相关等等。很的easy。不多有匹配规则没有被grok包括,因此假设你正尝试的解析一些常见的日志格式。也许已经有人为了作了这种工做。假设查看具体匹配规则。參考logstash grok patterns

另一个过滤器是date filter。这个过滤器来负责解析出来日志中的时间戳并将值赋给timestame字段(不管这个数据是何时收集到logstash的)。

你或许注意到在这个样例中@timestamp字段是设置成December 11, 2013, 说明logstash在日志产生以后一段时间进行处理的。这个字段在处理日志中回添到数据中的,举例来讲... 这个值就是logstash处理event的时间戳。


有用的样例

Apache 日志(从文件获取)

现在,让咱们使用一些颇有用的配置... apache2訪问日志!咱们将从本地读取日志文件,并且经过条件设置处理知足咱们需要的event。

首先。咱们建立一个文件名称是logstash-apache.conf的配置文件。内容例如如下(你可以依据实际状况改动你的文件名称和路径):

input {
  file {
    path => "/tmp/access_log"
    start_position => beginning
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { "type" => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

output {
  elasticsearch {
    host => localhost
  }
  stdout { codec => rubydebug }
}
接下来。咱们依照上面的配置建立一个文件(在样例中是"/tmp/access.log"),可以将如下日志信息做为文件内容(也可以用你本身的webserver产生的日志):
71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
现在使用-f參数来运行一下上面的样例:
bin/logstash -f logstash-apache.conf
你可以看到apache的日志数据已经导入到ES中了。这里logstash会依照你的配置读取,处理指定的文件。不论什么后加入到文件的内容也会被捕获处理最后保存到ES中。

此外,数据中type的字段值会被替换成"apache_access"(这个功能在配置中已经指定)。


这个配置仅仅是让Logstash监控了apache access_log,但是在实际中每每并不够用可能还需要监控error_log。仅仅要在上面的配置中改变一行既可以实现。例如如下:
input {
  file {
    path => "/tmp/*_log"
...

现在你可以看到logstash处理了error日志和access日志。

然而,假设你检查了你的数据(或许用elasticsearch-kopf),你将发现access_log日志被分红不一样的字段,但是error_log确没有这样。这是因为咱们使用了“grok”filter并只配置匹配combinedapachelog日志格式,这样知足条件的日志就会本身主动的被切割成不一样的字段。咱们可以经过控制日志依照它本身的某种格式来解析日志,不是很是好的吗?对吧。


此外,你或许还会发现Logstash不会反复处理文件里已经处理过得events。因为Logstash已经记录了文件处理的位置。这样就仅仅处理文件里新增长的行数。美丽!

条件推断

咱们利用上一个样例来介绍一下条件推断的概念。

这个概念普通状况下应该被大多数的Logstash用户熟悉掌握。

你可以像其它普通的编程语言同样来使用if,else if和else语句。让咱们把每个event依赖的日志文件类型都标记出来(access_log,error_log其它以log结尾的日志文件)。

input {
  file {
    path => "/tmp/*_log"
  }
}

filter {
  if [path] =~ "access" {
    mutate { replace => { type => "apache_access" } }
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
  } else if [path] =~ "error" {
    mutate { replace => { type => "apache_error" } }
  } else {
    mutate { replace => { type => "random_logs" } }
  }
}

output {
  elasticsearch { host => localhost }
  stdout { codec => rubydebug }
}

我想你已经注意到了,咱们使用"type"字段来标记每个event,但是咱们实际上没有解析"error"和”random"类型的日志... 而实际状况下可能会有很是多很是多类型的错误日志,怎样解析就做为练习留给各位读者吧。你可以依赖已经存在的日志。

Syslog

Ok,现在咱们继续了解一个很是有用的样例:syslog。Syslog对于Logstash是一个很是长用的配置,并且它有很是好的表现(协议格式符合RFC3164)。Syslog其实是UNIX的一个网络日志标准,由client发送日志数据到本地文件或者日志server。在这个样例中,你根本不用创建syslog实例;咱们经过命令行就可以实现一个syslog服务,经过这个样例你将会看到发生什么。

首先,让咱们建立一个简单的配置文件来实现logstash+syslog。文件名称是 logstash-syslog.conf

input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch { host => localhost }
  stdout { codec => rubydebug }
}

运行logstash:

bin/logstash -f logstash-syslog.conf

一般。需要一个client连接到Logstashserver上的5000port而后发送日志数据。在这个简单的演示中咱们简单的使用telnet连接到logstashserver发送日志数据(与以前样例中咱们在命令行标准输入状态下发送日志数据相似)。

首先咱们打开一个新的shell窗体,而后输入如下的命令:

telnet localhost 5000
你可以复制粘贴如下的例子信息(固然也可以使用其它字符,只是这样可能会被grok filter不能正确的解析):
Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied
Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'.

以后你可以在你以前执行Logstash的窗体中看到输出结果。信息被处理和解析!


{
                 "message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
              "@timestamp" => "2013-12-23T22:30:01.000Z",
                "@version" => "1",
                    "type" => "syslog",
                    "host" => "0:0:0:0:0:0:0:1:52617",
        "syslog_timestamp" => "Dec 23 14:30:01",
         "syslog_hostname" => "louis",
          "syslog_program" => "CRON",
              "syslog_pid" => "619",
          "syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
             "received_at" => "2013-12-23 22:49:22 UTC",
           "received_from" => "0:0:0:0:0:0:0:1:52617",
    "syslog_severity_code" => 5,
    "syslog_facility_code" => 1,
         "syslog_facility" => "user-level",
         "syslog_severity" => "notice"
}

恭喜全部。看到你已经成为一个贴近格的Logstash的用户。您将可以轻松配置。执行Logstash,还可以发送event给Logstash,但这个过程将有很是多的使用的值这个地方被挖。
相关文章
相关标签/搜索