在EFK基础架构中,咱们须要在客户端部署Filebeat,经过Filebeat将日志收集并传到LogStash中。在LogStash中对日志进行解析后再将日志传输到ElasticSearch中,最后经过Kibana查看日志。html
上文EFK实战一 - 基础环境搭建已经搭建好了EFK的基础环境,本文咱们经过真实案例打通三者之间的数据传输以及解决EFK在使用过程当中的一些常见问题。java
首先看一下实际的业务日志linux
2020-01-09 10:03:26,719 INFO ========GetCostCenter Start===============
2020-01-09 10:03:44,267 WARN 成本中心编码少于10位!{"deptId":"D000004345","companyCode":"01"}
2020-01-09 10:22:37,193 ERROR java.lang.IllegalStateException: SessionImpl[abcpI7fK-WYnW4nzXrv7w,]: can't call getAttribute() when session is no longer valid.
at com.caucho.server.session.SessionImpl.getAttribute(SessionImpl.java:283)
at weaver.filter.PFixFilter.doFilter(PFixFilter.java:73)
at com.caucho.server.dispatch.FilterFilterChain.doFilter(FilterFilterChain.java:87)
at weaver.filter.MonitorXFixIPFilter.doFilter(MonitorXFixIPFilter.java:30)
at weaver.filter.MonitorForbiddenUrlFilter.doFilter(MonitorForbiddenUrlFilter.java:133)复制代码
日志组成格式为:时间 日志级别 日志详情主要任务就是将这段日志正常写入EFK中。正则表达式
tar -zxvf filebeat-7.5.1-linux-x86_64.tar.gz
filebeat.inputs:
- type: log
enabled: true
paths:
- /app/weaver/Resin/log/xxx.log复制代码
此段配置日志输入,指定日志存储路径docker
output.logstash:
# The Logstash hosts
hosts: ["172.31.0.207:5044"]复制代码
此段配置日志输出,指定Logstash存储路径服务器
./filebeat -e -c filebeat.yml
nohup ./filebeat -e -c filebeat.yml &
命令启动便可 logstash的配置主要分为三段input
,filter
,output
。input
用于指定输入,主要是开放端口给Filebeat用于接收日志filter
用于指定过滤,对日志内容进行解析过滤。output
用于指定输出,直接配置ES的地址便可
微信
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["http://172.31.0.127:9200"]
index => "myindex-%{+YYYY.MM.dd}"
user => "elastic"
password => "xxxxxx"
}
}复制代码
咱们配置好logstash后经过命令重启logstashdocker-compose -f elk.yml restart logstash
session
通过上述两步配置后应用程序往日志文件写入日志,filebeat会将日志写入logstash。在kibana查看写入的日志结果以下:架构
日志显示有2个问题:app
filebeat.inputs
区域添加以下几行配置 # 以日期做为前缀
multiline.pattern: ^\d{4}-\d{1,2}-\d{1,2}
# 开启多行合并
multiline.negate: true
# 合并到上一行以后
multiline.match: after复制代码
filter {
grok{
match => {
"message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}),\d{3} %{LOGLEVEL:loglevel} (?<des>.*)"
}
}
}复制代码
这里主要是使用grok语法对日志进行解析,经过正则表达式对日志进行过滤。你们能够经过kibana里的grok调试工具进行调试
配置完成后咱们从新打开kibana Discover界面查看日志,符合预期,完美!
这个主要缘由仍是客户端日志文件格式有问题,你们能够经过file xxx.log
查看日志文件的编码格式,若是是ISO8859的编码基本都会乱码,咱们能够在filebeat配置文件中经过encoding指定日志编码进行传输。
filebeat.inputs:
- type: log
enabled: true
paths:
- /app/weaver/Resin/log/xxx.log
encoding: GB2312复制代码
如上所示,打开kibana Discover面板时出现此异常,你们只要删除ES中的.kibana_1
索引而后从新访问Kibana便可。
咱们在终端查看日志某关键字时通常会查上下文信息便于排查问题,如常常用到的指令cat xxx.log | grep -C50 keyword
,那么在Kibana中如何实现这功能呢。
在Kibana中搜索关键字,而后找到具体日志记录,点击左边向下箭头,而后再点击“查看周围文档”便可实现。
咱们日志平台可能须要对接多个业务系统,须要根据业务系统创建不一样的索引。
- type: log
......
fields:
logType: oabusiness复制代码
input {
beats {
port => 5044
}
}
filter {
if [fields][logType] == "oabusiness" {
grok{
match => {
"message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}),\d{3} %{LOGLEVEL:loglevel} (?<des>.*)"
}
}
}
}
output {
elasticsearch {
hosts => ["http://172.31.0.207:9200"]
index => "%{[fields][logType]}-%{+YYYY.MM.dd}"
user => "elastic"
password => "elastic"
}
}复制代码
好了,各位朋友们,本期的内容到此就所有结束啦,能看到这里的同窗都是优秀的同窗,下一个升职加薪的就是你了!若是以为这篇文章对你有所帮助的话请扫描下面二维码加个关注。"转发" 加 "在看",养成好习惯!我们下期再见!
欢迎扫码关注微信公众号或 我的博客