Logstash
#############################logstash命令经常使用参数#############################
html
-n 指定logstash实例名称,若是没有指定,默认是本地主机名 -f 从指定的文件或文件夹中加载logstash的配置;若是是一个文件夹,它会加载里面全部的文件,或者能够加上通配符只加载特定格式的文件 -w 容许filter和output的pipeline线程数量,默认是CPU核数 -b 每一个 Logstash pipeline 线程,在执行具体的 filter 和 output 函数以前,最多能累积的日志条数,默认是 125 条。越大性能越好,一样也会消耗越多的 JVM 内存 -u 每一个 Logstash pipeline 线程,在打包批量日志的时候,最多等待几毫秒。默认是 5 ms -l 指定日志输出位置 -r 监控配置配置文件,若是有变化则自动重载logstash -e 使用给定的命令行字符串做为配置,直接运行bin/log/logstash -e 配置默认是"input { stdin { type => stdin } }" "output { stdout { codec => rubydebug } }" -t 检查配置文件是否有语法错误 -V 打印logstash版本 --log.level 指定日志等级,包括trace、debug、info、warn、error、fatal,默认info --http.host 指定web API绑定的主机,默认127.0.0.1 --http.port 指定web API的http端口,默认9600至9700
#############################后台启动logstash#############################
1.安装supervisor
java
yum -y install supervisor --enablerepo=epel
2.在/etc/supervisord.conf配置文件最后添加:
node
[program:logstash] directory=/usr/local/logstash command=/usr/local/logstash/bin/logstash -f /usr/local/logstash/etc/conf.d/ -r -l /var/log/logstash
3.启动
nginx
service supervisord start
4.单独控制一个子进程
git
supervisorctl stop logstash
#############################编解码配置插件#############################
#官网参考:https://www.elastic.co/guide/en/logstash/current/codec-plugins.html
#logstash不仅是一个input|filter|output的数据流,而是一个input|decode|filter|encode|output的数据流。codec就是用来decode、encode事件的。
1.JSON编解码
#直接输入预约义好的JSON数据,能够省略filter/grok配置,下降logstash过滤的CPU负载消耗
github
input { file { path => "/opt/data/logs/bjape01-ngx-a1-172.16.3.2/nginx_access.log" codec => "json" } }
2.multiline多行事件编码
web
codec => multiline { pattern => "^\[" negate => true what => "previous" }
#pattern 要匹配的正则表达式,字符串类型
#negate 正则表达式是否生效,布尔类型,默认为flase
#what 未匹配的内容是向前合并仍是向后后合并,previous,next两个值选择
#示例配置能够用于Log4j方式的应用程序日志
3.line面向行的文本数据
4.plain空的纯文本编解码
#使用已经有定义框架输入或输出,例如logstash输出转存到elasticsearch
正则表达式
output { if [type] == "user_audit" { elasticsearch { hosts => ["172.16.1.25","172.16.1.26","172.16.1.27"] index => 'user_audit-%{+YYYY-MM-dd}' codec=>plain{charset=>"UTF-8"} } } }
#############################输入插件#############################
#官网参考:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
1.标准输入
示例:
数据库
input { stdin { add_field => {"key" => "value"} codec => "plain" tags => ["add"] type => "std" } }
#add_field 向事件添加一个字段,hash类型
#codec 设置编码方式,默认是line
#tags 添加标记
#type 添加类型
2.文件输入
apache
input { file { path => [ "/opt/data/logs/idca-web1-172.16.3.2/apache_access.log", "/opt/data/logs/idca-web1-172.16.3.2/apache_error.log" ] stat_interval => 1 discover_interval => 1 type => "logstash-apache_log" } }
#path 处理的文件的路径, 能够定义多个路径
#stat_interval 每隔多久检查一次被监听文件状态(是否更新),默认是1秒
#discover_interval 每隔多久去检查一次被监听的path下是否有新文件,默认是15秒
#start_position 从什么位置开始读取文件数据,"beginning"从头开始读取,读到最后一行不会终止,继续tailf;"end"从结束位置tailf。默认值是"end"
#坑:start_position仅在该文件从未被监听过的时候起做用,若是sincedb文件中已经有这个文件的inode记录了,那么logstash依然会冲记录过的pos开始读取数据。因此重复测试的时候每回须要删除sincedb文件,该文件通常在安装目录下的data/plugins/inputs/file/中
#优化:在file中添加sincedb_path => "/dev/null",能够直接将sincedb写到黑洞中
#exclude 不想被监听的文件能够排除出去
#sincedb_path 配置sincedb文件位置
#sincedb_write_interval 每隔多久写一次sincedb文件,默认15秒
3.syslog输入
input { syslog { port => "514" } }
4.kafka输入
input { kafka { type => "logstash-apache_log" codec => "plain" topics => "apache_log" bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092" } }
#topics 主题名,多个主题时使用列表形式
#bootstrap_servers kafka地址,多个地址时使用“host1:port1,host2:port2
”格式
#############################过滤器配置#############################
#官网参考:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
1.grok正则捕获
#用于匹配对应格式的message,而后过滤出来
filter { grok { break_on_match => false patterns_dir => ["/usr/local/logstash/etc/conf.d/patterns"] match => { "message" => "\[%{TIMESTAMP_ISO8601:time}\] \[%{GLASSFISHVERSION:gfversion}\] %{OTHER}"} } grok { break_on_match => false patterns_dir => ["/usr/local/logstash/etc/conf.d/patterns"] match => {"path" => "/%{USER}/%{USER}/%{USER}/%{USER:host}/%{OTHER}" } overwrite => [ "host" ] } }
#break_on_match 表示匹配即中止,默认为true,若是但愿grok尝试全部模式,设置为false
#patterns_dir 将grok表达式统一写到文件中,该选项指定文件目录。grok表达式能够写到配置文件中,可是日志格式可能有多种,每种都写一行本身的表达式就可能会出问题,因此建议将全部的grok表达式统一写到一个地方。
#grok表达式格式:
USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME} #第一行用普通的正则表达式来定义一个grok表达式;第二行用以定义好的grok表达式来定义另外一个grok表达式。logstash内置了多种grok表达式,查看https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns,一样能够自定义grok表达式。
%{NUMBER:num} %{NUMBER:num:int} #grok表达式的打印复制格式,第一行中NUMBER表示匹配的模式,num是匹配内容的标识,表明匹配的内容;第二行中int表示转换匹配内容的格式,默认是字符串格式。
#overwrite用来重写message字段,上面例子第二个grok中match的是一个路径,可是咱们只想保留路径中的一个host字段,因此overwrite => [ "host" ]就能够了
2.date时间处理
#日志产生到logstash处理通常都会有一段时间差,致使logstash实际处理时间都比日志产生时间晚。logstash-filter-date插件能够解析日志中的时间,变成LogStash:Timestamp对象,而后转存到@timestamp字段里,做为事件的时间戳;若是未使用date插件或没有解析到时间,logstash将使用日志输入的时间做为事件的时间戳。
date { match => [ "time", "MMM dd yyyy HH:mm:ss", "MMM d yyyy, HH:mm:ss", "ISO8601" ] }
#配置选项包括:locale、match、tag_on_failure、target、timezone。经常使用的是match,值的类型一样是列表,默认是[]
#实例中的time就是grok中匹配字段的标识,由咱们自定义。
#date插件支持五种时间格式,经常使用的是ISO8601和Joda-Time库
ISO8601:相似"2018-10-19T14:25:25.905+0800",这是咱们的北京时间,若是是UTC时间后面跟的是一个大写的Z(时区偏移),nginx、apache、java应用均可以设置为这种格式。 UNIX:UNIX时间戳格式,记录从1970年起至今的总秒数。 UNIX_MS:从1970年至今的总毫秒数。 TAI64N:tai64n格式,不多用。 Joda-Time库:y年,yyyy例如2018,yy例如18;M月,MMMM例如January,MMM例如Jan,MM例如01,M例如1;d日,dd例如01,d例如1;H时,HH例如01,H例如1;m分,mm例如01,m例如1;s秒,ss例如01,s例如1
3.GeoIP地址查询
#GeoIP过滤器根据来自Maxmind GeoLite2数据库的数据,添加关于IP地址地理位置的信息。
#插件名称logstash-filter-geoip,能够先使用logstash-plugin list命令查看插件是否安装,没有安装可使用logstash-plugin install logstash-filter-geoip命令来安装
#基本配置
geoip { source => "clientip" }
#debug标准输出为
{ "path" => "/var/log/logstash_test/apache_access.log", "type" => "apache_log", "timestamp" => "26/Oct/2018:14:44:25 +0800", "@version" => "1", "response" => "302", "usetime" => "0", "referrer" => "/hsbp2pUser/view/html/errorSkip.html;jsessionid=f1deac55291cdf60c1e66108a89c.bjape01-wgw-b2", "@timestamp" => 2018-10-26T06:44:25.000Z, "geoip" => { "country_code2" => "CN", "timezone" => "Asia/Shanghai", "latitude" => 22.8167, "continent_code" => "AS", "region_code" => "45", "region_name" => "Guangxi", "country_name" => "China", "location" => { "lon" => 108.3167, "lat" => 22.8167 }, "country_code3" => "CN", "ip" => "113.15.241.43", "longitude" => 108.3167, "city_name" => "Nanning" }, "agent" => "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)", "host" => "opsmanage", "message" => "113.15.241.43 - - [26/Oct/2018:14:44:25 +0800] \"/hsbp2pUser/view/html/errorSkip.html;jsessionid=f1deac55291cdf60c1e66108a89c.bjape01-wgw-b2\" 302 460 0 \"Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)\"", "clientip" => "113.15.241.43", "bytes" => "460", "remote_user" => "-" }
#经常使用配置
#source 指定来源IP字段,即便用GeoIP库来分析的IP地址,若是是一个列表,只有第一个会被使用
#database 指定GeoIP库的路径,若是不指定该配置项默认随Logstash的GeoLite2城市数据库
#fields 指定输出那些字段,格式是一个列表。若是不指定默认为[city_name,continent_code,country_code2,country_code3,country_name,dma_code,ip,latitude,longitude,postal_code,region_name,timezone]
#target 指定应该将geoip数据存储在哪一个Logstash的字段,通常设置为geoip
#############################输出插件#############################
#官网参考:https://www.elastic.co/guide/en/logstash/current/output-plugins.html
1.输出到Elasticsearch
output { elasticsearch { hosts => ["172.16.1.25","172.16.1.26","172.16.1.27"] index => "%{type}-%{+YYYY-MM-dd}" codec=>plain{charset=>"UTF-8"} } }
#hosts Elasticsearch地址,列表形式,能够写多个。#index 索引名,写入Elasticsearch索引的名称,可使用变量。注意:%{+YYYY-MM-dd}}在语法解析时,看到已+开头的,就会自动认为后面是时间格式,因此以前处理过程当中不要给自定义字段取加号开头的名字;索引名中不要有大写字母,不然Elasticsearch会报InvalidindexNameException