logstash 配置

安装Logstash

注意: Logstash须要Java 8或更高版本.可使用
http://www.oracle.com/technetwork/java/javase/downloads/index.html [oracle官方版本] 或者使用开源版本OpenJDK: http://openjdk.java.net/.html

使用下面的命令,检查你的JDK版本java

java -version

在安装了java的系统上,这个命令将有以下相似的输出:linux

java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

下载二进制版本安装

从https://www.elastic.co/downloads/logstash 下载适合你本身的主机环境的Logstash安装文件.
解压文件,不要将安装logstash到一个含有冒号(:)的目录下面。git

在支持包管理器的linux系统上,可使用包管理器来安装。web

从Package Repositories安装

咱们也有适用于APT和YUM的发行版仓库。注意咱们只提供二进制包,但没有源代码软件包,包
做为Logstash构建的一部分被建立。shell

咱们将Logstash package repositories不一样版本分放到不一样的urls,避免在主要或次要版本升级中发生意外。对于全部的2.3.x版本发布使用2.3版本号,在2.2.x使用2.2等。数据库

咱们使用PGP密钥
https://pgp.mit.edu/pks/lookup?op=vindex&search=0xD27D666CD88E42B4[D88E42B4 ]
Elastic的签名密钥,指纹centos

4609 5ACC 8548 582C 1A26 99A9 D27D 666C D88E 42B4

签署咱们的全部包。在https://pgp.mit.edu 提供 。安全

APT安装

下载并安装公有签名密钥网络

wget -qO - https://packages.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -

把仓库的定义加入 /etc/apt/sources.list 文件:

echo "deb https://packages.elastic.co/logstash/2.3/debian stable main" | sudo tee -a /etc/apt/sources.list

注意

Use the echo method described above to add the Logstash repository. Do not
use add-apt-repository as it will add a deb-src entry as well, but we do not
provide a source package. If you have added the deb-src entry, you will see an
error like the following:

Unable to find expected entry ‘main/source/Sources’ in Release file (Wrong sources.list entry or malformed file)

Just delete the deb-src entry from the /etc/apt/sources.list file and the
installation should work as expected.

运行 sudo apt-get update 使repository可用. 而后运行安装命令:

sudo apt-get update && sudo apt-get install logstash

YUM安装

下载并安装公有签名密钥:

rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

以下内容写到一个文件以.repo为后缀名,放到/etc/yum.repos.d/下,如:logstash.repo

[logstash2.3]
name=Logstash repository for2.3.x packages
baseurl=https://packages.elastic.co/logstash2.3/centos
gpgcheck=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

添加完仓库文件,能够安装:

yum install logstash

 

运行以下的基础Logstash pipeline,测试Logstash的安装

cd logstash-2.3.0
bin/logstash -e 'input { stdin { } } output { stdout {} }'

能够从命令行使用-e 参数来直接指定配置。在命令行中指定的配置可让您快速测试,无需在迭代过程当中修改文件。这个管道输入是标准输入,并移动该输入到输出,以结构化格式标准输出。

一旦"Logstash startup completed"显示,经过命令行提示,输入hello world,查看响应结果:

hello world
2013-11-21T01:22:14.405+0000 0.0.0.0 hello world

Logstash将时间戳和ip地址信息加入到消息中。使用Ctrl+D的shell命令能够从Logstash环境退出。

 

Logstash管道的高级用法

LogStash管道一般有一个或多个输入,过滤,输出插件。本节中的场景经过Logstash配置文件来指定这些插件,并讨论了每一个插件在作什么。
Logstash配置文件定义了Logstash 管道,当你使用-f <path/to/file>启动一个Logstash实例,其实使用了一个配置文件定义了管道实例。
一个Logstash管道有两个必备元素,输入和输出,一个可选元素,过滤器。input插件从一个源摄取数据,filter插件按照你指定的方式修改数据,output插件写出数据到一个目标数据库。


下面是一个管道配置的模板:

# #是注释,使用注释描述配置
input {
}
# 该部分被注释,表示filter是可选的
# filter {
#
# }
output {
}

这个模板是不具有功能的,由于输入和输出部分没有定义的任何有效的选项。
将模板粘贴到你的Logstash根目录,命名为first-pipeline.conf。

解析Apache日志输出到Elasticsearch

这个例子建立一个Logstash管道,使用Apache web logs 做为输入,解析这些日志到特定的命名字段,而且输出这些解析数据到ElashticSearch集群

你能够下载样例数据,这里

配置Logstash用于文件输入

启动Logstash管道,使用file input插件配置Logstash实例。

修改first-pipeline.conf 添加以下内容:

input {
    file {
        path => "/path/to/logstash-tutorial.log"
        start_position => beginning 
        ignore_older => 0 
    }
}
  • file input 插件的默认行为是监视文件中的末尾新内容,和Unix中的tail -f 命令殊途同归。改变这种默认行为的方式是指定Logstash从文件的什么位置开始处理文件。
  • 文件输入插件的默认行为是忽略最后时间距今超过86400秒的文件。要更改这种默认行为,并处理tutorial file(日期距今大于1天的文件),咱们须要指定不能忽视旧文件。

用实际的logstash-tutorial.log日志文件目录替换 /path/to/。

用Grok 过滤器插件解析web logs

gork 过滤器插件是Logstash的几个默承认用插件之一。关于如何管理Logstash 插件的更多细节请参考:reference documentation

grok 过滤器插件在输入日志中查找指定的模式,配置文件中须要指定哪些模式的识别是你的案例所感兴趣的。一个表明性的web server 日志文件行信息,以下:

83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

行首的IP地址信息很容易被识别,括号中的时间戳也是如此。在此教程中,使用%{COMBINEDAPACHELOG} grok 模式,它的行信息结构有以下的一些字段。

Information Field Name
IP Address clientip
User ID ident
User Authentication auth
timestamp timestamp
HTTP Verb verb
Request body request
HTTP Version httpversion
HTTP Status Code response
Bytes served bytes
Referrer URL referrer
User agent agent

编写 first-pipeline.conf添加以下的内容:

filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}

过滤器处理以后,一个行信息是以下的JSON的结构:

{
"clientip" : "83.149.9.216",
"ident" : ,
"auth" : ,
"timestamp" : "04/Jan/2015:05:13:42 +0000",
"verb" : "GET",
"request" : "/presentations/logstash-monitorama-2013/images/kibana-search.png",
"httpversion" : "HTTP/1.1",
"response" : "200",
"bytes" : "203023",
"referrer" : "http://semicomplete.com/presentations/logstash-monitorama-2013/",
"agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
}

索引数据到ElasticSearch集群

Now that the web logs are broken down into specific fields, the Logstash pipeline can index the data into an Elasticsearch cluster. Edit the first-pipeline.conf file to add the following text after the input section:

如今,网络日志被分解成具体的字段, 使用Logstash管道能够将数据索引到Elasticsearch集群。编辑first-pipeline.conf 文件,在input 段以后添加如下文字:

output {
    elasticsearch {
    }
}

用这样的配置, Logstash使用http协议链接到Elasticsearch 。上面的例子假设Logstash和Elasticsearch运行在同一实例中。您可使用主机配置像hosts => "es-machine:9092"指定远程Elasticsearch实例。

使用geoip插件丰富你的数据内容

除了解析日志数据,filter插件能够从现有数据基础上补充信息。例如: geoip插件查找IP地址,从而导出地址地理位置信息,并将该位置信息发送给日志。

使用geoip filter插件配置您的Logstash,如:经过添加如下行到first-pipeline.conf文件的filter部分:

geoip {
    source => "clientip"
}

插件geoip的配置须要的数据已经被定义为独立的字段中的数据。确保在配置文件中,geoip 配置段在grok配置段以后。

须要指定包含IP地址的字段的名称。在本例中,字段名称是clientip(grok插件字段) 。

测试一下

此时此刻,你的first-pipeline.conf文件有input,filter和output段,正确的配置看起来是这样的:

input {
    file {
        path => "/Users/palecur/logstash-1.5.2/logstash-tutorial-dataset"
        start_position => beginning
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {}
    stdout {}
}

测试配置语法的正确性,使用下面的命令:

bin/logstash -f first-pipeline.conf --configtest

使用–configtest 选项解析你的配置文件并报告错误.当配置文件经过检查以后,用以下命令启动Logstash:

bin/logstash -f first-pipeline.conf

基于gork filter插件的字段索引,在Elasticsearch运行一个测试查询,

curl -XGET 'localhost:9200/logstash-$DATE/_search?q=response=200'

使用当前日期替换$DATE(YYYY.MM.DD格式).由于咱们的例子只有一条200 HTTP response,因此查询命中一条结果:

{"took":2,
"timed_out":false,
"_shards":{"total":5,
  "successful":5,
  "failed":0},
"hits":{"total":1,
  "max_score":1.5351382,
  "hits":[{"_index":"logstash-2015.07.30",
    "_type":"logs",
    "_id":"AU7gqOky1um3U6ZomFaF",
    "_score":1.5351382,
    "_source":{"message":"83.149.9.216 - - [04/Jan/2015:05:13:45 +0000] \"GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png HTTP/1.1\" 200 52878 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"",
      "@version":"1",
      "@timestamp":"2015-07-30T20:30:41.265Z",
      "host":"localhost",
      "path":"/path/to/logstash-tutorial-dataset",
      "clientip":"83.149.9.216",
      "ident":"-",
      "auth":"-",
      "timestamp":"04/Jan/2015:05:13:45 +0000",
      "verb":"GET",
      "request":"/presentations/logstash-monitorama-2013/images/frontend-response-codes.png",
      "httpversion":"1.1",
      "response":"200",
      "bytes":"52878",
      "referrer":"\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"",
      "agent":"\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\""
      }
    }]
  }
}

尝试一个基于ip地址的地理信息查询:

curl -XGET 'localhost:9200/logstash-$DATE/_search?q=geoip.city_name=Buffalo'

使用当前日期替换$DATE(YYYY.MM.DD格式).
只有一条log记录是来自于城市Buffalo,因此查询的结果是:

{"took":3,
"timed_out":false,
"_shards":{
  "total":5,
  "successful":5,
  "failed":0},
"hits":{"total":1,
  "max_score":1.03399,
  "hits":[{"_index":"logstash-2015.07.31",
    "_type":"logs",
    "_id":"AU7mK3CVSiMeBsJ0b_EP",
    "_score":1.03399,
    "_source":{
      "message":"108.174.55.234 - - [04/Jan/2015:05:27:45 +0000] \"GET /?flav=rss20 HTTP/1.1\" 200 29941 \"-\" \"-\"",
      "@version":"1",
      "@timestamp":"2015-07-31T22:11:22.347Z",
      "host":"localhost",
      "path":"/path/to/logstash-tutorial-dataset",
      "clientip":"108.174.55.234",
      "ident":"-",
      "auth":"-",
      "timestamp":"04/Jan/2015:05:27:45 +0000",
      "verb":"GET",
      "request":"/?flav=rss20",
      "httpversion":"1.1",
      "response":"200",
      "bytes":"29941",
      "referrer":"\"-\"",
      "agent":"\"-\"",
      "geoip":{
        "ip":"108.174.55.234",
        "country_code2":"US",
        "country_code3":"USA",
        "country_name":"United States",
        "continent_code":"NA",
        "region_name":"NY",
        "city_name":"Buffalo",
        "postal_code":"14221",
        "latitude":42.9864,
        "longitude":-78.7279,
        "dma_code":514,
        "area_code":716,
        "timezone":"America/New_York",
        "real_region_name":"New York",
        "location":[-78.7279,42.9864]
      }
    }
  }]
 }
}

使用多个输入和输出插件

你须要管理的信息一般来自于不一样的源,你的案例也可能将数据送到多个目标。Logstash 管道须要使用多个输出或者输出插件来知足这些需求。

这个例子建立一个Logstash 管道来从Twitter feed 和Filebeat client获取输入信息,而后将这些信息送到Elasticsearch集群,同时写入一个文件.

从Twitter feed读数据

要添加一个Twitter feed,你须要一系列的条件:

  • 一个consumer key :能惟一标识你的Twitter app(在这个例子中是Logstash)
  • 一个consumer secret, 做为你的Twitter app的密码
  • 一个或多个 keywords用于查询 feed.
  • 一个oauth token,标志使用这个app的Twitter帐号
  • 一个 oauth token secret, 做为 Twitter 帐号密码.

访问 https://dev.twitter.com/apps 注册一个Twitter 帐号 而且生成你的 consumer key 和 secret, 还有你的 OAuth token 和 secret.

把以下的内容追加到first-pipeline.conf文件的input段:

twitter {
    consumer_key =>
    consumer_secret =>
    keywords =>
    oauth_token =>
    oauth_token_secret =>
}

Filebeat Client

filebeat 客户端是一个轻量级的资源友好的文件服务端日志收集工具,而且把这些日志送到Logstash实例来处理。filebeat 客户端使用安全Beats协议与Logstash实例通讯。lumberjack协议被设计成可靠的低延迟的.Filebeat 使用托管源数据的计算机的计算资源,而且Beats input插件减小了 Logstash实例的资源需求.

注意
一个典型的用法是,Filebeat运行在,与Logstash实例分开的一个单独的计算机上.为了教学方便,咱们将Logstash 和 Filebeat运行在同一个计算机上.

默认的Logstash配置包含Beats input插件,它被设计成资源友好的.安装Filebeat在数据源所在的机器,从Filebeat产品页下载合适的包.

建立一个和这个例子相似的配置文件:

filebeat:
  prospectors:
    -
      paths:
        - "/path/to/sample-log" 
      fields:
        type: syslog
output:
  logstash:
    hosts: ["localhost:5043"]
  tls:
    certificate: /path/to/ssl-certificate.crt 
    certificate_key: /path/to/ssl-certificate.key
    certificate_authorities: /path/to/ssl-certificate.crt
    timeout: 15
  • Filebeat 处理的一个或多个文件的路径
  • Logstash实例的SSL证书路径

以filebeat.yml为名,保存配置文件

配置你的Logstash实例,加入Filebeat input插件.将以下内容添加到first-pipeline.conf文件的input段中.

beats {
    port => "5043"
    ssl => true
    ssl_certificate => "/path/to/ssl-cert" 
    ssl_key => "/path/to/ssl-key" 
}
  • Logstash实例验证本身到Filebeat的SSL证书的路径。
  • SSL证书的key的保存路径。

将Logstash数据写入一个文件

能够经过 file output插件配置让Logstash管道将数据直接写入一个文件.

将以下内容添加到first-pipeline.conf文件的output段,完成Logstash实例配置:

file {
    path => /path/to/target/file
}

输出数据到多个 Elasticsearch节点

输出数据到多个 Elasticsearch节点,能够减轻某个给定节点的资源需求.当某一个节点挂掉的时候,提供了数据记录输入Elasticsearch的更多选择.

修改你的first-pipeline.conf文件的output段,这样就可让你的Logstash实例将数据写入到多个Elasticsearch节点.

output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
}

host参数要使用三个非master的Elasticsearch集群节点ip.当host参数列出多个ip地址时,Logstash负载均衡请求到列表中的地址.另外Elasticsearch的默认端口是9200,在上面的配置中能够省略.

测试管道

此时,你的 first-pipeline.conf 文件应该是下面的这样:

input {
    twitter {
        consumer_key =>
        consumer_secret =>
        keywords =>
        oauth_token =>
        oauth_token_secret =>
    }
    beats {
        port => "5043"
        ssl => true
        ssl_certificate => "/path/to/ssl-cert"
        ssl_key => "/path/to/ssl-key"
    }
}
output {
    elasticsearch {
        hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]
    }
    file {
        path => /path/to/target/file
    }
}

Logstash从你配置的Twitter feed消费数据, 从 Filebeat获取数据, 而且将这些数据索引到Elasticsearch的三个节点,而且同时将数据写入一个文件.

在数据源节点上,使用以下命令启动Filebeat:

sudo ./filebeat -e -c filebeat.yml -d "publish"

Filebeat尝试链接5403端口,知道你的Logstash的Beats 插件生效,这个端口都没有响应,因此如今能够将任何的链接异常看做是正常的.

校验你的配置,使用以下命令:

bin/logstash -f first-pipeline.conf --configtest

使用–configtest 选项解析你的配置文件并报告错误.当配置文件经过检查以后,用以下命令启动Logstash:

bin/logstash -f first-pipeline.conf

使用grep命令在你的目标文件中查找,验证Mozilla信息的存在.

grep Mozilla /path/to/target/file

在你的Elasticsearch集群中运行Elasticsearch查询,验证一样的信息也存在!

curl -XGET 'localhost:9200/logstash-2015.07.30/_search?q=agent=Mozilla'
相关文章
相关标签/搜索