以前是使用NLog直接将日志发送到了ELK,本篇将会使用Docker搭建ELK和kafka,同时替换NLog为Log4net。html
一.搭建kafkagit
1.拉取镜像github
//下载zookeeper docker pull wurstmeister/zookeeper //下载kafka docker pull wurstmeister/kafka:2.11-0.11.0.3
2.启动docker
//启动zookeeper docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper //启动kafka docker run -d --name kafka --publish 9092:9092 \ --link zookeeper \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.3.131:2181 \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.3.131 \ --env KAFKA_ADVERTISED_PORT=9092 \ --volume /etc/localtime:/etc/localtime \ wurstmeister/kafka:2.11-0.11.0.3
3.测试Kafkajson
//查看Kafka容器ID docker ps 进入容器 docker exec -it [容器ID] bin/bash //建立topic bin/kafka-topics.sh --create --zookeeper 192.168.3.131:2181 --replication-factor 1 --partitions 1 --topic mykafka //查看topic bin/kafka-topics.sh --list --zookeeper 192.168.3.131:2181 //建立生产者 bin/kafka-console-producer.sh --broker-list 192.168.3.131:9092 --topic mykafka //再开一个客户端进入容器 //建立消费者 bin/kafka-console-consumer.sh --zookeeper 192.168.3.131:2181 --topic mykafka --from-beginning
再生产端发送消息,消费端能够成功接收到就说明没问题。bootstrap
二.Docker安装ELKapi
1.拉取镜像 docker pull sebp/elk 2.启动ELK docker run -p 5601:5601 -p 9200:9200 -p 9300:9300 -p 5044:5044 -e ES_MIN_MEM=128m -e ES_MAX_MEM=2048m -d --name elk sebp/elk //若启动过程出错通常是由于elasticsearch用户拥有的内存权限过小,至少须要262144 切换到root用户 执行命令: sysctl -w vm.max_map_count=262144 查看结果: sysctl -a|grep vm.max_map_count 显示: vm.max_map_count = 262144 上述方法修改以后,若是重启虚拟机将失效,因此: 解决办法: 在 /etc/sysctl.conf文件最后添加一行 vm.max_map_count=262144 便可永久修改
等几十秒,而后访问9200和5601端口就能够看到ELK相关的面板。浏览器
而后咱们还须要配置下logstash:bash
1.查看elk容器ID docker ps 2.进入elk容器 docker exec -it 容器ID bin/bash 3.执行命令 /opt/logstash/bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => ["localhost"] } }'
当命令成功被执行后,看到:Successfully started Logstash API endpoint {:port=>9600} 信息后,输入:this is a dummy entry 而后回车,模拟一条日志进行测试。app
打开浏览器,输入:http://:9200/_search?pretty 如图,就会看到咱们刚刚输入的日志内容。
注意:若是看到这样的报错信息 Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting. 请执行命令:service logstash stop 而后在执行就能够了。
OK,测试没问题的话,说明Logstash和ES之间是能够正常联通,而后咱们须要配置Logstash从Kafka消费消息:
1.找到config文件 cd /opt/logstash/config 2.编辑配置文件 vi logstash.config
input { kafka{ bootstrap_servers =>["192.168.3.131:9092"] client_id => "test" group_id => "test" consumer_threads => 5 decorate_events => true topics => "mi" } } filter{ json{ source => "message" } } output { elasticsearch { hosts => ["localhost"] index => "mi-%{app_id}" codec => "json" } }
bootstrap_servers:Kafka地址
这里介绍下这个app_id的做用,生产场景下咱们根据不一样的项目生成不一样的ES索引,好比服务是一个单独的索引,Web是一个,MQ是一个,这些就能够经过传入的app_id来区分建立。
配置完成后加载该配置:
/opt/logstash/bin/logstash -f /opt/logstash/config/logstash.conf
没问题的话,此时Logstash就会从Kafka消费数据了,而后咱们新建一个.net core API项目测试一下:
1.经过NuGet引用 Microsoft.Extensions.Logging.Log4Net.AspNetCore;
2.启动文件中注入 Log4Net:
public static IWebHost BuildWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) .ConfigureLogging((logging) => { // 过滤掉 System 和 Microsoft 开头的命名空间下的组件产生的警告级别如下的日志 logging.AddFilter("System", LogLevel.Warning); logging.AddFilter("Microsoft", LogLevel.Warning); logging.AddLog4Net(); }) .UseStartup<Startup>() .Build();
3.根目录下添加 log4net.config,设置为 “若是较新则复制”
<?xml version="1.0" encoding="utf-8" ?> <log4net> <appender name="KafkaAppender" type="log4net.Kafka.Core.KafkaAppender, log4net.Kafka.Core"> <KafkaSettings> <broker value="192.168.3.131:9092" /> <topic value="mi" /> </KafkaSettings> <layout type="log4net.Kafka.Core.KafkaLogLayout,log4net.Kafka.Core" > <appid value="api-test" /> </layout> </appender> <root> <level value="ALL"/> <appender-ref ref="KafkaAppender" /> </root> </log4net>
[Route("api/[controller]")] public class ValuesController : Controller { private readonly ILogger _logger; public ValuesController(ILogger<ValuesController> logger) { _logger = logger; } // GET api/values [HttpGet] public IEnumerable<string> Get() { _logger.LogInformation("根据appId最后一次测试Kafka!"); return new string[] { "value1", "value2" }; } }
OK,运行而后访问5601端口查看: