OS: CentOS 7
es master node / es data node / kibana / head   192.168.1.253
kafka / zookeeper / logstash                    192.168.1.253
log test host / filebeat                        192.168.1.253
# systemctl stop firewalld && systemctl disable firewalld
# sed -i 's/=enforcing/=disabled/g' /etc/selinux/config && setenforce 0
# vim /etc/security/limits.conf
* soft nofile 65536
* hard nofile 131072
* soft nproc 2048
* hard nproc 4096
# vim /etc/sysctl.conf
vm.max_map_count=655360
# sysctl -p
# tar zxf jdk-8u191-linux-x64.tar.gz && mv jdk1.8.0_191/ /usr/local/jdk
# vim /etc/profile
JAVA_HOME=/usr/local/jdk
PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib
export JAVA_HOME PATH CLASSPATH
# source !$
# java -version
# ln -s /usr/local/jdk/bin/java /usr/local/bin/java
In the early stage of production, with few services and light traffic, an ELFK cluster alone was enough. But as business grew, log volume multiplied; to handle this, a message queue is added in front of the ES cluster to relieve the pressure on it.
So should Redis or Kafka serve as the message queue? Consider the following three points:
* Delivery reliability: Redis Pub/Sub is mostly used for low-latency real-time push and makes no delivery guarantees; its data is gone after a power loss. Using a Redis list as a queue adds persistence, but is still not fully loss-proof. Kafka adds some latency but guarantees reliable delivery.
* Consumer grouping: Redis Pub/Sub distinguishes topics but has no concept of groups. In Kafka, subscribers to a topic can be placed in consumer groups; within a group only one subscriber receives each message, which can be used for load balancing.
* Cluster resource usage: Redis 3.0+ provides a cluster HA mechanism, but every master needs one or more replicas pulling its data, with a replica promoted when the master dies, which wastes resources. Kafka as a message queue uses cluster resources fully: each application maps to a topic, a topic can have many partitions distributed round-robin across nodes, and producers spread data evenly over the partitions. Even with a single application on top, the whole Kafka cluster is utilized, avoiding the data-skew problem seen in Redis clusters. Kafka also has HDFS-like redundancy: one broker going down does not take the cluster with it.
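The round-robin spreading of producer data over partitions described above can be sketched with a trivial simulation (a hypothetical illustration of the assignment rule, not Kafka code):

```shell
# Hypothetical sketch: a producer without a message key assigns messages to
# partitions in round-robin order, i.e. message i -> partition (i mod partition_count).
partition_count=3
for i in 0 1 2 3 4 5; do
  echo "message-$i -> partition $(( i % partition_count ))"
done
```

With 3 partitions, the 6 messages land on partitions 0,1,2,0,1,2 - every node holding a partition gets an even share of the traffic.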
Here we choose Kafka as the message queue, set up a Kafka cluster, and combine it with the ELFK cluster to collect application logs.
The installation of Elasticsearch, Kibana, and Head is omitted here.
Apache Kafka is a piece of message middleware: a distributed, publish/subscribe-based messaging system. It provides a unified, high-throughput, low-latency platform for handling real-time data, featuring a distributed, partitioned, replicated, and persistent log service.
kafka/zookeeper/logstash 192.168.1.253
# cd /software
# wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
# tar zxf kafka_2.11-2.2.0.tgz && mv kafka_2.11-2.2.0 /usr/local/kafka
# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/usr/local/kafka/zookeeper
clientPort=2181
maxClientCnxns=1024
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.1.253:2888:3888
Notes:
tickTime: the heartbeat interval, in milliseconds, between ZooKeeper servers or between a client and a server; one heartbeat is sent every tickTime.
Port 2888: used by this server to exchange data with the cluster Leader.
Port 3888: used for leader election; if the Leader dies, the servers communicate over this port to elect a new one.
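Note that initLimit and syncLimit in the config above are measured in ticks, not milliseconds, so with tickTime=2000 they translate into the following time windows (a quick check using the values from this config):

```shell
tickTime=2000    # ms per tick; heartbeat interval
initLimit=20     # ticks allowed for a follower's initial sync with the leader
syncLimit=10     # ticks a follower may fall behind before it is dropped
echo "initial sync window: $(( tickTime * initLimit )) ms"   # 40000 ms
echo "max lag window:      $(( tickTime * syncLimit )) ms"   # 20000 ms
```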
Create a myid file under the zookeeper data directory; its content is a number identifying this host. Without this file, ZooKeeper cannot start.
# mkdir /usr/local/kafka/zookeeper
# echo 1 > /usr/local/kafka/zookeeper/myid   # in a cluster, each machine just needs a distinct id
# vim /usr/local/kafka/config/server.properties
broker.id=1                            # must be unique; a number
port=9092
host.name=192.168.1.253                # this host's IP
log.dirs=/data/kafka/kafka-logs        # data directory, not the log directory
num.partitions=16                      # default number of partitions per topic
log.retention.hours=168                # retention time, default one week
zookeeper.connect=192.168.1.253:2181   # zookeeper IP and port
# mkdir -p /data/kafka
# vim /usr/bin/zk_kafka
#!/bin/bash
#chkconfig: 2345 55 24
#description: zookeeper and kafka service manager

BASE_DIR=/usr/local/kafka
SERVICE=$1

START_ZK() {
    cd $BASE_DIR
    nohup $BASE_DIR/bin/zookeeper-server-start.sh $BASE_DIR/config/zookeeper.properties &>/dev/null &
}

STOP_ZK() {
    cd $BASE_DIR
    nohup $BASE_DIR/bin/zookeeper-server-stop.sh &>/dev/null &
}

START_KAFKA() {
    cd $BASE_DIR
    nohup $BASE_DIR/bin/kafka-server-start.sh $BASE_DIR/config/server.properties &>/dev/null &
}

STOP_KAFKA() {
    cd $BASE_DIR
    nohup $BASE_DIR/bin/kafka-server-stop.sh &>/dev/null &
}

if [ -z "$1" ]; then
    echo $"Usage: $0 {zookeeper|kafka} {start|stop|restart}"
    exit 0
else
    if [ "$1" != "zookeeper" ] && [ "$1" != "kafka" ]; then
        echo $"Usage: $0 {zookeeper|kafka} {start|stop|restart}"
        exit 1
    fi
fi

START() {
    if [ "$SERVICE" = "zookeeper" ]; then
        START_ZK
        if [ $? -eq 0 ]; then
            echo -e "\033[32m Start $SERVICE OK. \033[0m"
        fi
    else
        START_KAFKA
        if [ $? -eq 0 ]; then
            echo -e "\033[32m Start $SERVICE OK. \033[0m"
        fi
    fi
}

STOP() {
    if [ "$SERVICE" = "zookeeper" ]; then
        STOP_ZK
        if [ $? -eq 0 ]; then
            echo -e "\033[32m Stop $SERVICE OK. \033[0m"
        fi
    else
        STOP_KAFKA
        if [ $? -eq 0 ]; then
            echo -e "\033[32m Stop $SERVICE OK. \033[0m"
        fi
    fi
}

case "$2" in
    start)
        START
        ;;
    stop)
        STOP
        ;;
    restart)
        STOP
        sleep 2
        START
        ;;
    *)
        echo $"Usage: $0 {zookeeper|kafka} {start|stop|restart}"
        exit 1
        ;;
esac
exit 0
ZooKeeper must be started first; only then can Kafka be started.
# chmod +x /usr/bin/zk_kafka
# zk_kafka zookeeper start
# netstat -lntp | grep -E "2181|2888|3888"   # check the ports; the node listening on 2888 is the leader
tcp6       0      0 192.168.100.132:2888    :::*      LISTEN      6787/java
tcp6       0      0 192.168.100.132:3888    :::*      LISTEN      6787/java
tcp6       0      0 :::2181                 :::*      LISTEN      6787/java
The ZooKeeper cluster is now up; next, start Kafka.
# zk_kafka kafka start
# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.253:2181 --replication-factor 1 --partitions 1 --topic test
# Note: the replication factor cannot exceed the number of brokers
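The constraint in the note above exists because each replica of a partition must live on a distinct broker. It can be expressed as a simple pre-check; broker_count=1 below matches this single-node setup:

```shell
broker_count=1          # number of brokers in this deployment
replication_factor=1    # the requested --replication-factor
if [ "$replication_factor" -gt "$broker_count" ]; then
  echo "invalid: replication factor $replication_factor exceeds $broker_count broker(s)"
else
  echo "ok: replication factor $replication_factor fits $broker_count broker(s)"
fi
```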
Created topic test.
# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.253:2181
test
View the details of topic test:
# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.1.253:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
In the output above:
# Topic name: test
# Partition: only one, numbered from 0
# Leader: the broker with id 1
# Replicas: the broker ids holding replicas (here only broker 1)
# Isr: the set of in-sync (active) brokers
# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.253:9092 --topic test
>This is a test message for kafka producer
>welcome
>let's go
Open another session:
# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.253:9092 --topic test --from-beginning
# --from-beginning means consume from the start; without it, only newly produced messages are received
This is a test message for kafka producer
welcome
let's go
The producer and consumer tests above show that Kafka, and the ZooKeeper it depends on, are working correctly.
192.168.1.253 logstash/filebeat
# cd /software
# tar zxf logstash-6.7.1.tar.gz && mv logstash-6.7.1/ /usr/local/logstash
# mkdir /usr/local/logstash/conf.d
# useradd elk
# vim /usr/local/logstash/config/logstash.yml
http.host: "192.168.1.253"   # this host's IP
http.port: 9600
# omit the section below if x-pack is not deployed
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: logstash_system
xpack.monitoring.elasticsearch.password: elk-2019
xpack.monitoring.elasticsearch.hosts: ["http://192.168.100.128:9200","http://192.168.100.129:9200","http://192.168.100.130:9200"]
xpack.monitoring.collection.interval: 10s
Service configuration file:
# vim /etc/default/logstash
LS_HOME="/usr/local/logstash"
LS_SETTINGS_DIR="/usr/local/logstash"
LS_PIDFILE="/usr/local/logstash/run/logstash.pid"
LS_USER="elk"
LS_GROUP="elk"
LS_GC_LOG_FILE="/usr/local/logstash/logs/gc.log"
LS_OPEN_FILES="16384"
LS_NICE="19"
SERVICE_NAME="logstash"
SERVICE_DESCRIPTION="logstash"
Service unit file:
# vim /etc/systemd/system/logstash.service
[Unit]
Description=logstash

[Service]
Type=simple
User=elk
Group=elk
# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.
# Prefixing the path with '-' makes it try to load, but if the file doesn't
# exist, it continues onward.
EnvironmentFile=-/etc/default/logstash
EnvironmentFile=-/etc/sysconfig/logstash
ExecStart=/usr/local/logstash/bin/logstash "--path.settings" "/usr/local/logstash/config" "--path.config" "/usr/local/logstash/conf.d"
Restart=always
WorkingDirectory=/
Nice=19
LimitNOFILE=16384

[Install]
WantedBy=multi-user.target
Manage the service:
# mkdir /usr/local/logstash/{run,logs} && touch /usr/local/logstash/run/logstash.pid
# touch /usr/local/logstash/logs/gc.log && chown -R elk:elk /usr/local/logstash
# yum install -y bash-completion && source /etc/profile   # command auto-completion
# systemctl daemon-reload
# systemctl enable logstash
# tar zxf filebeat-6.7.1-linux-x86_64.tar.gz && mv filebeat-6.7.1-linux-x86_64 /usr/local/filebeat
Service unit file:
# vim /usr/lib/systemd/system/filebeat.service
[Unit]
Description=Filebeat sends log files to Logstash or directly to Elasticsearch.
Documentation=https://www.elastic.co/products/beats/filebeat
Wants=network-online.target
After=network-online.target

[Service]
ExecStart=/usr/local/filebeat/filebeat -c /usr/local/filebeat/filebeat.yml -path.home /usr/local/filebeat -path.config /usr/local/filebeat -path.data /usr/local/filebeat/data -path.logs /usr/local/filebeat/logs
Restart=always

[Install]
WantedBy=multi-user.target
Manage the service:
# mkdir /usr/local/filebeat/{data,logs}
# systemctl daemon-reload
# systemctl enable filebeat
Here we collect /home/logs/ciphermachine/ciphermachine.log and /home/logs/webservice/webservice.log as examples to test the ELFK stack combined with Kafka.
filebeat configured with multiple topics:
# vim /usr/local/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /home/logs/webservice/webservice.log
  fields:
    serverip: 192.168.1.253
    logtopic: 253-webservice
- type: log
  enabled: true
  paths:
    - /home/logs/ciphermachine/ciphermachine.log
  fields:
    serverip: 192.168.1.253
    logtopic: 253-ciphermachine
output.kafka:
  enabled: true
  hosts: ["192.168.1.253:9092"]
  topic: '%{[fields.logtopic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip        # message compression mode
  max_message_bytes: 1000000
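The topic: '%{[fields.logtopic]}' template above routes each event to a Kafka topic based on the input that collected it. The effective routing amounts to the following sketch (the route_topic helper is hypothetical, for illustration only):

```shell
# Hypothetical helper mirroring the filebeat.yml above: map a log path to the
# fields.logtopic value of the input that collects it.
route_topic() {
  case "$1" in
    /home/logs/webservice/*)    echo "253-webservice" ;;
    /home/logs/ciphermachine/*) echo "253-ciphermachine" ;;
    *)                          echo "unknown" ;;
  esac
}
route_topic /home/logs/webservice/webservice.log         # -> 253-webservice
route_topic /home/logs/ciphermachine/ciphermachine.log   # -> 253-ciphermachine
```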
# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.253:2181 --replication-factor 1 --partitions 1 --topic 253-ciphermachine
Created topic 253-ciphermachine.
# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.253:2181 --replication-factor 1 --partitions 1 --topic 253-webservice
Created topic 253-webservice.
Kafka and logstash are deployed on the same machine, so this step configures logstash on the Kafka host.
# vim /usr/local/logstash/conf.d/ciphermachine.conf
input {
  kafka {
    bootstrap_servers => "192.168.1.253:9092"
    group_id => "ciphermachine"
    client_id => "ciphermachine-1"   # client_id must be set, otherwise logstash fails to start
    topics => "253-ciphermachine"
    auto_offset_reset => "latest"    # consume from the latest offset
    codec => "json"
    consumer_threads => 5            # number of consumer threads
    decorate_events => false         # do not add Kafka metadata (message size, source topic, consumer group) to output events
    type => "253_ciphermachine"
  }
}
filter {
  if [type] == "253_ciphermachine" {
    ruby {
      code => "event.set('log_time',event.get('@timestamp').time.localtime + 8*60*60)"
    }
    #grok {
    #  match => [ "log_time","(?<thisdate>^\d{4}-\d{1,2}-\d{1,2})" ]
    #}
    grok {
      match => [ "message", "%{TIME:thistime} %{NOTSPACE:level}" ]
    }
    mutate {
      add_field => [ "[zabbix_key]", "ciphermachine_error" ]
      add_field => [ "[zabbix_host]", "192.168.1.253" ]
      remove_field => "@version"
      remove_field => "host"
      remove_field => "path"
      remove_field => "_type"
      remove_field => "_score"
      remove_field => "_id"
      remove_field => "thread-id"
      remove_field => "log_time"
      remove_field => "thisdate"
      remove_field => "thistime"
      remove_field => "score"
      remove_field => "id"
      remove_field => "name"
      remove_field => "beat.*"
      remove_field => "fields.*"
      remove_field => "host.*"
      remove_field => "input.type"
      remove_field => "log.file.path"
      remove_field => "offset"
      remove_field => "prospector.type"
      remove_field => "source"
    }
    #ruby {
    #  code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )"
    #}
    date {
      match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ]
      target => "@timestamp"
    }
    ruby {
      code => "event.set('logtime',event.get('@timestamp').time.localtime + 8*60*60)"
    }
  }
}
output {
  if [type] == "253_ciphermachine" {
    elasticsearch {
      hosts => ["192.168.1.253:9200"]
      user => "elastic"
      password => "changeme"
      index => "ciphermachine.log-%{+YYYY.MM.dd}"
    }
    if [level] =~ /(ERR|error|ERROR|Failed)/ {
      zabbix {
        zabbix_host => "[zabbix_host]"
        zabbix_key => "[zabbix_key]"
        zabbix_server_host => "192.168.1.252"
        zabbix_server_port => "10051"
        zabbix_value => "message"
      }
    }
  }
}
# vim /usr/local/logstash/conf.d/webservice.conf
input {
  kafka {
    bootstrap_servers => "192.168.1.253:9092"
    group_id => "webservice"
    client_id => "webservice-1"
    topics => "253-webservice"
    auto_offset_reset => "latest"
    codec => "json"
    consumer_threads => 5
    decorate_events => false
    type => "253_webservice"
  }
}
filter {
  if [type] == "253_webservice" {
    ruby {
      code => "event.set('log_time',event.get('@timestamp').time.localtime + 8*60*60)"
    }
    #grok {
    #  match => [ "log_time","(?<thisdate>^\d{4}-\d{1,2}-\d{1,2})" ]
    #}
    grok {
      match => [ "message", "%{TIME:thistime} %{NOTSPACE:thread-id}\[%{DATA:name}\] %{LOGLEVEL:level}" ]
    }
    mutate {
      add_field => [ "[zabbix_key]", "webservice_error" ]
      add_field => [ "[zabbix_host]", "192.168.1.253" ]
      remove_field => "@version"
      remove_field => "host"
      remove_field => "path"
      remove_field => "_type"
      remove_field => "_score"
      remove_field => "_id"
      remove_field => "thread-id"
      remove_field => "log_time"
      remove_field => "thisdate"
      remove_field => "thistime"
      remove_field => "score"
      remove_field => "id"
      remove_field => "name"
      remove_field => "beat.*"
      remove_field => "fields.*"
      remove_field => "host.*"
      remove_field => "input.type"
      remove_field => "log.file.path"
      remove_field => "offset"
      remove_field => "prospector.type"
      remove_field => "source"
    }
    #ruby {
    #  code => "event.set('logtime',event.get('thisdate') + ' ' + event.get('thistime') )"
    #}
    date {
      match => [ "logtime","yyyy-MM-dd HH:mm:ss,SSS",'ISO8601' ]
      target => "@timestamp"
    }
    ruby {
      code => "event.set('logtime',event.get('@timestamp').time.localtime + 8*60*60)"
    }
  }
}
output {
  if [type] == "253_webservice" {
    elasticsearch {
      hosts => ["192.168.1.253:9200"]
      user => "elastic"
      password => "changeme"
      index => "webservice.log-%{+YYYY.MM.dd}"
    }
    if [level] =~ /(ERR|error|ERROR|Failed)/ {
      zabbix {
        zabbix_host => "[zabbix_host]"
        zabbix_key => "[zabbix_key]"
        zabbix_server_host => "192.168.1.252"
        zabbix_server_port => "10051"
        zabbix_value => "message"
      }
    }
  }
}
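The grok pattern %{TIME:thistime} %{NOTSPACE:thread-id}\[%{DATA:name}\] %{LOGLEVEL:level} in the filter above can be sanity-checked against a sample webservice log line with plain regexes (a rough approximation of the grok patterns, for illustration only):

```shell
line='10:58:26.612 TRSID[DubboShutdownHook] INFO - [DUBBO] Close all registries'
# %{TIME} roughly matches HH:MM:SS.mmm at the start of the line
thistime=$(echo "$line" | grep -oE '^[0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3}')
# %{LOGLEVEL} matches one of the standard level keywords
level=$(echo "$line" | grep -oE 'TRACE|DEBUG|INFO|WARN|ERROR' | head -n1)
echo "thistime=$thistime level=$level"   # -> thistime=10:58:26.612 level=INFO
```

The level field extracted this way is what the output block's /(ERR|error|ERROR|Failed)/ match tests before forwarding the event to Zabbix.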
# chown -R elk:elk /usr/local/logstash
# /usr/local/logstash/bin/logstash-plugin install logstash-input-kafka
# /usr/local/logstash/bin/logstash-plugin install logstash-output-zabbix
# /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf.d/ciphermachine.conf -t   # "OK" means the config file is valid
# /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf.d/webservice.conf -t
# systemctl start filebeat
# systemctl start logstash
Check the logstash startup log:
# tail -f /usr/local/logstash/logs/logstash-plain.log
/home/logs/ciphermachine/ciphermachine.log
Simulate new log entries:
# echo '10:47:52.225 TRSID[%PARSER_ERROR[wDPI]] - Unable to read additional data from server sessionid 0x164fdd3863600e6, likely server has closed socket, closing socket connection and attempting reconnect' >> /home/logs/ciphermachine/ciphermachine.log
# echo "This message reached the ES cluster from ciphermachine via the Kafka queue!!!" >> /home/logs/ciphermachine/ciphermachine.log
Check in head, kibana, zabbix, and the mail inbox respectively:
/home/logs/webservice/webservice.log
Simulate new log entries:
# echo '10:58:26.612 TRSID[DubboShutdownHook] INFO - [DUBBO] Close all registries [], dubbo version: 2.6.2, current host: 172.17.0.1' >> /home/logs/webservice/webservice.log
# echo '10:59:26.612 TRSID[DubboShutdownHook] ERROR - [DUBBO] Close all registries [], dubbo version: 2.6.2, current host: 172.17.0.1' >> /home/logs/webservice/webservice.log
# echo "This message reached the ES cluster from webservice via the Kafka queue!!!" >> /home/logs/webservice/webservice.log
Check in head, kibana, zabbix, and the mail inbox respectively:
At this point, log collection with ELFK combined with Kafka, plus error alerting via Zabbix, is complete.
When log volume is small, a single-node deployment is usually sufficient, but for high availability and load balancing an ELFK cluster combined with a Kafka cluster is recommended. Overall, adding Kafka also allows logs to be compressed in transit.