filebeat -> kafka -> logstash -> elasticsearch -> UI (Kibana / a custom front end)
Since I was testing on a single local machine, Filebeat and Logstash were installed the traditional way, while the remaining components run in Docker containers.
Every component except Filebeat needs a JVM, so for convenience it is easiest to run them all in Docker. I installed Logstash directly to make it easier to debug, which means Java has to be installed as well.
Everything in this article is set up on a single local machine. To adapt it to a cluster, just change the relevant hosts settings according to each component's official documentation.
Skipped.
Installing Filebeat
Configuration file: filebeat.yml (the path varies by installation method; on Linux it is /etc/filebeat/)
Configuration:
#=========================== Filebeat inputs =============================
filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
  # Change to true to enable this input configuration.
  enabled: true
  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /*/log/*
  fields:
    testname: xxxnn
  fields_under_root: true

#----------------------------- kafka output --------------------------------
output.kafka:
  enabled: true
  hosts: ["localhost:9092"]
  max_retries: 5
  timeout: 300
  topic: "filebeat"
Validate the configuration:

./filebeat test config -c filebeat.yml
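To give the input something to ship, you can append a line to a file that matches the paths glob above. The path below is just a hypothetical example that happens to match /*/log/*:

# /tmp/log/test.log matches the glob /*/log/* (hypothetical test path)
mkdir -p /tmp/log
echo "test log entry $(date)" >> /tmp/log/test.log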
Installing Kafka and ZooKeeper

docker pull wurstmeister/zookeeper:latest
docker pull wurstmeister/kafka:latest
sudo docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper:latest
sudo docker run -d --name kafka --publish 9092:9092 --link zookeeper \
  --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  --env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 \
  --env KAFKA_ADVERTISED_PORT=9092 \
  wurstmeister/kafka:latest
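As a quick sanity check (my addition, not part of the original steps), confirm both containers are up and list the topics. The script path assumes the wurstmeister image's /opt/kafka layout and a pre-3.0 Kafka that still accepts --zookeeper:

sudo docker ps --filter name=zookeeper --filter name=kafka
# "filebeat" shows up once Filebeat has published, since the image
# auto-creates topics by default
sudo docker exec kafka /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --list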
At this point the Kafka service has already started successfully.
(There is a pitfall here: if you want other servers on the internal network to connect to the Kafka container, KAFKA_ADVERTISED_HOST_NAME should be set to the IP address of eth0. See https://github.com/wurstmeister/kafka-docker/issues/17 for details.)
Enter the Kafka container to check its effective configuration:

docker exec -it kafka bash
cd /opt/kafka
bash-4.4# grep -Ev "^$|^#" config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-fbfc07e603b5
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
advertised.port=9092
advertised.host.name=127.0.0.1
port=9092

bash-4.4# grep -Ev "^$|^#" config/consumer.properties
bootstrap.servers=localhost:9092
group.id=test-consumer-group

bash-4.4# grep -Ev "^$|^#" config/producer.properties
bootstrap.servers=localhost:9092
compression.type=none
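To verify end to end that Filebeat's messages actually reach the topic, you can run the console consumer from inside the container (a minimal check, assuming the same /opt/kafka working directory as above):

# Run from /opt/kafka inside the container; Ctrl-C to stop
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic filebeat --from-beginning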
Installing Logstash

Pipeline configuration (test.conf):
input {
  kafka {
    bootstrap_servers => "127.0.0.1:9092"
    topics => ["filebeat"]
    group_id => "test-consumer-group"
    codec => "plain"
    consumer_threads => 1
    decorate_events => true
  }
}
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "test"
    workers => 1
  }
}
Validate the pipeline configuration:

bin/logstash -f test.conf --config.test_and_exit
Installing Elasticsearch

docker pull docker.elastic.co/elasticsearch/elasticsearch-oss:6.2.4
This pulls the image without X-Pack; see the official Elasticsearch Docker image listing for the other variants.
Start everything in order.
es:
sudo docker run -d --name es -p 9200:9200 -t docker.elastic.co/elasticsearch/elasticsearch-oss:6.2.4
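If the container exits shortly after starting, two common fixes (taken from the official Elasticsearch 6.x Docker documentation, not from the original post) are raising vm.max_map_count on the host and forcing single-node discovery:

# Kernel setting required by Elasticsearch's bootstrap checks
sudo sysctl -w vm.max_map_count=262144
# Same image, but with single-node discovery so the production
# bootstrap checks are not enforced
sudo docker run -d --name es -p 9200:9200 -e "discovery.type=single-node" \
  -t docker.elastic.co/elasticsearch/elasticsearch-oss:6.2.4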
logstash:
bin/logstash -f test.conf --config.reload.automatic
kafka:
Kafka was already started above.
filebeat:
./filebeat -e -c filebeat.yml -d "publish"
Once the console shows Filebeat publishing successfully, check Elasticsearch: the data has been indexed!
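A quick way to confirm this without Kibana (assuming the defaults used above: Elasticsearch on 127.0.0.1:9200 and the index named test):

# List indices; "test" should appear with a non-zero docs.count
curl 'http://127.0.0.1:9200/_cat/indices?v'
# Fetch one indexed document
curl 'http://127.0.0.1:9200/test/_search?pretty&size=1'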
Mainstream log collection architectures today are based on ELK (Elasticsearch + Logstash + Kibana), or ELFK, which adds Filebeat.
So why add Kafka as a message broker?
In my view, Kafka has two characteristics that make it particularly well suited to a log collection pipeline: high throughput and data persistence (along with other excellent features such as topics, distributed high availability, and so on). Together they make data transport through the system more reliable.