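While configuring an ELFK cluster together with a Kafka cluster, logs are collected by Filebeat and shipped to Kafka, Kafka passes them on to Logstash, and they finally land in Elasticsearch, with Zabbix handling alerts.
However, once the Kafka-to-Logstash configuration was finished, Logstash reported the following on startup: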
[WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException
The configuration file for feeding Kafka into Elasticsearch through Logstash:
input {
  kafka {
    bootstrap_servers => "192.168.1.253:9092"
    group_id => "webservice"
    topics => "253-webservice"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    type => "253_webservice"
  }
}
filter {
  if [type] == "253_ciphermachine" {
    ruby {
      code => "event.set('log_time',event.get('@timestamp').time.localtime + 8*60*60)"
    }
    #grok{
    #  match => [ "log_time","(?^\d{4}-\d{1,2}-\d{1,2})" ]
    #}
    grok {
      match => [ "message", "%{TIME:thistime} %{NOTSPACE:level}" ]
    }
    mutate {
      add_field => [ "[zabbix_key]", "ciphermachine_error" ]
      add_field => [ "[zabbix_host]", "192.168.1.253" ]
      remove_field => "@version"
      remove_field => "host"
      remove_field => "path"
      remove_field => "_type"
      remove_field => "_score"
      remove_field => "_id"
      remove_field => "thread-id"
      remove_field => "log_time"
      remove_field => "thisdate"
      remove_field => "thistime"
      remove_field => "score"
      remove_field => "id"
      remove_field => "name"
      remove_field => "beat.*"
      remove_field => "fields.*"
      remove_field => "host.*"
      remove_field => "input.type"
      remove_field => "log.file.path"
      remove_field => "offset"
      remove_field => "prospector.type"
      remove_field => "source"
    }
    #ruby {
    #  code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )"
    #}
    date {
      match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ]
      target => "@timestamp"
    }
    ruby {
      code => "event.set('logtime',event.get('@timestamp').time.localtime + 8*60*60)"
    }
  }
}
output {
  if [type] == "253_ciphermachine" {
    elasticsearch {
      hosts => ["192.168.1.253:9200"]
      user => "elastic"
      password => "changeme"
      index => "webservice.log-%{+YYYY.MM.dd}"
    }
    if [level] =~ /(ERR|error|ERROR|Failed)/ {
      zabbix {
        zabbix_host => "[zabbix_host]"
        zabbix_key => "[zabbix_key]"
        zabbix_server_host => "192.168.1.252"
        zabbix_server_port => "10051"
        zabbix_value => "message"
      }
    }
  }
}
A quick search online suggested this was a Kafka concurrency issue, so I revised the configuration file:
input {
  kafka {
    bootstrap_servers => "192.168.1.253:9092"
    group_id => "ciphermachine"
    # client_id must be specified, otherwise Logstash reports the error on startup
    client_id => "ciphermachine-1"
    topics => "253-ciphermachine"
    auto_offset_reset => "latest"
    codec => "json"
    consumer_threads => 5
    decorate_events => false
    type => "253_ciphermachine"
  }
}
filter {
  if [type] == "253_ciphermachine" {
    ruby {
      code => "event.set('log_time',event.get('@timestamp').time.localtime + 8*60*60)"
    }
    #grok{
    #  match => [ "log_time","(?^\d{4}-\d{1,2}-\d{1,2})" ]
    #}
    grok {
      match => [ "message", "%{TIME:thistime} %{NOTSPACE:level}" ]
    }
    mutate {
      add_field => [ "[zabbix_key]", "ciphermachine_error" ]
      add_field => [ "[zabbix_host]", "192.168.1.253" ]
      remove_field => "@version"
      remove_field => "host"
      remove_field => "path"
      remove_field => "_type"
      remove_field => "_score"
      remove_field => "_id"
      remove_field => "thread-id"
      remove_field => "log_time"
      remove_field => "thisdate"
      remove_field => "thistime"
      remove_field => "score"
      remove_field => "id"
      remove_field => "name"
      remove_field => "beat.*"
      remove_field => "fields.*"
      remove_field => "host.*"
      remove_field => "input.type"
      remove_field => "log.file.path"
      remove_field => "offset"
      remove_field => "prospector.type"
      remove_field => "source"
    }
    #ruby {
    #  code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )"
    #}
    date {
      match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ]
      target => "@timestamp"
    }
    ruby {
      code => "event.set('logtime',event.get('@timestamp').time.localtime + 8*60*60)"
    }
  }
}
output {
  if [type] == "253_ciphermachine" {
    elasticsearch {
      hosts => ["192.168.1.253:9200"]
      user => "elastic"
      password => "changeme"
      index => "webservice.log-%{+YYYY.MM.dd}"
    }
    if [level] =~ /(ERR|error|ERROR|Failed)/ {
      zabbix {
        zabbix_host => "[zabbix_host]"
        zabbix_key => "[zabbix_key]"
        zabbix_server_host => "192.168.1.252"
        zabbix_server_port => "10051"
        zabbix_value => "message"
      }
    }
  }
}
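After repeated testing, I found the cause was that the input section did not specify client_id. Setting client_id to any value is enough; after restarting Logstash the error no longer appeared.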
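This fits the usual cause of the warning: Kafka registers a JMX MBean per client id, so kafka inputs in the same Logstash instance that end up with the same id (for example, when none is set) can collide. As a rough way to spot inputs that still lack a client_id, something like the following could be used. It is only a heuristic sketch: the function name kafka_inputs_missing_client_id is made up for illustration, and it assumes the config is laid out with one setting per line and contains no kafka output blocks.

```shell
#!/bin/sh
# Heuristic sketch (hypothetical helper, not part of Logstash):
# print how many "kafka {" blocks in a config file have no matching
# "client_id =>" setting. Assumes one setting per line; commented-out
# lines are ignored because they do not start with the keyword.
kafka_inputs_missing_client_id() {
    conf_file="$1"
    # grep -c exits non-zero when the count is 0, hence "|| true".
    kafka_blocks=$(grep -cE '^[[:space:]]*kafka[[:space:]]*\{' "$conf_file" || true)
    client_ids=$(grep -cE '^[[:space:]]*client_id[[:space:]]*=>' "$conf_file" || true)
    echo $((kafka_blocks - client_ids))
}
```

Calling kafka_inputs_missing_client_id on each pipeline file and looking for a non-zero result gives a quick first check before restarting Logstash.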
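During deployment I originally had Logstash collect the logs and send them directly to Kafka, but that had problems: Kafka did not receive the data Logstash sent, and producing data into the Kafka topic by hand led to messages being consumed repeatedly, even though I had configured auto_offset_reset => "latest". (That setting only applies when the consumer group has no committed offset, so it does not by itself prevent re-reading.) My configuration file was probably at fault. In the end I gave up on shipping logs to Kafka directly from Logstash and switched to having Filebeat collect the logs into Kafka.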
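Before that, though, I needed to delete the earlier topics. Every attempt to delete a topic only reported that it was marked for deletion: the topic was not actually removed, just flagged in ZooKeeper. This can be resolved as follows.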
# vim /usr/local/kafka/config/server.properties
# allow topics to be deleted (keep the comment on its own line, not after the value)
delete.topic.enable=true
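For repeated setups, the same edit can be scripted instead of made by hand in vim. The following is a minimal sketch under the assumption that the properties file is writable; enable_topic_deletion is a hypothetical helper name. The broker reads server.properties at startup, so it needs a restart before the change takes effect.

```shell
#!/bin/sh
# Sketch (hypothetical helper): make sure a Kafka server.properties file
# contains delete.topic.enable=true. An existing line, commented out or
# not, is rewritten in place; otherwise the setting is appended.
enable_topic_deletion() {
    props_file="$1"
    pattern='^[[:space:]]*#?[[:space:]]*delete\.topic\.enable[[:space:]]*=.*'
    if grep -qE "$pattern" "$props_file"; then
        # -i.bak edits in place and leaves a backup next to the file.
        sed -i.bak -E "s/$pattern/delete.topic.enable=true/" "$props_file"
    else
        printf '\ndelete.topic.enable=true\n' >> "$props_file"
    fi
}
```

With the layout used in this post, the call would be enable_topic_deletion /usr/local/kafka/config/server.properties, followed by a broker restart.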
# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.1.253:2181 --topic 253-webservice
If the topic still cannot be deleted this way, enter the ZooKeeper command line and delete it there:
# /usr/local/kafka/bin/zookeeper-shell.sh 192.168.1.253:2181
ls /brokers/topics
rmr /brokers/topics/253-ciphermachine
rmr /brokers/topics/253-webservice
# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.253:2181
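The final --list call is there to confirm the topics are really gone. A small check like the one below could make that confirmation scriptable. It is a sketch, not a Kafka tool: topic_is_gone is a hypothetical helper that reads the list on stdin, and it compares only the first field of each line on the assumption that some Kafka versions print a pending topic as "<name> - marked for deletion".

```shell
#!/bin/sh
# Sketch (hypothetical helper): read a topic list, one entry per line,
# on stdin and succeed only if the named topic does not appear.
# Only the first whitespace-separated field is compared, so a line such
# as "253-webservice - marked for deletion" still counts as present.
topic_is_gone() {
    ! awk -v topic="$1" '$1 == topic { found = 1 } END { exit !found }'
}
```

It would be used by piping the list into it, for example: /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.253:2181 | topic_is_gone 253-webservice && echo "deleted".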