rpm --import http://packages.elasticsearch.org/GPG-KEY-elasticsearch cat > /etc/yum.repos.d/logstash.repo <<EOF [logstash-5.0] name=logstash repository for 5.0.x packages baseurl=http://packages.elasticsearch.org/logstash/5.0/centos gpgcheck=1 gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch enabled=1 EOF yum clean all yum install logstash
# bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'
logstash用不一样的线程来实现对输入信息处理以后显示成咱们但愿的样子,logstash给每一个线程都取了名字,输入的叫xx,过滤的叫\xxnode
数据在线程之间以事件的形式流传,不要叫行,由于logstash能够处理多行事件。python
logstash会给事件添加一些额外的信息,最重要的就是@timestamp,用来标记事件的发生时间。由于这个字段涉及到logstash的内部流转,因此必须是一个joda对象,若是尝试本身给一个字符串字段重命名为@timestamp的话,logstash会直接报错。因此请使用filters/data插件来管理这个特殊字段。linux
另外的几个字段:ios
你能够随意给事件添加字段或者从事件里删除字段。事实上事件就是一个Ruby对象,或者更简单的理解为就是一个哈希。git
注:每一个logstash过滤插件都有四个方法:github
add_tag、remove_tag、add_field、remove_fieldweb
logstash习惯用shipper、broker、indexer来描述数据流中不一样进程的角色。正则表达式
在不少的运维场景中并无用logstash做为shipper,或者说 没有用elasticsearch做为是数据存储,也就是没有indexer。redis
logstash设计了本身的DSL--有点像Puppet的DSL,二者均是经过Ruby语言写的--包括区域、注释、数据类型(布尔值、字符串、数值、数组、哈希)。条件判断,字段引用等。shell
logstash用{}来定义区域。区域内能够包括插件区域定义,能够在一个区域中定义多个插件。插件区域内则能够定义键值对设置。示例:
input{
stdin{}
syslog{}
}
logstash支持少许的数据值类型
debug => true
host => "hostname"
port => 514
match => ["datetime","UNIX","ISO8601"]
option => {
key1 => "value1",
key2 => "value2"
}
注:若是版本低于1.2.0,哈希的语法跟数组是同样的,示例:
match => ["field1","pattern1","field2","pattern2"]
字段是logstash::Event对象的属性。能够想象字段就像一个键值对。若是你想在Logstash中使用字段的值,只须要把字段的名字写在中括号中就好了,这就叫字段引用。
对于嵌套字段(也就是多维哈希表,或者叫哈希的哈希),每层字段名都写在[]中。好比你能够从geoip获取到longitude值(这是个笨办法,实际上有单独的字段专门存这个数据的):
[geoip][location][0]
注:logstash的数组也支持倒序下标,及[geoip][location][-1]能够获取数组最后一个元素的值。
Logstash还支持变量内插,在字符串里使用字段引用的方法是这样:
“the longitude is %{[geoip][location][0]}”
Logstash从1.3.0版本开始支持条件判断和表达式。
表达式支持下面这些操做符:
示例:
if "_grokparsefailure" not in [tags] { } else if [status] !~ /^2\d\d/ or ( [url] == "/noc.gif" nand [geoip][city] != "beijing" ) { } else { }
logstash提供了一个shell脚本叫logstash,方便快速运行。
即执行。事实上你能够不写任何具体配置,直接运行bin/logstash -e ‘ ’达到相同的效果,这个参数的默认值是下面这样的:
input{
stdin{ }
}
output{
stdout{ }
}
即文件。bin/logstash -f agent.conf,或者bin/logstash -f /etc/logstash.d/
注:logstash列出目录下全部文件时是字母排序的。而logstash配置段的filter和output都是顺序执行的,因此顺序很是重要。采用多文件管理的用户,推荐采用数字编号方式命名配置文件,同时在配置中,严谨采用if判断限定不一样日志的动做。
即测试。用来测试logstash读取的配置文件是否存在语法错误。logstash配置语法是用grammar.treetop定义的,尤为是使用了上一条提到的读取目录方式的读者,尤为要提早测试。
即日志。logstash默认输出日志到标准错误。生产环境下你能够经过bin/logstash -l log/logstash.log命令来统一存储日志。
运行filter和output的pipeline线程的数量。默认是CPU核数。
每一个logstash pipeline线程,在打包批量日志的时候,最多等待毫秒数。默认是5ms。
能够写本身的插件,而后用bin/logstash --pluginpath /path/to/own/plugins进行加载。
输出必定的调试日志。
从logstash 5.0开始,新增了$LS_HOME/config/logstash.yml文件,能够将全部的命令行参数都经过YAML文件方式设置。同时为了反映命令行配置参数的层级关系,参数也都改为用.而不是-了。
pipeline:
workers:24
bath:
size:125
delay:5
从logstash1.5.0版本开始,logstash将全部插件都独立拆分红gem包。这样每一个插件均可以独立更新,不用等待logstash自身作总体更新的时候才能使用。
为了达到这个目标,logstash配置了专门的plugin管理命令
Usage: bin/logstash-plugin [OPTIONS] SUBCOMMAND [ARG] ... Parameters: SUBCOMMAND subcommand [ARG] ... subcommand arguments Subcommands: install Install a plugin uninstall Uninstall a plugin update Install a plugin list List all installed plugins Options: -h, --help print help
bin/logstash-plugin list查看本机如今有多少插件可用(起始就是在vender/bundle/jruby/1.9/gems的目录下)
若发布新模块logstash-output-webhdfs。只须要运行:bin/logstash-plugin install logstash-output-webhdfs
假如只是升级的话:bin/logstash-plugin update logstash-input-tcp
bin/logstash-plugin install /path/to/logstash-filter-crash.gem
执行成功后在logstash-5.0.0目录下的Gemfile文件最后一行多出一段内容:
gem "logstash-filter-crash", "1.1.0", :path => "vendor/local_gems/d354312c/logstash-filter-mweibocrash-1.1.0"
同时Gemfile.jruby-1.9.lock文件开头也会多出一段内容:
PATH remote: vendor/local_gems/d354312c/logstash-filter-crash-1.1.0 specs: logstash-filter-crash (1.1.0) logstash-core (>= 1.4.0, < 2.0.0)
rpm发行包安装的用户:/etc/init.d/logstash脚本中,会加载/etc/init.d/functions库文件,利用其中的daemon函数,将logstash进程做为后台程序运行。
因此只要确保配置文件存在于/etc/logstash/conf.d/目录下全部的conf结尾的文件且无其余文件,便可经过service logstash start启动
command command > /dev/null command > /dev/null 2>&1 command & command > /dev/null & command > /dev/null 2>&1 & command &> /dev/null nohup command &> /dev/null
想要维持一个长期后台运行的logstash,你须要同时在命令前面加nohup,后面加&。
screen算是linux运维一个中高级技巧。经过screen命令建立的环境下运行的终端命令,其父进程不是sshd的登陆会话,而是screen。这样就能够避免用户退出进程消失的问题,又随时能从新接管回终端继续操做。
建立独立的screen命令:
screen -dmS elkscreen_1
接管连入建立的elkscreen_1
screen -r elkscreen_1
运行logstash以后,不要按Ctrl+C,而是按Ctrl+A+D,断开环境。想要从新接管,依然screen -r elkscreen_1便可。
若建立了多个screen,可经过:
screen -list
daemontools工具:daemontools、python实现的supervisord、perl实现的ubic、ruby实现的god等。
以supervisord为例,由于出现比较早,可经过epel仓库直接安装:
yum -y install supervisord --enablerepo=epel
在/etc/supervisord.conf配置文件中添加内容,定义你要启动的程序:
[program:elkpro_1] environment=LS_HEAP_SIZE=5000m directory=/opt/logstash command=/opt/logstash/bin/logstash -f /etc/logstash/pro1.conf -w 10 -l /var/log/logstash/pro1.log [program:elkpro_2] environment=LS_HEAP_SIZE=5000m directory=/opt/logstash command=/opt/logstash/bin/logstash -f /etc/logstash/pro2.conf -w 10 -l /var/log/logstash/pro2.log
而后service supervisord start 便可。
logstash会以supervisord子进程的身份运行,你还可使用supervisordctl命令单独控制一系列logstash子进程中某一个进程的启停操做:
supervisorctl stop elkpro_2
logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,并且会记录一个叫.sincedb的数据库文件来跟踪被监听的日志文件的当前读取位置。因此,不要担忧logstash会漏过你的数据。
sincedb文件中记录了每一个被监听文件的inide、major number、minor number和pos。
inode:在其余博文中已经阐述;
major device number:能够看作是设备驱动程序,被同一设备驱动程序管理的设备有相同的major device number。这个数字实际是Kernel中device driver table的索引。这个表保存着不一样的设备驱动程序;
minor number:表明被访问的具体设备,也就是说Kernel根据major device number找到的设备驱动程序,而后再从minor device number得到设备位置属性。
pos:包含发生错误的位置的绝对文件位置;
input { file { path => ["/var/log/*.log", "/var/log/message"] type => "system" start_position => "beginning" } }
有一些比较有用的配置项,能够用来指定File Watch库的行为:
logstash每隔多久去检查一次被监听的path下是否有新文件,默认15s。
不想被监听的文件能够排除出去,这里跟path同样支持glob展开。
glob:所谓的 glob 模式是指 shell 所使用的简化了的正则表达式。星号(*)匹配零个或多个任意字符;[abc]匹配任何一个列在方括号中的字符(这个例子要么匹配一个 a,要么匹配一个 b,要么匹配一个 c);问号(?)只匹配一个任意字符;若是在方括号中使用短划线分隔两个字符,表示全部在这两个字符范围内的均可以匹配(好比 [0-9] 表示匹配全部 0 到 9 的数字)。
一个已经监听中的文件,若是超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3600s。
在每次检查文件列表的时候,若是一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86400s。
若是你不想用默认的$HOME/.sincedb(windows平台上在C:\windows\System32\config\systemprofile\.sincedb),能够经过这个配置定义sincedb文件到其余位置。
logstash每隔多久写一次sincedb文件,默认是15s。
logstash每隔多久检查一次被监听文件状态(是否有更新,默认是1s)
logstash从什么位置开始读取文件数据,默认是结束位置,也就是说logstash进程会以相似tail -f的形式运行。若是你是要导入原有数据,把这个设定成“beginning”,logstash进程就从头开始读取,相似less +F的形式运行。
一、一般你要导入原有数据进Elasticsearch的话,你还须要filter/date插件来修改默认的@timestamp字段值。
二、FileWatch只支持绝对路径,并且会不自动递归目录。因此有须要的话,请用数组方式都写明具体哪些文件。
三、LogStash::Input::File只是在进程运行的注册阶段初始化一个FileWatch对象,因此它不能支持相似fluentd那样的path=> “/path/to/%{+yyyy/mm/dd/hh}.log”写法。达到相同目的,你只能写成path=>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归所有子目录。
四、在单个input/file中监听的文件数量太多的话,每次启动扫描构建监听队列会消耗较多的时间。给使用者的感受好像读取不到同样,这是正常现象。
五、start_position仅在该文件从未被监听过的时候起做用,若是sincedb文件中已经有这个文件的inode记录了,那么logstash依然会从记录的pos开始读取数据。
六、由于windows平台上没有inode概念,logstash某些版本在windows平台上监听文件不是很靠谱,windows平台上,推荐考虑使用nxlog做为收集端。
input { stdin { add_field => {"key" => "value"} codec => "plain" tags => ["add"] type => "std" } }
用上面的新stdin设置从新运行一次最开始的hello world示例。建议你们把整段配置都写入一个文本文件,而后运行命令:bin/logstash -f stdin.conf。输入“hello world”并回车,你会在终端看到以下的输出:
{
"message" => "hello world", "@version" => "1", "@timestamp" => "2014-08-08T06:48:47.789Z", "type" => "std", "tags" => [ [0] "add" ], "key" => "value", "host" => "raochenlindeMacBook-Air.local" }
type和tags是logstash事件中两个特殊的字段。一般说咱们会在输入区段中经过type来标记事件类型----咱们确定是提早能知道这个事件属于什么类型的。而tags这是在数据处理过程当中,由具体的插件来添加或者删除的。
最多见的用法像下面这样:
input { stdin { type => "web" } } filter { if [type] == "web" { grok { match => ["message", %{COMBINEDAPACHELOG}] } } } output { if "_grokparsefailure" in [tags] { nagios_nsca { nagios_status => "1" } } else { elasticsearch { } } }
syslog多是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤为是网络设备,好比思科----syslog几乎是惟一可行的办法。
这里只讲如何把logstash配置成一个syslog服务器来接收数据。有关rsyslog的用法,稍后的类型项目中,会有更详细的介绍。
input { syslog { port => "514" } }
做为简单的测试,先暂停本机的syslog或rsyslog进程,而后启动logstash进程(这样就不会有端口冲突的问题)。如今,本机的syslog就会默认发送到logstash里了。咱们能够用自带的logger命令行工具发送一条“hello world”信息到syslog里,即logstash里面。看到logstash输出像下面这样。
{
"message" => "Hello World", "@version" => "1", "@timestamp" => "2014-08-08T09:01:15.911Z", "host" => "127.0.0.1", "priority" => 31, "timestamp" => "Aug 8 17:01:15", "logsource" => "raochenlindeMacBook-Air.local", "program" => "com.apple.metadata.mdflagwriter", "pid" => "381", "severity" => 7, "facility" => 3, "facility_label" => "system", "severity_label" => "Debug" }
Logstash是用UDPSocket,TCPServer和Logstash::Filter::Grok来实现Logstash::Inputs::Syslog的。因此你起始能够直接用logstash配置实现同样的效果:
input { tcp { port => "8514" } } filter { grok { match => ["message", "%{SYSLOGLINE}" ] } syslog_pri { } }
建议在使用Logstash::Inputs::Syslog的时候走TCP协议来传输数据;
由于具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每一个链接的时候都启动新的线程来处理后续步骤。
若是你已经使用UDP监听收集日志,用下行命令检查你的UDP接收队列的大小:
netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}' Recv-Q 228096
228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了。
强 烈建议使用Logstash::Input::Tcp和Logstash::filters::Grok配合实现一样的syslog功能。
虽然Logstash::Input::Syslog在使用TCPServer的时候能够采用多线程处理的接收,可是在同一个客户端数据处理中,其grok和date是一直在该线程中完成的,这会致使整体上的处理性能几何级的降低,通过测试,TCPServer每秒能够接收50000条数据,而在同一线程中启用grok后每秒只能处理5000条,再加上date只能到500条。
才将这两步拆分到filters阶段后,logstash支持对该阶段插件单独设置多线程运行,大大提升了整体处理性能。在相同环境下,logstash -f tcp.conf -w 20的测试中,整体处理性能能够达到30000条数据。
注:测试采用 logstash 做者提供的 yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000
命令。出处见:https://github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile
若是实在没办法切换到TCP协议,你能够本身写程序或者使用其余基于异步IO框架(好比libev)的项目。下面是一个简单的异步IO实现UDP监听数据输入Eleasticsearch的示例:
https://gist.github.com/chenryn/7c922ac424324ee0d695
将来你可能会用到redis服务器或者其余消息队列系统做为logstash broker的角色。不过logstash其实也有本身的TCP/UDP插件,在临时任务的时候,也算能用,尤为是测试环境。
小贴士:
虽然Logstash::Input::TCP用Ruby的Socket和Openssl库实现了高级的SSL功能,但Logstash自己只能在SizedQueue中缓存20个事件。这就是建议在生产环境中换用其余消息队列的缘由。
input { tcp { port => 8888 mode => "server" ssl_enable => false } }
目前看来,Logstash::Input::TCP最多见的用法就是配合nc命令导入旧数据。在启动logstash进程后,在另外一个终端运行以下的命令便可导入数据:
# nc 127.0.0.1 8888 < olddata
这种作法比用Logstash::Input::File好,由于当nc命令结束,咱们就知道数据导入完毕了。而用input/file方式,logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。
未完待续……
贴士