随着业务不断完善与发展,日志的重要性稳步上升。咱们须要从日志中排查错误,以及分析用户行为,为业务发展提供参考意见。所以,须要一套专门的日志系统帮助咱们收集、分析、处理日志。html
之前我曾经写过一个logstash的blog: http://my.oschina.net/abcfy2/blog/372138 ,版本比较低,可是logstash的配置没变过。此篇blog将比上述blog更详尽一些,扩展到产品环境搭建完整的日志系统,可是logstash自己的配置很少作介绍,由于旧Blog已经介绍的比较详细了。前端
本篇Blog主要介绍咱们目前使用的日志系统的整体架构和部分配置。Kibana的使用暂时不在本篇Blog的覆盖范围以内,之后也许会单独写一篇kibana的使用,读者也能够参考饶琛琳的《ELK stack权威指南》一书的相关章节。java
本篇Blog的内容也并不是本身独自完成,关于log4j 1.2部分的配置和使用是开发同事共同探究实现的。nginx
最后要说的一点是,日志系统的实现并不仅是运维的工做,开发也须要配合,规范日志格式,规范项目埋点,便于排查问题。最后归结与一点,要有执行力,要有人推进,不能随随便便的打日志,更不容许产品环境有乱七八糟的println
这种调试方式的日志输出。git
ELK Stack指代三个独立的组件: Elasticsearch,Logstash,Kibana。这三个独立的组件组合使用,能够造成一套完整的日志解决方案。目前这三个产品前后归于Elastic.co公司旗下,该公司围绕Elasticsearch这个核心产品逐步打造一整套生态环境,使得ELK Stack这套架构日益成熟,并且周边也逐步开始完善。github
其中,Logstash的做用是处理日志,将日志解析为JSON
格式进行传递。Elasticsearch的做用是数据库,将最终解析的结果存库,用于往后查询与分析使用。Kibana是Elasticsearch的dashboard,用于图形化展现elasticsearch数据库的查询结果。这三个组件搭配使用,将十分灵活,有如下几个优势(如下内容节选自饶琛琳的《ELK stack权威指南》一书,感谢做者的努力):web
Kibana的可视化效果:sql
其中,ELK的灵活性得益于Logstash的插件式设计,并且插件之间都是松耦合(经过JSON
事件交互,接口统一)。数据流在Logstash会通过三个阶段: Input -> Filter -> Output,并且Filter能够无限制串联,造成流式处理,甚至能够干脆没有。这三个阶段既能够在单节点上完成,也能够直接Output到其余节点上,分布处理与卸载压力。整个Logstash的基本流程图以下:mongodb
Logstash的数据处理过程描述以下:数据库
Filter
进行处理,如筛选,简单聚合(如Multiline插件将多行JAVA堆栈异常聚合为一个事件),编解码(如将unix时间戳转为时间字符串,将k1=v1,k2=v2这种kv格式解析为{k1:v1, k2:v2}这种JSON格式,正则解析文本日志等),执行Ruby代码等等,而且Filter能够无限制串联。此过程可直接跳过,即不对事件作任何处理。整个Logstash的ruby DSL配置语法看起来像这样:
input { 插件名1 { # 插件相关配置属性 } 插件名2 { # 插件相关配置属性 } ... SNIP ... 插件名3 { # 插件相关配置属性 } } filter { 插件名1 { # 插件相关配置属性 } 插件名2 { # 插件相关配置属性 } ... SNIP ... 插件名3 { # 插件相关配置属性 } } output { 插件名1 { # 插件相关配置属性 } 插件名2 { # 插件相关配置属性 } ... SNIP ... 插件名3 { # 插件相关配置属性 } }
举个例子,好比存储于日志文件中的某http access log日志:
55.3.244.1 GET /index.html 15824 0.043
通过了Logstash的inputs-file
插件,输入成为Logstash的一个事件,在Logstash会变成这样(以rubydebug
格式显示):
{ "message" => "55.3.244.1 GET /index.html 15824 0.043", "@version" => "1", "@timestamp" => "2016-03-01T03:37:33.081Z", "host" => "fengyu-Vostro-3900" }
日志自己内容会存放在message
这个field中,除此以外还会加上一些元数据,如host
,@timestamp
等。
加上filters-grok
这个Filter进行正则解析处理,解析message
这个field(详细配置参考filters-grok的文档),最终将该事件解析成以下的事件:
{ "message" => "55.3.244.1 GET /index.html 15824 0.043", "@version" => "1", "@timestamp" => "2016-03-01T03:51:03.914Z", "host" => "fengyu-Vostro-3900", "client" => "55.3.244.1", "method" => "GET", "request" => "/index.html", "bytes" => "15824", "duration" => "0.043" }
最后,经过outputs-elasticsearch
这个output插件,将解析过的日志推送至Elasticsearch数据库中存储。
经过elasticsearch中的各类查询方式,便可按照本身的需求展现这些数据了。
Logstash的这种设计,能够很容易进行线性扩展,好比不作filter处理,直接output到其余logstash实例的input端,将处理分散在不一样的节点上。最极端的状况,甚至能够扩展成这个架构,兼顾HA(High Availability)与HP(High Performance):
三个logstash实例互为冗余,将解析的结果推送至消息队列,由另外一个logstash实例将日志从消息队列取出,推送至elasticsearch集群中。
整个日志数据流的模型图:
每台服务器上部署有咱们本身开发的应用程序,以及这些应用程序的第三方依赖服务项(如数据库,web服务器等)。
所以日志源主要有两种: 本身开发的应用程序的日志,依赖的第三方软件的日志。
咱们本身开发的程序,直接将日志以JSON格式写入消息队列中。第三方服务大部分没法直接将日志写入消息队列中,而是输出为日志文件,这种日志源经过logstash的filters-grok插件,解析日志文件后推送至消息队列中。
须要收集的第三方依赖的日志,以及收集哪些日志,详见文档末的附录。
消息队列使用kafka
+ zookeeper
的方式实现。日志专用的消息队列部署在日志服务器中。
注: 若是日志量比较小的话,能够不必这么复杂,好比省略掉
kafka
这个消息队列,日志服务器也无需部署logstash,直接在应用服务器上用logstash将解析过的log推送至日志服务器上的es数据库中。
安全问题:
全部服务尽量只对内网ip暴露(经过防火墙实现),减小对外暴露的服务,而且以低权限帐户运行。跨节点的服务(如mongodb复制集,kafka+zookeeper,postgresql集群等)链接一概采用SSL双向认证的方案,提升安全性。
详细配置参考文档末附录的内容。
根据上述描述,咱们须要搭建一台日志服务器,安装ELK与日志专用的消息队列。
应用程序产生的日志直接推送至日志服务器的消息队列中,通过logstash的处理最终推送至elasticsearch中,在kibana上进行展现。
能够在logstash的Filter上定义报警规则,当日志有严重的错误时Output邮件报警。
服务器上应用程序列表如图所示:
多台产品服务器上,每台服务器分别部署有应用程序和logstash,其余第三方服务按照须要组成集群(如postgresql集群,mongodb复制集等)。日志服务器上部署完整的ELK Stack和Kafka+Zookeeper。
logs
这个TOPIC
中。此架构在扩展上将即为便利,共有三个可扩展的点:
这些扩展方案都可在无需原程序改动的条件下进行扩展。
推荐使用elastic.co的仓库(RHEL/CentOS和Ubuntu/Debian仓库为官方维护):
推荐使用清华大学镜像仓库,个人issue已经被tuna接受,国内安装速度会快许多(官方仓库在S3上,因此你懂的)。tuna镜像地址: http://mirrors.tuna.tsinghua.edu.cn/ELK/
按照官方文档的步骤,安装以后根据须要定制配置,启动服务,启用开机自启动便可。
包管理器安装的logstash,启动配置存放于/etc/logstash/conf.d/
,这个目录一开始是空的,本身将logstash的配置文件以.conf
结尾扔到这个目录后,便可使用service logstash start
启动服务,日志存放于/var/log/logstash/
。logstash的配置文件能够拆分红多个.conf
文件,以规范配置,好比input.conf
,filter.conf
,output.conf
。
特别注意: logstash能够在一个目录下存放多个
.conf
文件,logstash内部会将多个.conf
文件合并为一个大的配置文件,合并的顺序为文件名顺序。因此特别注意你的filter
配置,若是多个配置文件都有filter
配置,特别注意filter的加载次序!不然会搞乱你的配置。若是你的filter只针对某个应用的日志使用,那么推荐你使用if [type] == "appname" { filter配置 }
这种方式限制住你的filter的做用范围。
我用的是salt
,产品环境Ubuntu Server 14.04 LTS,固然你也可使用其余相似的工具,如puppet
,chef
,ansible
等等。
salt的logstash
这个state的目录结构以下:
$ tree /srv/salt/logstash/ /srv/salt/logstash/ ├── config │ ├── logagent │ │ ├── 00_log4j.conf │ │ ├── 01_vertx.conf │ │ ├── 02_mongod.conf │ │ ├── 03_postgresql.conf │ │ ├── 04_nginx.conf │ │ └── 99_output.conf │ └── logserver │ └── logserver.conf └── init.sls
$ cat /srv/salt/logstash/init.sls logstash_repo: pkgrepo.managed: - name: deb http://mirrors.tuna.tsinghua.edu.cn/ELK/apt/logstash/2.3/ stable main - file: /etc/apt/sources.list.d/logstash.list - key_url: https://packages.elastic.co/GPG-KEY-elasticsearch - clean_file: True logstash: pkg.latest: - require: - pkgrepo: logstash_repo logstash_grains: grains.list_present: - name: roles - value: logstash logstash-config: file.recurse: - name: /etc/logstash/conf.d {% if 'logserver' in grains.get('roles', '') %} - source: salt://logstash/config/logserver/ {% else %} - source: salt://logstash/config/logagent/ {% endif %} - clean: True - makedirs: True - template: jinja {% if 'postgresql' in grains.get('roles', '') %} logstash-user: group.present: - name: adm - addusers: - "logstash" {% endif %} logstash-service: service.running: - name: logstash - enable: True - watch: - pkg: logstash - file: logstash-config
最终推送到/etc/logstash/conf.d/
目录下的文件为00_log4j.conf,01_vertx.conf,02_mongod.conf,03_postgresql.conf,04_nginx.conf,99_output.conf
,这样命名是为了按照本身预期的文件顺序叠加input
,filter
,output
配置,而不会形成混乱。有关00_log4j.conf
的配置内容参考博客开头提供的旧的blog,这里基本没大改过。
为了演示这套架构的流程与效果,因此将这套架构最小化,将产品服务器的应用与日志服务器的应用所有部署在一个节点上测试。
日志文件数据源以Nginx
的access log为例,使用logstash将nginx access log中的内容推送至kafka队列中,另外一个logstash实例从kafka将nginx的log取出存入elasticsearch中。
本身开发的应用程序直接按照上述日志规范打印日志进入kafka,由logstash从kafka中取出应用程序的日志,推送至elasticsearch中。
修改Nginx的配置文件,使之打印出JSON
格式的access log,配置方法见附录内容。 access log内容以下:
{"@timestamp":"2016-03-03T13:11:03+08:00","host":"sinoiot-172-16-250-3","clientip":"172.16.1.34","size":191,"responsetime":0.000,"http_host":"172.16.250.3","url":"/mirror/","xff":"-","referer":"-","agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36","status":200} {"@timestamp":"2016-03-03T13:11:03+08:00","host":"sinoiot-172-16-250-3","clientip":"172.16.1.34","size":0,"responsetime":0.000,"http_host":"172.16.250.3","url":"/favicon.ico","xff":"-","referer":"http://172.16.250.3/mirror/","agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36","status":204} ...
模拟产品环境的Logstash的配置文件以下所示:
input { file { path => "/var/log/nginx/access.log" codec => json type => "nginx" tags => "access" } } output { # stdout这个output插件仅做为调试阶段使用,用于将处理过的结果打印在终端 # 真实产品环境不须要这个output stdout { codec => "rubydebug" } kafka { topic_id => "logs" bootstrap_servers => "172.16.250.10:9092" # 真实产品环境须要修改对应的kafka集群列表 } }
启动logstash,将会看到终端上显示解析过的事件:
{ "@timestamp" => "2016-03-03T05:11:03.000Z", "host" => "sinoiot-172-16-250-3", "clientip" => "172.16.1.34", "size" => 191, "responsetime" => 0.0, "http_host" => "172.16.250.3", "url" => "/mirror/", "xff" => "-", "referer" => "-", "agent" => "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36", "status" => 200, "@version" => "1", "path" => "/var/log/nginx/access.log", "type" => "nginx", "tags" => [ [0] "access" ] } { "@timestamp" => "2016-03-03T05:11:03.000Z", "host" => "sinoiot-172-16-250-3", "clientip" => "172.16.1.34", "size" => 0, "responsetime" => 0.0, "http_host" => "172.16.250.3", "url" => "/favicon.ico", "xff" => "-", "referer" => "http://172.16.250.3/mirror/", "agent" => "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36", "status" => 204, "@version" => "1", "path" => "/var/log/nginx/access.log", "type" => "nginx", "tags" => [ [0] "access" ] }
从kafka队列中的logs
这个topic获取日志信息,将看到下列内容:
$ bin/kafka-console-consumer.sh --zookeeper 172.16.250.10:2181 --topic logs --from-beginning {"@timestamp":"2016-03-03T05:12:21.000Z","host":"sinoiot-172-16-250-3","clientip":"218.75.124.3","size":162,"responsetime":0.000,"http_host":"218.75.124.3","url":"/mirror/packages.elastic.co/elasticsearch/2.x/debian/dists/stable/main/i18n/Translation-en","xff":"-","referer":"-","agent":"Debian APT-HTTP/1.3 (1.0.1ubuntu2)","status":404,"@version":"1","path":"/var/log/nginx/access.log","type":"nginx","tags":["access"]} {"@timestamp":"2016-03-03T05:12:21.000Z","host":"sinoiot-172-16-250-3","clientip":"218.75.124.3","size":162,"responsetime":0.000,"http_host":"218.75.124.3","url":"/mirror/packages.elastic.co/kibana/4.4/debian/dists/stable/main/i18n/Translation-en_US","xff":"-","referer":"-","agent":"Debian APT-HTTP/1.3 (1.0.1ubuntu2)","status":404,"@version":"1","path":"/var/log/nginx/access.log","type":"nginx","tags":["access"]}
证实logstash已经将解析过的事件推送至kafka队列中。
因为消息队列中存储的日志都是解析过的,因此日志服务器上的配置就简单多了,只须要经过logstash将kafka中的日志推送至elasticsearch存储便可。
日志服务器的logstash配置就简单的多(真实产品环境下须要配置email filter插件,用于邮件报警)。
模拟日志服务器的logstash配置:
input { kafka { topic_id => "logs" zk_connect => "172.16.250.10:2181" # 真实产品环境替换为对应的zookeeper集群列表 } } output { elasticsearch { codec => json } # 产品环境调试完毕,不须要stdout这个output plugin stdout { codec => "rubydebug" } # 产品环境须要邮件报警的话,加入email output # if 报警条件 { # email { # # email output插件的配置 # } #} }
最后,在kibana中将看到以下的效果:
本身开发的应用程序直接按照JSON
格式推送至Kafka消息队列中,所以不须要经过logstash output kafka这种方式。log4j 1.2版本须要手工格式化成JSON,log4j 2.x版本提供了JSON appender,不过目前来看log4j 1.x版本依旧占据主流。输出到kafka的配置参考附录。
因为推送的topic_id
是同样的,所以日志服务器中的logstash配置也无需修改。
从kafka队列中取出log,看看格式:
$ bin/kafka-console-consumer.sh --zookeeper 172.16.250.10:2181 --topic logs --from-beginning {"@timestamp":"2016-03-03T17:03:32.772+08:00","host":"172.16.1.4","type":"rtds","loglevel":"INFO","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"a":1,"b":2}} {"@timestamp":"2016-03-03T17:03:32.773+08:00","host":"172.16.1.4","type":"rtds","loglevel":"DEBUG","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"c":1,"d":2}} {"@timestamp":"2016-03-03T17:03:32.813+08:00","host":"172.16.1.4","type":"rtds","loglevel":"ERROR","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"errormsg":" java.math.BigDecimal.divide(Unknown Source)\n org.codehaus.groovy.runtime.typehandling.BigDecimalMath.divideImpl(BigDecimalMath.java:68)\n org.codehaus.groovy.runtime.typehandling.IntegerMath.divideImpl(IntegerMath.java:49)\n org.codehaus.groovy.runtime.dgmimpl.NumberNumberDiv$NumberNumber.invoke(NumberNumberDiv.java:323)\n org.codehaus.groovy.runtime.callsite.PojoMetaMethodSite.call(PojoMetaMethodSite.java:56)\n org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:125)\n hawkeyes.rtds.MainVerticle.test(MainVerticle.groovy:69)\n hawkeyes.rtds.MainVerticle.deployInStandaloneMode(MainVerticle.groovy:63)\n sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)\n sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)\n java.lang.reflect.Method.invoke(Unknown Source)\n org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:93)\n groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:325)\n groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1210)\n groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1077)\n groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1019)\n groovy.lang.Closure.call(Closure.java:426)\n groovy.lang.Closure.call(Closure.java:420)\n java_util_concurrent_Callable$call.call(Unknown Source)\n org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:48)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:113)\n org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:117)\n hawkeyes.rtds.MainVerticle.start(MainVerticle.groovy:30)\n io.vertx.lang.groovy.GroovyVerticle.start(GroovyVerticle.groovy:64)\n io.vertx.lang.groovy.GroovyVerticle$1.start(GroovyVerticle.groovy:93)\n io.vertx.core.impl.DeploymentManager.lambda$doDeploy$159(DeploymentManager.java:429)\n io.vertx.core.impl.ContextImpl.lambda$wrapTask$16(ContextImpl.java:335)\n io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)\n io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)\n io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)\n java.lang.Thread.run(Unknown Source)\n"}} {"@timestamp":"2016-03-03T17:03:32.814+08:00","host":"172.16.1.4","type":"rtds","loglevel":"INFO","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"a":1,"b":2}} {"@timestamp":"2016-03-03T17:03:32.814+08:00","host":"172.16.1.4","type":"rtds","loglevel":"DEBUG","classname":"hawkeyes.rtds.MainVerticle","logdetail":{"c":1,"d":2}}
最终在kibana中展现的效果如图:
ELK这套架构的设计因为其外部组件的松耦合性,几乎能够知足各类规模日志收集,组合消息队列,更是带来了弹性伸缩的可能性。
这套架构的引入,将对从此日志的收集管理提供便利,经过日志提供的数据,也便于业务跟踪。并且此架构在将来也容易扩展。
此架构涉及到的组件也相对较多,须要有必定的维护量。数据分析时不但须要有规范化的数据结构,也须要熟悉elasticsearch的聚合表达式,须要一些专业知识与学习成本。
修改log4j.properties
文件,配置kafka appender
便可将log内容输入到kafka消息队列中。
log4j.logger.hawkeyes.rtds=INFO, Kafka log4j.appender.Kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender log4j.appender.Kafka.layout=org.apache.log4j.EnhancedPatternLayout log4j.appender.Kafka.layout.ConversionPattern=%m log4j.appender.Kafka.brokerList=127.0.0.1:9092 log4j.appender.Kafka.topic=logs log4j.appender.Kafka.requiredNumAcks=1
LOG4J主要由三大组件组成:
按照原来配置log4j.rootLogger=DEBUG, Kafka
这使程序中全部日志都会向Kafka中写入。但KafkaLog4jAppender在初始化时,自己会打印log,它在获取logger对象时又会继续建立KafkaLog4jAppender,新的KafkaLog4jAppender又会打log, 这就成了死循环,所以定义了一个输出范围log4j.category.hawkeyes.rtds=INFO, Kafka
,全部hawkeyes.rtds包下的类才会向kafka消息队列中输出,这不会影响KafkaLog4jAppender中log输出。
为知足不重启程序就能修改日志级别的需求,可使用log4j的动态改变log输出级别的功能。
改变Logger中level属性便可。
参考代码:
def rtdsLogger = Logger.getLogger("hawkeyes.rtds") rtdsLogger.setLevel(Level.toLevel("info"))
而后将这种方法进行封装,对外提供一个能够操做的api便可(如REST api)。
编辑/etc/nginx/nginx.conf
配置文件,加入如下内容:
log_format json '{"@timestamp":"$time_iso8601",' '"host":"$hostname",' '"clientip":"$remote_addr",' '"size":$body_bytes_sent,' '"responsetime":$request_time,' '"http_host":"$host",' '"url":"$uri",' '"xff":"$http_x_forwarded_for",' '"referer":"$http_referer",' '"agent":"$http_user_agent",' '"status":$status}'; access_log /var/log/nginx/access.log json;
删掉原来默认的配置行access_log /var/log/nginx/access.log
。重启nginx,以后nginx的access log文件/var/log/nginx/access.log
将以json_lines
的格式打印日志。
以上配置参考了饶琛琳的《ELK stack权威指南》的相关章节
Zookeeper集群配置范例:
须要改动的文件有两个。在zookeeper的配置目录中
myid
: 这个文件的内容修改成一个正整数,要求每一个节点的数值不一样zoo.cfg
: 修改server.${id}=${ip}:2888:3888。这个id和myid
中的数字一一对应,后面的ip是节点的ip(注意不要使用环回ip,必须是能被其余节点访问到的ip,也能够是域名)。参考范例:server.1=172.16.250.10:2888:3888 server.2=172.16.250.13:2888:3888 server.3=172.16.250.14:2888:3888
Zookeeper启用SSL双向认证: //TODO
Kafka集群配置范例:
修改config
目录下的主配置文件server.properties
。关键的几个配置参数以下:
broker.id=1 advertised.host.name=172.16.250.10 zookeeper.connect=172.16.250.10:2181,172.16.250.13:2181,172.16.250.14:2181
broker.id
: 同zookeeper
集群配置,每一个节点的id均为不重复的正整数。advertised.host.name
: 同zookeeper
的集群配置,设置为能被其余节点访问到的ip或域名(该选项默认为系统主机名,不用hosts或dns基本没法被其余节点访问到)。zookeeper.connect
: 为zookeeper集群列表,格式为ip:port
。多个节点使用,
分割。Kafka启用SSL双向认证: //TODO
产品服务器的logstash将日志从文件取出,格式化后推送至日志服务器的Kafka中:
input { file { path => "/path/to/log/file" # 日志文件路径 type => "app" # 应用名,如nginx,postgresql等 ... SNIP ... # 这里根据不一样的文件格式可能须要作不一样处理 } } filter { # filter这里主要是grok正则,nginx配置JSON日志格式后不须要grok解析 grok { ... SNIP ... } } output { kafka { topic_id => "logs" bootstrap_servers => "kafka" # 真实产品环境须要修改对应的kafka集群列表 ... SINP ... } }
日志服务器logstash从kafka消息队列中取出对应的日志消息,推送至elasticsearch存储。 日志报警规则在日志服务器指定,便于修改报警规则。
input { kafka { zk_connect => "zookeeper_cluster:2181" topic_id => "logs" ... SNIP ... } } filter { # 这里详细指定邮件报警规则 if "email_alert" in [tags] { email { ... SNIP ... } } } output { elasticsearch { ... SNIP ... } }