注意: Logstash须要Java 8或更高版本.可使用
http://www.oracle.com/technetwork/java/javase/downloads/index.html [oracle官方版本] 或者使用开源版本OpenJDK: http://openjdk.java.net/.html
使用下面的命令,检查你的JDK版本java
java -version
在安装了java的系统上,这个命令将有以下相似的输出:linux
java version "1.8.0_65" Java(TM) SE Runtime Environment (build 1.8.0_65-b17) Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
从https://www.elastic.co/downloads/logstash 下载适合你本身的主机环境的Logstash安装文件.
解压文件,不要将安装logstash到一个含有冒号(:)的目录下面。git
在支持包管理器的linux系统上,可使用包管理器来安装。web
咱们也有适用于APT和YUM的发行版仓库。注意咱们只提供二进制包,但没有源代码软件包,包
做为Logstash构建的一部分被建立。shell
咱们将Logstash package repositories不一样版本分放到不一样的urls,避免在主要或次要版本升级中发生意外。对于全部的2.3.x版本发布使用2.3版本号,在2.2.x使用2.2等。数据库
咱们使用PGP密钥
https://pgp.mit.edu/pks/lookup?op=vindex&search=0xD27D666CD88E42B4[D88E42B4 ]
Elastic的签名密钥,指纹centos
4609 5ACC 8548 582C 1A26 99A9 D27D 666C D88E 42B4
签署咱们的全部包。在https://pgp.mit.edu 提供 。安全
下载并安装公有签名密钥网络
wget -qO - https://packages.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
把仓库的定义加入 /etc/apt/sources.list
文件:
echo "deb https://packages.elastic.co/logstash/2.3/debian stable main" | sudo tee -a /etc/apt/sources.list
注意
Use the
echo
method described above to add the Logstash repository. Do not
useadd-apt-repository
as it will add adeb-src
entry as well, but we do not
provide a source package. If you have added thedeb-src
entry, you will see an
error like the following:
Unable to find expected entry ‘main/source/Sources’ in Release file (Wrong sources.list entry or malformed file)
Just delete the
deb-src
entry from the/etc/apt/sources.list
file and the
installation should work as expected.
运行 sudo apt-get update
使repository可用. 而后运行安装命令:
sudo apt-get update && sudo apt-get install logstash
下载并安装公有签名密钥:
rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
以下内容写到一个文件以.repo
为后缀名,放到/etc/yum.repos.d/
下,如:logstash.repo
[logstash2.3] name=Logstash repository for2.3.x packages baseurl=https://packages.elastic.co/logstash2.3/centos gpgcheck=1 gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch enabled=1
添加完仓库文件,能够安装:
yum install logstash
运行以下的基础Logstash pipeline,测试Logstash的安装
cd logstash-2.3.0 bin/logstash -e 'input { stdin { } } output { stdout {} }'
能够从命令行使用-e 参数来直接指定配置。在命令行中指定的配置可让您快速测试,无需在迭代过程当中修改文件。这个管道输入是标准输入,并移动该输入到输出,以结构化格式标准输出。
一旦"Logstash startup completed"显示,经过命令行提示,输入hello world,查看响应结果:
hello world 2013-11-21T01:22:14.405+0000 0.0.0.0 hello world
Logstash将时间戳和ip地址信息加入到消息中。使用Ctrl+D的shell命令能够从Logstash环境退出。
LogStash管道一般有一个或多个输入,过滤,输出插件。本节中的场景经过Logstash配置文件来指定这些插件,并讨论了每一个插件在作什么。
Logstash配置文件定义了Logstash 管道,当你使用-f <path/to/file>启动一个Logstash实例,其实使用了一个配置文件定义了管道实例。
一个Logstash管道有两个必备元素,输入和输出,一个可选元素,过滤器。input插件从一个源摄取数据,filter插件按照你指定的方式修改数据,output插件写出数据到一个目标数据库。
下面是一个管道配置的模板:
# #是注释,使用注释描述配置 input { } # 该部分被注释,表示filter是可选的 # filter { # # } output { }
这个模板是不具有功能的,由于输入和输出部分没有定义的任何有效的选项。
将模板粘贴到你的Logstash根目录,命名为first-pipeline.conf。
这个例子建立一个Logstash管道,使用Apache web logs 做为输入,解析这些日志到特定的命名字段,而且输出这些解析数据到ElashticSearch集群
你能够下载样例数据,这里
启动Logstash管道,使用file input插件配置Logstash实例。
修改first-pipeline.conf 添加以下内容:
input { file { path => "/path/to/logstash-tutorial.log" start_position => beginning ignore_older => 0 } }
用实际的logstash-tutorial.log日志文件目录替换 /path/to/。
gork 过滤器插件是Logstash的几个默承认用插件之一。关于如何管理Logstash 插件的更多细节请参考:reference documentation
grok 过滤器插件在输入日志中查找指定的模式,配置文件中须要指定哪些模式的识别是你的案例所感兴趣的。一个表明性的web server 日志文件行信息,以下:
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
行首的IP地址信息很容易被识别,括号中的时间戳也是如此。在此教程中,使用%{COMBINEDAPACHELOG} grok
模式,它的行信息结构有以下的一些字段。
Information | Field Name |
---|---|
IP Address | clientip |
User ID | ident |
User Authentication | auth |
timestamp | timestamp |
HTTP Verb | verb |
Request body | request |
HTTP Version | httpversion |
HTTP Status Code | response |
Bytes served | bytes |
Referrer URL | referrer |
User agent | agent |
编写 first-pipeline.conf添加以下的内容:
filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}"} } }
过滤器处理以后,一个行信息是以下的JSON的结构:
{ "clientip" : "83.149.9.216", "ident" : , "auth" : , "timestamp" : "04/Jan/2015:05:13:42 +0000", "verb" : "GET", "request" : "/presentations/logstash-monitorama-2013/images/kibana-search.png", "httpversion" : "HTTP/1.1", "response" : "200", "bytes" : "203023", "referrer" : "http://semicomplete.com/presentations/logstash-monitorama-2013/", "agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36" }
Now that the web logs are broken down into specific fields, the Logstash pipeline can index the data into an Elasticsearch cluster. Edit the first-pipeline.conf file to add the following text after the input section:
如今,网络日志被分解成具体的字段, 使用Logstash管道能够将数据索引到Elasticsearch集群。编辑first-pipeline.conf 文件,在input 段以后添加如下文字:
output { elasticsearch { } }
用这样的配置, Logstash使用http协议链接到Elasticsearch 。上面的例子假设Logstash和Elasticsearch运行在同一实例中。您可使用主机配置像hosts => "es-machine:9092"
指定远程Elasticsearch实例。
除了解析日志数据,filter插件能够从现有数据基础上补充信息。例如: geoip插件查找IP地址,从而导出地址地理位置信息,并将该位置信息发送给日志。
使用geoip filter插件配置您的Logstash,如:经过添加如下行到first-pipeline.conf文件的filter部分:
geoip { source => "clientip" }
插件geoip的配置须要的数据已经被定义为独立的字段中的数据。确保在配置文件中,geoip 配置段在grok配置段以后。
须要指定包含IP地址的字段的名称。在本例中,字段名称是clientip(grok插件字段) 。
此时此刻,你的first-pipeline.conf文件有input,filter和output段,正确的配置看起来是这样的:
input { file { path => "/Users/palecur/logstash-1.5.2/logstash-tutorial-dataset" start_position => beginning } } filter { grok { match => { "message" => "%{COMBINEDAPACHELOG}"} } geoip { source => "clientip" } } output { elasticsearch {} stdout {} }
测试配置语法的正确性,使用下面的命令:
bin/logstash -f first-pipeline.conf --configtest
使用–configtest 选项解析你的配置文件并报告错误.当配置文件经过检查以后,用以下命令启动Logstash:
bin/logstash -f first-pipeline.conf
基于gork filter插件的字段索引,在Elasticsearch运行一个测试查询,
curl -XGET 'localhost:9200/logstash-$DATE/_search?q=response=200'
使用当前日期替换$DATE(YYYY.MM.DD格式).由于咱们的例子只有一条200 HTTP response,因此查询命中一条结果:
{"took":2, "timed_out":false, "_shards":{"total":5, "successful":5, "failed":0}, "hits":{"total":1, "max_score":1.5351382, "hits":[{"_index":"logstash-2015.07.30", "_type":"logs", "_id":"AU7gqOky1um3U6ZomFaF", "_score":1.5351382, "_source":{"message":"83.149.9.216 - - [04/Jan/2015:05:13:45 +0000] \"GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png HTTP/1.1\" 200 52878 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"", "@version":"1", "@timestamp":"2015-07-30T20:30:41.265Z", "host":"localhost", "path":"/path/to/logstash-tutorial-dataset", "clientip":"83.149.9.216", "ident":"-", "auth":"-", "timestamp":"04/Jan/2015:05:13:45 +0000", "verb":"GET", "request":"/presentations/logstash-monitorama-2013/images/frontend-response-codes.png", "httpversion":"1.1", "response":"200", "bytes":"52878", "referrer":"\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"", "agent":"\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"" } }] } }
尝试一个基于ip地址的地理信息查询:
curl -XGET 'localhost:9200/logstash-$DATE/_search?q=geoip.city_name=Buffalo'
使用当前日期替换$DATE(YYYY.MM.DD格式).
只有一条log记录是来自于城市Buffalo,因此查询的结果是:
{"took":3, "timed_out":false, "_shards":{ "total":5, "successful":5, "failed":0}, "hits":{"total":1, "max_score":1.03399, "hits":[{"_index":"logstash-2015.07.31", "_type":"logs", "_id":"AU7mK3CVSiMeBsJ0b_EP", "_score":1.03399, "_source":{ "message":"108.174.55.234 - - [04/Jan/2015:05:27:45 +0000] \"GET /?flav=rss20 HTTP/1.1\" 200 29941 \"-\" \"-\"", "@version":"1", "@timestamp":"2015-07-31T22:11:22.347Z", "host":"localhost", "path":"/path/to/logstash-tutorial-dataset", "clientip":"108.174.55.234", "ident":"-", "auth":"-", "timestamp":"04/Jan/2015:05:27:45 +0000", "verb":"GET", "request":"/?flav=rss20", "httpversion":"1.1", "response":"200", "bytes":"29941", "referrer":"\"-\"", "agent":"\"-\"", "geoip":{ "ip":"108.174.55.234", "country_code2":"US", "country_code3":"USA", "country_name":"United States", "continent_code":"NA", "region_name":"NY", "city_name":"Buffalo", "postal_code":"14221", "latitude":42.9864, "longitude":-78.7279, "dma_code":514, "area_code":716, "timezone":"America/New_York", "real_region_name":"New York", "location":[-78.7279,42.9864] } } }] } }
你须要管理的信息一般来自于不一样的源,你的案例也可能将数据送到多个目标。Logstash 管道须要使用多个输出或者输出插件来知足这些需求。
这个例子建立一个Logstash 管道来从Twitter feed 和Filebeat client获取输入信息,而后将这些信息送到Elasticsearch集群,同时写入一个文件.
要添加一个Twitter feed,你须要一系列的条件:
访问 https://dev.twitter.com/apps 注册一个Twitter 帐号 而且生成你的 consumer key 和 secret, 还有你的 OAuth token 和 secret.
把以下的内容追加到first-pipeline.conf文件的input段:
twitter { consumer_key => consumer_secret => keywords => oauth_token => oauth_token_secret => }
filebeat 客户端是一个轻量级的资源友好的文件服务端日志收集工具,而且把这些日志送到Logstash实例来处理。filebeat 客户端使用安全Beats协议与Logstash实例通讯。lumberjack协议被设计成可靠的低延迟的.Filebeat 使用托管源数据的计算机的计算资源,而且Beats input插件减小了 Logstash实例的资源需求.
注意
一个典型的用法是,Filebeat运行在,与Logstash实例分开的一个单独的计算机上.为了教学方便,咱们将Logstash 和 Filebeat运行在同一个计算机上.
默认的Logstash配置包含Beats input插件,它被设计成资源友好的.安装Filebeat在数据源所在的机器,从Filebeat产品页下载合适的包.
建立一个和这个例子相似的配置文件:
filebeat: prospectors: - paths: - "/path/to/sample-log" fields: type: syslog output: logstash: hosts: ["localhost:5043"] tls: certificate: /path/to/ssl-certificate.crt certificate_key: /path/to/ssl-certificate.key certificate_authorities: /path/to/ssl-certificate.crt timeout: 15
以filebeat.yml为名,保存配置文件
配置你的Logstash实例,加入Filebeat input插件.将以下内容添加到first-pipeline.conf文件的input段中.
beats { port => "5043" ssl => true ssl_certificate => "/path/to/ssl-cert" ssl_key => "/path/to/ssl-key" }
能够经过 file output插件配置让Logstash管道将数据直接写入一个文件.
将以下内容添加到first-pipeline.conf文件的output段,完成Logstash实例配置:
file { path => /path/to/target/file }
输出数据到多个 Elasticsearch节点,能够减轻某个给定节点的资源需求.当某一个节点挂掉的时候,提供了数据记录输入Elasticsearch的更多选择.
修改你的first-pipeline.conf文件的output段,这样就可让你的Logstash实例将数据写入到多个Elasticsearch节点.
output { elasticsearch { hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"] } }
host参数要使用三个非master的Elasticsearch集群节点ip.当host参数列出多个ip地址时,Logstash负载均衡请求到列表中的地址.另外Elasticsearch的默认端口是9200,在上面的配置中能够省略.
此时,你的 first-pipeline.conf 文件应该是下面的这样:
input { twitter { consumer_key => consumer_secret => keywords => oauth_token => oauth_token_secret => } beats { port => "5043" ssl => true ssl_certificate => "/path/to/ssl-cert" ssl_key => "/path/to/ssl-key" } } output { elasticsearch { hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"] } file { path => /path/to/target/file } }
Logstash从你配置的Twitter feed消费数据, 从 Filebeat获取数据, 而且将这些数据索引到Elasticsearch的三个节点,而且同时将数据写入一个文件.
在数据源节点上,使用以下命令启动Filebeat:
sudo ./filebeat -e -c filebeat.yml -d "publish"
Filebeat尝试链接5403端口,知道你的Logstash的Beats 插件生效,这个端口都没有响应,因此如今能够将任何的链接异常看做是正常的.
校验你的配置,使用以下命令:
bin/logstash -f first-pipeline.conf --configtest
使用–configtest 选项解析你的配置文件并报告错误.当配置文件经过检查以后,用以下命令启动Logstash:
bin/logstash -f first-pipeline.conf
使用grep命令在你的目标文件中查找,验证Mozilla信息的存在.
grep Mozilla /path/to/target/file
在你的Elasticsearch集群中运行Elasticsearch查询,验证一样的信息也存在!
curl -XGET 'localhost:9200/logstash-2015.07.30/_search?q=agent=Mozilla'