ELK技术栈之-Logstash详解

ELK技术栈之-Logstash详解

 

前言

在第九章节中,咱们已经安装好Logstash组件了,而且启动实例测试它的数据输入和输出,可是用的是最简单的控制台标准输入和标准输出,那这节咱们就来深刻的学习Logstash的详细使用。php

经常使用启动参数

咱们在上一节中演示了启动Logstash的实例,其中咱们启动的时候给Logstash脚本传入了-e的参数,但实际上,Logstash的启动参数有不少,咱们来看一下各个启动参数的做用:html

  • -e #当即启动实例,例如:./logstash -e "input {stdin {}} output {stdout {}}"
  • -f #指定启动实例的配置文件,例如:./logstash -f config/test.conf
  • -t #测试配置文件的正确性,例如:./logstash -f config/test.conf -t
  • -l #指定日志文件名称,例如:./logstash -f config/test.conf -l logs/test.log
  • -w #指定filter线程数量,不指定默认是5,例如:./logstash-f config/test.conf -w 8

配置文件语法

文件结构

咱们刚刚知道,启动参数能够指定一个配置文件,那么接下来就有必要来了解一下配置文件的结构:
Logstash经过{}来定义区域,区域内能够定义插件,一个区域内能够定义多个插件,以下:java

input {
    #标准输入源插件
    stdin {
    }
    #普通文件源插件
    file {
        path => ["/var/log/*.log", "/var/log/message"]
       ....
    }
   ......
}
filter {
      #grok过滤插件
      grok {
            match => ["message", "%{HTTPDATE:logdate}"]
            .....
      }
     #date过滤插件
     date {
            match => ["logdate", "dd/MMM/yyyy:HH:mm:ss Z"]
           .....
    }
   .....
}
output {
     stdout {
     }
     elasticsearch {
        hosts => ["127.0.0.1:9200"]
        ....
    }
    .....
}

咱们先大概了解一下配置文件的结构,接下来咱们再详细看这些插件的配置。node

数据类型

Logstash配置文件支持的数据类型有:
一、Boolean,例如:ssl_enable => true
二、Number,例如:port => 33
三、String,例如:name => “Hello world”
四、hash,例如:options => {key1 => "value1", key2 => "value2"}
五、array,例如:match => ["datetime", "UNIX", "ISO8601"]mysql

字段引用

Logstash数据流中的数据被称之为Event对象,Event以JSON结构构成,Event的属性被称之为字段,若是你想在配置文件中引用这些字段,只须要把字段的名字写在中括号[]里就好了,如[type],对于嵌套字段每层字段名称都写在[]里就能够了,好比:[tags][type];除此以外,对于Logstash的arrag类型支持下标与倒序下表,如:[tags][type][0][tags][type][-1]
如下的内容就是一个Event对象:git

{
      "message" => "hello logstash",
      "@version" => "1",
      "@timestamp" => 2018-08-13T17:32:01.122Z,
      "host" => "localhost.localdomain"
}

条件判断

Logstash支持下面的操做符:
一、==(等于), !=(不等于), <(小于), >(大于), <=(小于等于), >=(大于等于)
二、=~(匹配正则), !~(不匹配正则)
三、in(包含), not in(不包含)
四、and(与), or(或), nand(非与), xor(非或)
五、()(复合表达式), !()(对复合表达式结果取反)
例如如下的条件判断:web

if "_grokparsefailure" not in [tags] {
} 
else if [status] !~ /^2\d\d/ or ( [url] == "/noc.gif" nand [geoip][city] != "beijing" ) {
} 
else {
}

环境变量引用

Logstash支持引用系统环境变量,环境变量不存在时能够设置默认值,例如:正则表达式

export TCP_PORT=12345
input {
  tcp {
    port => "${TCP_PORT:54321}"
  }
}

经常使用输入插件

在第九章中,咱们已经使用是标准输入,以键盘的输入数据做为Logstash数据源,但实际上咱们也知道,Logstash的数据源有不少,每种数据源都有相应的配置,在Logstash中,这些数据源的相应配置称为插件,咱们经常使用的输入插件有:file、jdbc、redis、tcp、syslog,这些输入插件会监听数据源的数据,若是新增数据,将数据封装成Event进程处理或者传递,更多的输入插件你们能够看Logstash官网,接下来咱们以file和jdbc两个输入插件做为例子,来学习输入插件的使用,其余输入插件使用起来大同小异,你们自行扩展。redis

file输入插件

读取文件插件主要用来抓取文件的数据变化信息,以此做为Logstash的数据源。sql

  • 配置示例:
input{
  file {
    path => ["/var/log/*.log", "/var/log/message"]
    type => "system"
    start_position => "beginning"
  }
}
output{
  stdout{}
}
  • 经常使用参数
参数名称 数据类型 默认值 描述
path array 用于匹配被监控的文件,如"/var/logs/*.log"或者 "/var/log/message",必须使用绝对路径
type string Event的type字段,若是采用elasticsearch作store,在默认状况下将做为elasticsearch的type
sincedb_path string “$HOME/.sincedb*” 文件读取记录,必须指定一个文件而不是目录,文件中保存没个被监控的文件等当前inode和byteoffset
sincedb_write_interval number 15 间隔多少秒写一次sincedb文件
start_position string "end" 值为“beginning”和“end”,从文件等开头仍是结尾读取文件内容,默认是结尾,若是须要导入文件中的老数据,能够设置为“beginning”,该选项只在第一次启动logstash时有效,若是文件已经存在于sincedb的记录内,则此配置无效
stat_interval number 1 间隔多少秒检查一下文件是否被修改,加大此参数将下降系统负载,可是增长了发现新日志的间隔时间
close_older number 3600 设置文件多少秒内没有更新就关掉对文件的监听
codec string “plain” 输入数据以后对数据进行解码
add_field hash {} 用于向Event中添加字段
delimiter string “\n” 文件内容的行分隔符,默认按照换行进行Event对象封装,也就是说一行数据就一个Event对象
discover_interval number 15 间隔多少秒查看一下path匹配的路径下是否有新文件产生
exclude array path匹配的文件中指定例外,如:path => “/var/log/“;exclude =>”*.gz”
id string 区分两个相同类型的插件,好比两个filter
ignore_older number 忽略历史修改,若是设置3600秒,logstash只会发现一小时内被修改过的文件,一小时以前修改的文件的变化不会被读取,若是再次修改该文件,全部的变化都会被读取,默认被禁用
tags array 能够在Event中增长标签,以便于在后续的处理流程中使用

jdbc输入插件

该插件可使用jdbc把关系型数据库的数据做为Logstash的数据源

  • 配置示例:
input {
  jdbc {
    jdbc_driver_library => "/opt/logstash/mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    jdbc_password => "123456"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}
output{
  stdout{}
}
  • 经常使用参数(空 = 同上)
参数名称 数据类型 默认值 描述
jdbc_driver_library string jdbc链接驱动的jar包位置
jdbc_driver_class string jdbc驱动类
jdbc_connection_string string 数据库的链接地址
jdbc_user string 数据库的用户名
jdbc_password string 数据库的密码
jdbc_paging_enabled boolean false 开启分页
jdbc_page_size number 100000 每页查询多少条数据
statement string 执行的SQL查询语句
parameters hash {} 设置SQL查询语句的参数
use_column_value boolean false 是否须要在程序中使用查询出来的列
tracking_column string 须要在程序中使用哪一个查询出来的列
clean_run boolean false 是否应该保留先前的运行状态
colums_charset hsah {} 特定列的字符编码。此选项将覆盖指定列的原来charset配置
schedule string 定时执行数据读取的任务,使用cornexpression表达式:分、时、天、月、年,所有为*默认含义为每分钟执行任务,该cornexpression表达式于Linux系统的crontab使用方式同样,若是不会使用cornexpression表达式,能够查看crontab相关文档。不设置的话,只执行一次数据的读取
lowercase_column_names boolean true 是否须要把查询出来的列名转换成小写,也就是说即便列名是驼峰的,也会转成所有都是小写的,默认为转小写,若是不须要,则设置为false
record_last_run boolean true 是否记录数据库中最后一条数据的位置
last_run_metadata_path string 记录数据库中最后一条数据的位置信息存放路径
add_field      
codec      
id      
tags      
type      

经常使用过滤插件

丰富的过滤器插件的是 logstash威力如此强大的重要因素,过滤器插件主要处理流经当前Logstash的事件信息,能够添加字段、移除字段、转换字段类型,经过正则表达式切分数据等,也能够根据条件判断来进行不一样的数据处理方式。咱们经常使用的过滤插件有:grok、date、geoip、mutate、json、Split、ruby,更多的过滤插件你们能够看Logstash官网,接下来咱们以grok、date和geoip这3个过滤插件做为例子,来学习过滤插件的使用,其余过滤插件使用起来大同小异,你们自行扩展。

grok正则插件

grok正则捕获是Logstash中将非结构化数据解析成结构化数据以便于查询的最好工具,很是适合解析system log,web log, database log等任意的 log文件。

  • 内置正则表达式调用
    grok提供100多个经常使用正则表达式可供使用,这100多个正则表达式定义在logstash/vendor/bundle/jruby/x.x/gems/logstash-patterns-core-xxx/patterns/grok-patterns文件中,想要灵活的匹配各类数据,那么必须查看该文件,大概了解grok提供了什么内置的正则表达式。调用它们的语法以下:%{SYNTAX:SEMANTIC}
    SYNTAX:表示内置的正则表达式的名称
    SEMANTIC:表示在Event中建立该字段名来存储匹配到的值
    例如:输入的数据内容为“[debug] 127.0.0.1 - test log content”,咱们想提取127.0.0.1这个IP地址,那么可使用如下语法来匹配:%{IP:client},将得到“client: 127.0.0.1”的结果,该结果将成为Event的一个新的字段和字段值;若是你在捕获数据时想进行数据类型转换可使用%{NUMBER:num:int}这种语法,默认状况下,全部的返回结果都是string类型,当前Logstash所支持的转换类型仅有“int”和“float”;

  • 自定义表达式调用
    与预约义表达式相同,你也能够将自定义的表达式配置到Logstash中,而后就能够像于定义的表达式同样使用;如下是操做步骤说明:
    一、在Logstash根目录下建立文件夹“patterns”,在“patterns”文件夹中建立文件“extra”(文件名称无所谓,可本身选择有意义的文件名称);
    二、在文件“extra”中添加表达式,格式:patternName regexp,名称与表达式之间用空格隔开便可,例如:POSTFIX_QUEUEID [0-9A-F]{10,11}
    三、使用自定义的表达式时须要在grok插件配置“patterns_dir”属性,属性值为extra文件所在的目录。

  • 配置示例
    日志文件http.log每行内容为:55.3.244.1 GET /index.html 15824 0.043 message-id:BEF25A72965
    grok表达式:表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
    配置文件内容:

input {
  file {
    path => "/var/log/http.log"
  }
}
filter {
  grok {
    patterns_dir => ["./patterns"]
    match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} message\-id:%{POSTFIX_QUEUEID:queueid}"}
  }
}
output{
    stdout{}
}

输出结果:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
queueid: BEF25A72965
  • 示例解析
    一、/var/log/http.log文件每一行的格式为55.3.244.1 GET /index.html 15824 0.043 message-id:BEF25A72965,到时候会把每一行数据封装成一个Event。
    二、使用grok过滤插件处理该文件,match为匹配Event中的message,message就是该文件的一行数据,好比55.3.244.1 GET /index.html 15824 0.043 message-id:BEF25A72965
    三、匹配message的内容使用%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration} message\-id:%{POSTFIX_QUEUEID:queueid}表达式,把一条message拆成client、method、request、bytes、duration、queueid这6个字段,并添加到Event中。
    四、其中%{POSTFIX_QUEUEID:queueid}为自定义表达式的调用,其余5个是grok内置的表达式调用,若是须要使用自定义表达式,则须要在grok插件配置patterns_dir属性,属性值数据类型为array,就是自定义表达式定义的文件所在的目录。
    五、在表达式中,特殊字符须要使用\来转义,好比-""[]这些特殊字符,因此咱们上面实例中的message-id数据在用表达式匹配的时候是使用了message\-id去匹配了。
  • 经常使用参数(空 = 同上)
参数名称 数据类型 默认值 描述
match array {} 设置pattern数组
patterns_dir array [] 指定自定义的pattern文件存放目录,Logstash在启动时会读取文件夹内全部文件,但前提是这些文件的内容是按照grok语法语法来定义的表达式变量
patterns_files_glob string "*" 用于匹配patterns_dir中的文件
add_field      
add_tag      
id      
break_on_match boolean true match字段存在多个pattern时,当第一个匹配成功后结束后面的匹配,若是想匹配全部的pattern,将此参数设置为false
keep_empty_captures boolean false 若是为true,捕获失败的字段奖设置为空值
clean_run boolean false 是否应该保留先前的运行状态
colums_charset hsah {} 特定列的字符编码。此选项将覆盖指定列的原来charset配置
overwrite array [] 覆盖字段内容: match=> { “message” => “%{SYSLOGBASE} %{DATA:message}” } overwrite=> [ “message” ]
periodic_flush boolean false 按期调用filter的flush方法
remove_field array [] 从Event中删除指定字段: remove_field=> [ “fieldname” ]
remove_tag array [] 删除“tags”中的值: remove_tag=> [ “tagname” ]
tag_on_failure array [“_grokparsefailure”] 当没有匹配成功时,将此array添加到“tags”字段内
tag_on_timeout string “_groktimeout” 当匹配超时时,将此字符串内容添加到“tags”字段内
timeout_millis number 30000 设置单个match到超时时间,单位:毫秒,若是设置为0,则不启用超时设置

date时间处理插件

在讲date插件的使用前,咱们须要先讲解一下Logstash的时间记录方式。在Logstash产生了一个Event对象的时候,会给该Event设置一个时间,字段为“@timestamp”,同时,咱们的日志内容通常也会有时间,可是这两个时间是不同的,由于日志内容的时间是该日志打印出来的时间,而“@timestamp”字段的时间是input插件接收到了一条数据并建立Event的时间,全部通常来讲的话“@timestamp”的时间要比日志内容的时间晚一点,由于Logstash监控数据变化,数据输入,建立Event致使的时间延迟。这两个时间均可以使用,具体要根据本身的需求来定。
可是无论“@timestamp”字段的时间仍是日志内容中的时间,其时间格式通常都不是咱们想要的,因此咱们就须要使用date插件,把时间格式转成咱们想要的格式,好比:好比将Apr 17 09:32:01(MMM dd HH:mm:ss)转换为04-17 09:32:01 (MM-dd HH:mm:ss)

  • 配置示例:
filter {
    date {
        match => ["timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
    } 
}
  • 示例解析
    timestamp是自定义的Event字段,用于存放经过grok解析到的日志内容中的时间,dd/MMM/YYYY:HH:mm:ss Z是日志内容中的时间格式,好比:16/Sep/2018:00:42:38 +0800,匹配到了以后,date插件会默认把该时间转成本地格式的时间,而且覆盖掉Event为咱们建立的@timestamp字段中的时间。
  • 经常使用参数(空 = 同上)
参数名称 数据类型 默认值 描述
add_field      
add_tag      
periodic_flush      
id      
remove_field      
remove_tag      
remove_tag      
tag_on_failure      
match array [] 时间字段匹配,可自定多种格式,直到匹配到或者匹配结束,格式:[ field,formats… ],如:match=> [ “logdate”, “MMM dd yyyy HH:mm:ss”, “MMM d yyyy HH:mm:ss”, “ISO8601” ]
target string “@timestamp” 指定match匹配而且转换为date类型的存储位置(字段),默认覆盖到“@timestamp”
timezone string 指定时间格式化的时区

geoip插件

geoip插件是用于根据IP地址来肯定该IP的归属地,默认的数据来源于Maxmind公司GeoLite2(https://dev.maxmind.com/geoip/geoip2/geolite2/)数据库,该数据库内嵌在geoip插件中,存储位置为:logstash/vendor/bundle/jruby/x.x/gems/logstash-filter-geoip-x.x-java/vendor目录中,数据库文件名分别为GeoLite2-City.mmdbGeoLite2-ASN.mmdb
从Maxmind的描述 ----“GeoLite2数据库是免费的IP地理位置数据库,可与MaxMind收费的GeoIP2数据库相媲美,但没有GeoIP2准确”。 有关更多详细信息,请参阅GeoIP Lite2许可证。
Maxmind的商业数据库GeoIP2(https://www.maxmind.com/en/geoip2-databases)也支持geoip插件。简单来讲就是两个数据库一个免费的一个商业的,免费的不如商业的地理位置精准。
若是您须要使用内嵌的GeoLite2之外的数据库,则能够直接从Maxmind网站下载数据库,并使用数据库选项指定其位置(下面会讲到若是使用数据选项)。

  • 配置示例:
filter {
     if [remote_ip] !~ "^127\.|^192\.168\.|^172\.1[6-9]\.|^172\.2[0-9]\.|^172\.3[01]\.|^10\." {
        geoip {
             source => "remote_ip"
             database => "/usr/share/GeoIP/GeoLite2-Country.mmdb"
        }
    }
}
  • 示例解析
    因为geoip使用的ip数据库不能匹配私网地址,因此在使用geoip插件前,先判断一下ip地址是否为私网ip,若是不是,这使用geoip插件。source为指定一个ip地址,为其查询归属地等信息,remote_ip为Event中存储ip地址的自定义字段,database为IP地址数据库所在的位置,不指定的话,使用geoip插件内置的GeoLite2-City默认数据库。
  • 经常使用参数(空 = 同上)
参数名称 数据类型 默认值 描述
add_field      
add_tag      
periodic_flush      
id      
remove_field      
remove_tag      
remove_tag      
source string 要经过geoip插件查询的IP地址所在的字段
tag_on_failure array [“_geoip_lookup_failure”] 若是ip地址查询不到地理位置信息,这在标签中添加该值
cache_size number 1000 因为geoip查询很耗时间,因此默认状况下,查询过一次的ip地址会缓存起来,目前geoip没有内存数据驱逐策略,设置的缓存用完了就不能在缓存新的数据了,因此缓存须要设置一个合理值,过小查询性能增长不明显,太大就很占用服务器的内存
target string “geoip” 把IP地址匹配到的信息放到该字段中
database string 使用的Maxmind数据库文件的路径。 若是不指定,则默认数据库是GeoLite2-City。 GeoLite2-City,GeoLite2-Country,GeoLite2-ASN是Maxmind支持的免费数据库。 GeoIP2-City,GeoIP2-ISP,GeoIP2-Country是Maxmind支持的商业数据库。
default_database_type string "City" 该字段的值只能接受"City"和"ASN",GeoLite2数据库下细分为GeoLite2-City,GeoLite2-ASN和GeoLite2-Country,geoip插件内嵌了GeoLite2-City和GeoLite2-ASN,默认使用GeoLite2-City,若是须要使用GeoLite2-ASN,则需设置该字段并设置为ASN
fields array 要包含在Event中的geoip字段的信息(每一个数据库的信息都不同)。 默认状况下,全部信息都会列出。对于内置的GeoLite2-City数据库,可使用如下信息:city_name, continent_code, country_code2, country_code3, country_name, dma_code, ip, latitude, longitude, postal_code, region_name 和 timezone

经常使用输出插件

通过以上的学习,咱们已经学习了Logstash三大主件中的其中两个了,分别是input和filter,那如今咱们就来学习最后一个组件:output。
每一个数据流通过input和filter后,最终要由output输出,以上的内容咱们都是使用最简单的标准输出:stdout,把数据输出到显示器,但实际上stdout只是Logstash的其中一个输出插件而已,它的经常使用输出插件有:Elasticsearch,Redis,File,TCP等等,更多的输出插件你们能够看Logstash官网,接下来咱们以Elasticsearch和Redis输出插件做为例子,来学习输出插件的使用,其余过滤插件使用起来大同小异,你们自行扩展。

elasticsearch输出插件

用于将Event信息写入到Elasticsearch中,官方推荐插件,ELK技术栈必备插件。

  • 配置示例
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        document_type => "%{type}"
    }
}
  • 经常使用参数(空 = 同上)
参数名称 数据类型 默认值 描述
add_field      
add_tag      
periodic_flush      
flush_size      
id      
remove_field      
remove_tag      
codec      
enable_metric      
hosts array elasticsearch服务器集群所在的地址
index string [无] 文档索引库名称
document_type string [无] 文档类型,不指定的话,类型名为“doc”
document_id string [无] 文档id,不指定的话,由elasticsearch自动生成
user string [无] elasticsearch用户名
password string [无] elasticsearch集群访问密码
parent string “nil” 为文档子节点指定父节点的id
pool_max number 1000 elasticsearch最大链接数
pool_max_per_route number 100 每一个“endpoint”的最大链接数
proxy string 代理URL
template string 设置自定义的文档映射模版存放路径
template_name string 设置使用的默版名称
template_overwrite boolean false 是否始终覆盖现有模版
manage_template boolean true 是否启用elasticsearch模版,Logstash自带一个模版,可是只有名称匹配“logstash-*”的索引才会应用该默版
parameters hash 添加到elasticsearch URL后面的参数键值对
timeout number 60 网络超时时间

redis输出插件

用于将Event写入Redis中进行缓存,因为Redis数据库是先把数据存在内存的,因此效率会很是高,是一个经常使用的logstash输出插件

  • 配置示例
output {
    redis {
        host => ["127.0.0.1"]
        port => 6379
        data_type => "list"
        key => "logstash-list"
    }
}
  • 经常使用参数(空 = 同上)
参数名称 数据类型 默认值 描述
add_field      
add_tag      
periodic_flush      
flush_size      
id      
remove_field      
remove_tag      
codec      
enable_metric      
hosts array ["127.0.0.1"] redis服务列表,若是配置多个,将随机选择一个,若是当前的redis服务不可用,将选择下一个
port number 6379 文档索引库名称
db number 0 使用的redis数据库编号,默认使用0号数据库
password string redis的密码
data_type string 存储在redis中的数据类型,只能使用“list”和“channel”这两个值。若是使用“list”,将采用“RPUSH”操做,若是是“channel”,将采用“PUBLISH”操做
key string 给redis设置一个key,来存储收集到的数据库
batch boolean false 是否启用redis的batch模式,仅在data_type=”list”时有效
batch_events number 50 batch大小,batch达到此大小时执行“RPUSH”

后记

那到这里,咱们已经介绍了Logstash配置文件的语法和经常使用插件的配置方式了,这些经常使用的插件都是使用频率很是高的,全部咱们后面须要来作一个Logstash的实战案例,综合运用咱们这章所学的内容。

相关文章
相关标签/搜索