ELFK + Kafka error: InstanceAlreadyExistsException

The InstanceAlreadyExistsException error

In this setup, an ELFK cluster is combined with a Kafka cluster: Filebeat collects the logs and ships them to Kafka, Kafka forwards them to Logstash, Logstash sends them on to Elasticsearch, and Zabbix handles alerting.

However, after finishing the configuration for shipping logs from Kafka to Logstash, starting Logstash produced this error:

[WARN ][org.apache.kafka.common.utils.AppInfoParser] Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException

The configuration file that feeds Kafka into Elasticsearch through Logstash:

input {
    kafka {
        bootstrap_servers => "192.168.1.253:9092"
        group_id => "webservice"
        topics => "253-webservice"
        auto_offset_reset => "latest"
        consumer_threads => 5
        decorate_events => true 
        type => "253_webservice"
    }
}

filter {
    if [type] == "253_webservice" {
        ruby {
            code => "event.set('log_time',event.get('@timestamp').time.localtime  + 8*60*60)"
        }
        #grok{
        #    match => [ "log_time","(?^\d{4}-\d{1,2}-\d{1,2})"  ]
        #}

        grok {
            match => [ "message", "%{TIME:thistime} %{NOTSPACE:level}" ]
        }

        mutate {
            add_field => [ "[zabbix_key]", "ciphermachine_error" ]
            add_field => [ "[zabbix_host]", "192.168.1.253" ]

            remove_field => [ "@version", "host", "path", "_type", "_score", "_id",
                              "thread-id", "log_time", "thisdate", "thistime",
                              "score", "id", "name", "beat.*", "fields.*", "host.*",
                              "input.type", "log.file.path", "offset",
                              "prospector.type", "source" ]
        }

        #ruby {
        #    code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )"
        #}

        date {
            match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ]
            target => "@timestamp"
        }

        ruby {
            code => "event.set('logtime',event.get('@timestamp').time.localtime  + 8*60*60)"
        }
    }
}

output {
    if [type] == "253_webservice" {
        elasticsearch {
            hosts => ["192.168.1.253:9200"]
            user => "elastic"
            password => "changeme"
            index => "webservice.log-%{+YYYY.MM.dd}"
        }

        if [level] =~ /(ERR|error|ERROR|Failed)/ {
            zabbix {
                zabbix_host => "[zabbix_host]"
                zabbix_key => "[zabbix_key]"
                zabbix_server_host => "192.168.1.252"
                zabbix_server_port => "10051"
                zabbix_value => "message"
            }
        }
    }
}

A quick web search suggested this is a Kafka concurrency problem, so I revised the configuration file:

input {
    kafka {
        bootstrap_servers => "192.168.1.253:9092"
        group_id => "ciphermachine"
        client_id => "ciphermachine-1"                # a client_id must be specified, otherwise Logstash errors out at startup
        topics => "253-ciphermachine"
        auto_offset_reset => "latest"
        codec => "json"
        consumer_threads => 5
        decorate_events => false
        type => "253_ciphermachine"
    }
}

filter {
    if [type] == "253_ciphermachine" {
        ruby {
            code => "event.set('log_time',event.get('@timestamp').time.localtime  + 8*60*60)"
        }
        #grok{
        #    match => [ "log_time","(?^\d{4}-\d{1,2}-\d{1,2})"  ]
        #}

        grok {
            match => [ "message", "%{TIME:thistime} %{NOTSPACE:level}" ]
        }

        mutate {
            add_field => [ "[zabbix_key]", "ciphermachine_error" ]
            add_field => [ "[zabbix_host]", "192.168.1.253" ]

            remove_field => [ "@version", "host", "path", "_type", "_score", "_id",
                              "thread-id", "log_time", "thisdate", "thistime",
                              "score", "id", "name", "beat.*", "fields.*", "host.*",
                              "input.type", "log.file.path", "offset",
                              "prospector.type", "source" ]
        }

        #ruby {
        #    code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )"
        #}

        date {
            match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ]
            target => "@timestamp"
        }

        ruby {
            code => "event.set('logtime',event.get('@timestamp').time.localtime  + 8*60*60)"
        }
    }
}

output {
    if [type] == "253_ciphermachine" {
        elasticsearch {
            hosts => ["192.168.1.253:9200"]
            user => "elastic"
            password => "changeme"
            index => "webservice.log-%{+YYYY.MM.dd}"
        }

        if [level] =~ /(ERR|error|ERROR|Failed)/ {
            zabbix {
                zabbix_host => "[zabbix_host]"
                zabbix_key => "[zabbix_key]"
                zabbix_server_host => "192.168.1.252"
                zabbix_server_port => "10051"
                zabbix_value => "message"
            }
        }
    }
}

Repeated testing showed that the cause was the missing client_id in the input section: each Kafka consumer registers a JMX MBean whose name contains its client id, so two consumers sharing the same id collide when registering it. Specifying any unique client_id fixes this; after restarting Logstash the error no longer appeared.
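The collision itself is ordinary JMX behavior and can be reproduced outside Logstash entirely. A minimal sketch, assuming nothing beyond the JDK (the class and the "logstash-0" client id are made up for illustration; in the real case the MBean is registered by Kafka's AppInfoParser):

```java
// Demonstrates why two Kafka consumers with the same client id trigger
// InstanceAlreadyExistsException: the app-info MBean's ObjectName embeds
// the client id, so a second registration under the same name is rejected.
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MBeanCollision {
    // A trivial standard MBean: the interface name is the class name + "MBean"
    public interface DummyMBean {}
    public static class Dummy implements DummyMBean {}

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Same shape of name Kafka uses; "logstash-0" is a hypothetical client id
        ObjectName name = new ObjectName("kafka.consumer:type=app-info,id=logstash-0");
        server.registerMBean(new Dummy(), name);       // first consumer: fine
        try {
            server.registerMBean(new Dummy(), name);   // second consumer, same id
        } catch (InstanceAlreadyExistsException e) {
            System.out.println("caught: " + e.getClass().getSimpleName());
        }
    }
}
```

This is why giving each Logstash Kafka input its own client_id makes the warning disappear: the MBean names no longer overlap.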


Errors when deleting a topic

During deployment, my original setup had Logstash collect the logs and ship them directly to Kafka, but this was problematic: Kafka never received the data Logstash sent, while producing data into the Kafka topic by hand caused duplicate consumption. I had configured auto_offset_reset => "latest", but that setting only applies when the consumer group has no committed offset, so the problem probably lay elsewhere in my configuration. In the end I abandoned shipping logs from Logstash directly to Kafka and switched to collecting them with Filebeat instead.

Before doing that, though, I needed to delete the old topics. Every deletion attempt only produced a message that the topic was "marked for deletion" by ZooKeeper; the topic was not actually removed, merely flagged. This can be resolved as follows.

  • Edit the Kafka configuration:
# vim /usr/local/kafka/config/server.properties
delete.topic.enable=true            # allow topics to be deleted

  • Delete the topic:
# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.1.253:2181 --topic 253-webservice

If the topic still cannot be deleted this way, remove it from the ZooKeeper command line (note that newer ZooKeeper shells replace rmr with deleteall):

# /usr/local/kafka/bin/zookeeper-shell.sh 192.168.1.253:2181
ls /brokers/topics

rmr /brokers/topics/253-ciphermachine

rmr /brokers/topics/253-webservice

  • List the current topics:
# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.253:2181