第1章 环境说明:
现有架构为 ELK + Kafka + Filebeat,ELK 各组件均为 5.2.x 版本
[root@logstash-node-1 ~]# rpm -qa | grep logstash
logstash-5.2.2-1.noarch
[root@logstash-node-1 ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
因为 Logstash 5.x 版本不支持独立的 pipeline,需要大量的 if-else 判断,配置文件管理起来也比较复杂;而新版的 pipeline 配置相对独立,可以针对每个业务的日志类型分别进行管理
这里只升级了 Logstash 组件,经验证可以和 ES 5.x 版本配合使用,没有问题
cat /etc/logstash/pipelines.yml
# One pipeline per business log type (Logstash 6.x multiple-pipelines feature).
# NOTE(review): in valid YAML, path.config must be indented under the list
# item; the indentation appears to have been lost when this was pasted — confirm.
- pipeline.id: nginx_access
path.config: "/etc/logstash/conf.d/nginx_access.yml"
cat /etc/logstash/conf.d/nginx_access.yml
# Consume nginx access-log events from Kafka; payloads are decoded as JSON.
input {
kafka {
# NOTE(review): port 9020 is unusual — Kafka brokers listen on 9092 by
# default; confirm this is not a typo.
bootstrap_servers => "127.0.0.1:9020"
group_id => "logstash"
consumer_threads => 5
# NOTE(review): the kafka input's `topics` setting is documented as an
# array (["nginx_access"]); a bare string relies on coercion — verify.
topics => "nginx_access"
codec => "json"
}
}
# Parse, normalize and timestamp each nginx access event.
filter {
# Match the raw line against the custom WPT_NGX_COMM pattern shipped in
# /etc/logstash/patterns.d/.
grok {
patterns_dir => [ "/etc/logstash/patterns.d/" ]
match => {
message => ["%{WPT_NGX_COMM}"]
}
}
mutate {
# Split "path?querystring" into its two halves, expose them as separate
# fields, then drop the original. If the request has no "?", the second
# element is absent and uri_query keeps the literal sprintf placeholder —
# NOTE(review): verify that downstream consumers tolerate this.
split => ["request" , "?"]
add_field => {
"uri_path" => "%{[request][0]}"
"uri_query" => "%{[request][1]}"
}
remove_field => ["request"]
# Cast numeric fields so Elasticsearch maps them as numbers, not strings.
# NOTE(review): mutate applies its operations in a plugin-defined internal
# order, not the order written here — confirm split/add_field interaction.
convert => {
"response" => "integer"
"body_bytes_sent" => "integer"
"request_time" => "float"
"upstream_response_time" => "float"
}
}
# Expand the user_agent string into browser/OS fields; LRU cache avoids
# re-parsing repeated agents.
useragent {
source => "user_agent"
lru_cache_size => 5000
}
# Use the log line's own timestamp (several accepted formats) as @timestamp,
# interpreted in Asia/Shanghai, then discard the source field.
date {
timezone => "Asia/Shanghai"
match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z", "UNIX", "yyyy-MM-dd HH:mm:ss", "dd-MMM-yyyy HH:mm:ss" ]
target => "@timestamp"
remove_field => "timestamp"
}
}
# Ship parsed events to the local Elasticsearch, one daily index per type.
output {
elasticsearch {
hosts => ["http://127.0.0.1:9200"]
# NOTE(review): relies on a "type" field set upstream (e.g. by the
# shipper); the event "type" is deprecated in the 6.x stack — confirm.
index => "%{type}-%{+YYYY.MM.dd}"
}
}
调试期间可以使用 ./bin/logstash -r 来检查配置
systemctl stop logstash.service
rpm -e logstash-5.2.2-1.noarch
yum localinstall logstash-6.5.4.rpm
配置文件我本地打包好了上传到服务器上
在 pipeline 文件中,我只开启了一个管道来验证服务是否正常;并且在 output 中让数据同时写一份到文件,用来验证数据解析是否正常,最后启动 logstash 服务即可
# Verification output: index into Elasticsearch as before, and duplicate each
# event to a local file so parsing can be eyeballed during the upgrade.
output {
  elasticsearch {
    hosts => ["http://127.0.0.1:9200"]
    index => "%{type}-%{+YYYY.MM.dd}"
  }
  file {
    path => "/tmp/test.log"
  }
}
# FIX: the closing brace of the output section was missing in the original
# snippet, which makes `logstash -r`/config-test fail with a parse error.
日志中没有报错,并且 logstash 工作也正常,但是有警告;网上查了一下,可能是与 kafka 客户端版本不匹配导致的
[2018-12-26T14:39:05,424][WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-7
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_121]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_121]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_121]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) [kafka-clients-2.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:791) [kafka-clients-2.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650) [kafka-clients-2.0.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630) [kafka-clients-2.0.1.jar:?]
at sun.reflect.GeneratedConstructorAccessor47.newInstance(Unknown Source) [?:1.8.0_121]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) [?:1.8.0_121]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) [?:1.8.0_121]