Setting Up an ELK Environment (Part 2): Adding Kafka

Introduction

Yesterday I set up the ELK logging environment. From what I've read, Logstash can become a performance bottleneck, and the usual remedies are to put a message queue in front of it or to run a Logstash cluster. So today I added Kafka as a message queue on top of the existing setup.

Installation

Kafka:

Download the tarball from the official site and extract it.

Configuration

The default Kafka configuration works as-is.
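For orientation, these are the few values in config\server.properties that the rest of this post relies on. This is just a sketch of what the stock file already amounts to; the log.dirs path below is an assumption for a Windows install, so adjust it to your own layout:

# Broker listener that the logback appender and Logstash connect to (default port 9092)
listeners=PLAINTEXT://localhost:9092
# Where Kafka keeps its log segments; on Windows pick a writable path (assumption)
log.dirs=C:/work/kafka-logs
# The ZooKeeper instance started in the "Startup" section below
zookeeper.connect=localhost:2181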

Fix for the startup error "Could not find or load main class"

After configuring Kafka's server.properties, I opened a command window and ran .\bin\windows\kafka-server-start.bat config\server.properties, which failed with: Error: Could not find or load main class Files\Java\jdk1.7.0_80\lib;C:\Program. The cause is the space in the JDK path (C:\Program Files\...), which splits the classpath argument at the space. The fix: in the Kafka install directory, open bin\windows\kafka-run-class.bat, go to line 142, and wrap %CLASSPATH% in double quotes.

Before:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %* 
After:
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

Modify yesterday's Logstash input configuration

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["logs"]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "applog"
  }
}
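Assuming yesterday's pipeline file is named logstash.conf (the filename is an assumption here), restart Logstash against the new input; the kafka input plugin ships bundled with recent Logstash releases, so no extra plugin install should be needed:

bin\logstash.bat -f logstash.conf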

Switch the logback configuration from Logstash to Kafka

Add the logback-kafka-appender dependency:

<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.1.0</version>
</dependency>

Modify logback-spring.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="10 seconds">

    <springProperty scope="context" name="springAppName"
                    source="${spring.application.name}" />

    <property name="CONSOLE_LOG_PATTERN"
              value="%date [%thread] %-5level %logger{36} - %msg%n" />

    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
        <withJansi>true</withJansi>
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!--<appender name="logstash"
              class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>127.0.0.1:4560</destination>
        <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
            <providers>
                <timestamp>
                    <timeZone>UTC</timeZone>
                </timestamp>
                <pattern>
                    <pattern>
                        {
                        "severity":"%level",
                        "service": "${springAppName:-}",
                        "trace": "%X{X-B3-TraceId:-}",
                        "span": "%X{X-B3-SpanId:-}",
                        "exportable": "%X{X-Span-Export:-}",
                        "pid": "${PID:-}",
                        "thread": "%thread",
                        "class": "%logger{40}",
                        "rest": "%message"
                        }
                    </pattern>
                </pattern>
            </providers>
        </encoder>
    </appender>-->

    <!-- This is the kafkaAppender -->
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <!-- This is the default encoder that encodes every log message to an utf8-encoded string  -->
        <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </layout>
        </encoder>
        <topic>logs</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />

        <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
        <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
        <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>

        <!-- this is the fallback appender if kafka is not available. -->
        <appender-ref ref="stdout" />
    </appender>


    <appender name="dailyRollingFileAppender" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <File>main.log</File>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>main.%d{yyyy-MM-dd}.log</FileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <Pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{35} - %msg %n</Pattern>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>DEBUG</level>
        </filter>
    </appender>

    <springProfile name="!production">
        <logger name="com.example" level="DEBUG" />
        <logger name="org.springframework.web" level="INFO"/>
        <root level="info">
            <appender-ref ref="stdout" />
            <appender-ref ref="dailyRollingFileAppender" />
            <appender-ref ref="kafkaAppender" />
        </root>
    </springProfile>

    <springProfile name="production">
        <logger name="com.example" level="DEBUG" />
        <logger name="org.springframework.web" level="INFO"/>
        <root level="info">
            <appender-ref ref="stdout" />
            <appender-ref ref="dailyRollingFileAppender" />
            <appender-ref ref="kafkaAppender" />
        </root>
    </springProfile>
</configuration>
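If nothing shows up in Kafka, one quick check is to switch on logback's internal status output, which reports whether the kafkaAppender started and surfaces producer errors on the console. This is just a temporary debugging aid on the existing configuration element:

<!-- temporary: print logback's own status messages while debugging -->
<configuration scan="true" scanPeriod="10 seconds" debug="true">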

Startup

Start ZooKeeper

Go into the bin\windows directory under the Kafka install and run:

.\zookeeper-server-start.bat C:\work\kafka_2.11-1.1.0\config\zookeeper.properties

Start the Kafka server

Run:

.\kafka-server-start.bat C:\work\kafka_2.11-1.1.0\config\server.properties
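Optionally, verify the topic from the Kafka side. The commands below are a sketch for Kafka 1.1.0 on Windows; with the default broker settings the logs topic is auto-created on first write, so the create step is not strictly required:

REM create the topic used by the logback appender and the logstash input
.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logs

REM watch messages arrive while the application writes logs
.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic logs --from-beginning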

The rest is the same as yesterday: start Elasticsearch, start Logstash, and start Kibana.

Hit the controller, and you can see that the log entries are written to Elasticsearch as well.
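For completeness, a minimal controller along the lines of yesterday's test endpoint (the class name and mapping below are made up for illustration). Hitting it produces log lines that flow through the kafkaAppender into Kafka, then through Logstash into Elasticsearch:

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LogDemoController {

    private static final Logger log = LoggerFactory.getLogger(LogDemoController.class);

    // GET /hello writes one INFO line, which the root logger fans out to
    // stdout, the rolling file, and the kafkaAppender configured above
    @GetMapping("/hello")
    public String hello() {
        log.info("hello from the kafka-backed logging pipeline");
        return "ok";
    }
}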
