最近线上上了ELK,可是只用了一台Redis在中间做为消息队列,以减轻前端es集群的压力,Redis的集群解决方案暂时没有接触过,而且Redis做为消息队列并非它的强项;因此最近将Redis换成了专业的消息信息发布订阅系统Kafka, Kafka的更多介绍你们能够看这里:http://blog.csdn.net/lizhitao/article/details/39499283 ,关于ELK的知识网上有不少的哦, 此篇博客主要是总结一下目前线上这个平台的实施步骤,ELK是怎么跟Kafka结合起来的。好吧,动手!前端
然而我这里的整个日志收集平台就是这样的拓扑:java
1,使用一台Nginx代理访问kibana的请求;node
2,两台es组成es集群,而且在两台es上面都安装kibana;(如下对elasticsearch简称es)linux
3,中间三台服务器就是个人kafka(zookeeper)集群啦; 上面写的消费者/生产者这是kafka(zookeeper)中的概念;nginx
4,最后面的就是一大堆的生产服务器啦,上面使用的是logstash,固然除了logstash也可使用其余的工具来收集你的应用程序的日志,例如:Flume,Scribe,Rsyslog,Scripts……git
角色:github
软件选用:web
elasticsearch-1.7.3.tar.gz #这里须要说明一下,前几天使用了最新的elasticsearch2.0,java-1.8.0报错,目前未找到缘由,故这里使用1.7.3版本redis
Logstash-2.0.0.tar.gzapache
kibana-4.1.2-linux-x64.tar.gz
以上软件均可以从官网下载:https://www.elastic.co/downloads
java-1.8.0,nginx采用yum安装
部署步骤:
1.ES集群安装配置;
2.Logstash客户端配置(直接写入数据到ES集群,写入系统messages日志);
3.Kafka(zookeeper)集群配置;(Logstash写入数据到Kafka消息系统);
4.Kibana部署;
5.Nginx负载均衡Kibana请求;
6.案例:nginx日志收集以及MySQL慢日志收集;
7.Kibana报表基本使用;
es1.example.com:
1.安装java-1.8.0以及依赖包
yum install -y epel-release
yum install -y java-1.8.0 git wget lrzsz
2.获取es软件包
wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz
tar -xf elasticsearch-1.7.3.tar.gz -C /usr/local
ln -sv /usr/local/elasticsearch-1.7.3 /usr/local/elasticsearch
3.修改配置文件
[root@es1 ~]# vim /usr/local/elasticsearch/config/elasticsearch.yml
32 cluster.name: es-cluster #组播的名称地址
40 node.name: "es-node1 " #节点名称,不能和其余节点重复
47 node.master: true #节点可否被选举为master
51 node.data: true #节点是否存储数据
107 index.number_of_shards: 5 #索引分片的个数
111 index.number_of_replicas: 1 #分片的副本个数
145 path.conf: /usr/local/elasticsearch/config/ #配置文件的路径
149 path.data: /data/es/data #数据目录路径
159 path.work: /data/es/worker #工做目录路径
163 path.logs: /usr/local/elasticsearch/logs/ #日志文件路径
167 path.plugins: /data/es/plugins #插件路径
184 bootstrap.mlockall: true #内存不向swap交换
232 http.enabled: true #启用http
4.建立相关目录
mkdir /data/es/{data,worker,plugins} -p
5.获取es服务管理脚本
[root@es1 ~]# git clone https://github.com/elastic/elasticsearch-servicewrapper.git
[root@es1 ~]# mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/
[root@es1 ~]# /usr/local/elasticsearch/bin/service/elasticsearch install
Detected RHEL or Fedora:
Installing the Elasticsearch daemon..
[root@es1 ~]#
#这时就会在/etc/init.d/目录下安装上es的管理脚本啦
#修改其配置:
[root@es1 ~]#
set.default.ES_HOME=/usr/local/elasticsearch #安装路径
set.default.ES_HEAP_SIZE=1024 #jvm内存大小,根据实际环境调整便可
6.启动es ,并检查其服务是否正常
[root@es1 ~]# netstat -nlpt | grep -E "9200|"9300
tcp 0 0 0.0.0.0:9200 0.0.0.0:* LISTEN 1684/java
tcp 0 0 0.0.0.0:9300 0.0.0.0:* LISTEN 1684/java
访问http://192.168.2.18:9200/ 若是出现如下提示信息说明安装配置完成啦,
7.es1节点好啦,咱们直接把目录复制到es2
[root@es1 local]# scp -r elasticsearch-1.7.3 192.168.12.19:/usr/local/
[root@es2 local]# ln -sv elasticsearch-1.7.3 elasticsearch
[root@es2 local]# elasticsearch/bin/service/elasticsearch install
#es2只须要修改node.name便可,其余都与es1相同配置
8.安装es的管理插件
es官方提供一个用于管理es的插件,可清晰直观看到es集群的状态,以及对集群的操做管理,安装方法以下:
[root@es1 local]# /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head
安装好以后,访问方式为: http://192.168.2.18:9200/_plugin/head,因为集群中如今暂时没有数据,因此显示为空,
此时,es集群的部署完成。
在webserve1上面安装Logstassh
1.downloads 软件包 ,这里注意,Logstash是须要依赖java环境的,因此这里仍是须要yum install -y java-1.8.0.
[root@webserver1 ~]# wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz
[root@webserver1 ~]# tar -xf logstash-2.0.0.tar.gz -C /usr/local
[root@webserver1 ~]# cd /usr/local/
[root@webserver1 local]# ln -sv logstash-2.0.0 logstash
[root@webserver1 local]# mkdir logs etc
2.提供logstash管理脚本,其中里面的配置路径可根据实际状况修改
#!/bin/bash
#chkconfig: 2345 55 24
#description: logstash service manager
#auto: Maoqiu Guo
FILE='/usr/local/logstash/etc/*.conf' #logstash配置文件
LOGBIN='/usr/local/logstash/bin/logstash agent --verbose --config' #指定logstash配置文件的命令
LOCK='/usr/local/logstash/locks' #用锁文件配合服务启动与关闭
LOGLOG='--log /usr/local/logstash/logs/stdou.log' #日志
START() {
if [ -f $LOCK ];then
echo -e "Logstash is already \033[32mrunning\033[0m, do nothing."
else
echo -e "Start logstash service.\033[32mdone\033[m"
nohup ${LOGBIN} ${FILE} ${LOGLOG} &
touch $LOCK
fi
}
STOP() {
if [ ! -f $LOCK ];then
echo -e "Logstash is already stop, do nothing."
else
echo -e "Stop logstash serivce \033[32mdone\033[m"
rm -rf $LOCK
ps -ef | grep logstash | grep -v "grep" | awk '{print $2}' | xargs kill -s 9 >/dev/null
fi
}
STATUS() {
ps aux | grep logstash | grep -v "grep" >/dev/null
if [ -f $LOCK ] && [ $? -eq 0 ]; then
echo -e "Logstash is: \033[32mrunning\033[0m..."
else
echo -e "Logstash is: \033[31mstopped\033[0m..."
fi
}
TEST(){
${LOGBIN} ${FILE} --configtest
}
case "$1" in
start)
START
;;
stop)
STOP
;;
status)
STATUS
;;
restart)
STOP
sleep 2
START
;;
test)
TEST
;;
*)
echo "Usage: /etc/init.d/logstash (test|start|stop|status|restart)"
;;
esac
3.Logstash 向es集群写数据
(1)编写一个logstash配置文件
[root@webserver1 etc]# cat logstash.conf
input { #数据的输入从标准输入
stdin {}
}
output { #数据的输出咱们指向了es集群
elasticsearch {
hosts => ["192.168.2.18:9200","192.168.2.19:9200"] #es主机的ip及端口
}
}
[root@webserver1 etc]#
(2)检查配置文件是否有语法错
[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose
Configuration OK
[root@webserver1 etc]#
(3)既然配置ok咱们手动启动它,而后写点东西看可否写到es
ok.上图已经看到logstash已经能够正常的工做啦.
4.下面演示一下如何收集系统日志
将以前的配置文件修改以下所示内容,而后启动logstash服务就能够在web页面中看到messages的日志写入es,而且建立了一条索引
[root@webserver1 etc]# cat logstash.conf
input { #这里的输入使用的文件,即日志文件messsages
file {
path => "/var/log/messages" #这是日志文件的绝对路径
start_position => "beginning" #这个表示从messages的第一行读取,即文件开始处
}
}
output { #输出到es
elasticsearch {
hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
index => "system-messages-%{+YYYY-MM}" #这里将按照这个索引格式来建立索引
}
}
[root@webserver1 etc]#
启动logstash后,咱们来看head这个插件的web页面
ok,系统日志咱们已经成功的收集,而且已经写入到es集群中,那上面的演示是logstash直接将日志写入到es集群中的,这种场合我以为若是量不是很大的话直接像上面已将将输出output定义到es集群便可,若是量大的话须要加上消息队列来缓解es集群的压力。前面已经提到了我这边以前使用的是单台redis做为消息队列,可是redis不能做为list类型的集群,也就是redis单点的问题无法解决,因此这里我选用了kafka ;下面就在三台server上面安装kafka集群
在搭建kafka集群时,须要提早安装zookeeper集群,固然kafka已经自带zookeeper程序只须要解压而且安装配置就好了
kafka1上面的配置:
1.获取软件包.官网:http://kafka.apache.org
[root@kafka1 ~]# wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
[root@kafka1 ~]# tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/
[root@kafka1 ~]# cd /usr/local/
[root@kafka1 local]# ln -sv kafka_2.11-0.8.2.1 kafka
2.配置zookeeper集群,修改配置文件
[root@kafka1 ~]# vim /usr/local/kafka/config/zookeeper.propertie
dataDir=/data/zookeeper
clienrtPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.2=192.168.2.22:2888:3888
server.3=192.168.2.23:2888:3888
server.4=192.168.2.24:2888:3888
#说明:
tickTime: 这个时间是做为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每一个 tickTime 时间就会发送一个心跳。
2888端口:表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
3888端口:表示的是万一集群中的 Leader 服务器挂了,须要一个端口来从新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通讯的端口。
3.建立zookeeper所须要的目录
[root@kafka1 ~]# mkdir /data/zookeeper
4.在/data/zookeeper目录下建立myid文件,里面的内容为数字,用于标识主机,若是这个文件没有的话,zookeeper是无法启动的哦
[root@kafka1 ~]# echo 2 > /data/zookeeper/myid
以上就是zookeeper集群的配置,下面等我配置好kafka以后直接复制到其余两个节点便可
5.kafka配置
[root@kafka1 ~]# vim /usr/local/kafka/config/server.properties
broker.id=2 # 惟一,填数字,本文中分别为2/3/4
prot=9092 # 这个broker监听的端口
host.name=192.168.2.22 # 惟一,填服务器IP
log.dir=/data/kafka-logs # 该目录能够不用提早建立,在启动时本身会建立
zookeeper.connect=192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181 #这个就是zookeeper的ip及端口
num.partitions=16 # 须要配置较大 分片影响读写速度
log.dirs=/data/kafka-logs # 数据目录也要单独配置磁盘较大的地方
log.retention.hours=168 # 时间按需求保留过时时间 避免磁盘满
6.将kafka(zookeeper)的程序目录所有拷贝至其余两个节点
[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.23:/usr/local/
[root@kafka1 ~]# scp -r /usr/local/kafka 192.168.2.24:/usr/local/
7.修改两个借点的配置,注意这里除了如下两点不一样外,都是相同的配置
(1)zookeeper的配置
mkdir /data/zookeeper
echo "x" > /data/zookeeper/myid
(2)kafka的配置
broker.id=2
host.name=192.168.2.22
8.修改完毕配置以后咱们就能够启动了,这里先要启动zookeeper集群,才能启动kafka
咱们按照顺序来,kafka1 –> kafka2 –>kafka3
[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties & #zookeeper启动命令
[root@kafka1 ~]# /usr/local/kafka/bin/zookeeper-server-stop.sh #zookeeper中止的命令
注意,若是zookeeper有问题 nohup的日志文件会很是大,把磁盘占满,这个zookeeper服务能够经过本身些服务脚原本管理服务的启动与关闭。
后面两台执行相同操做,在启动过程中会出现如下报错信息
[2015-11-13 19:18:04,225] WARN Cannot open channel to 3 at election address /192.168.2.23:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2015-11-13 19:18:04,232] WARN Cannot open channel to 4 at election address /192.168.2.24:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2015-11-13 19:18:04,233] INFO Notification time out: 6400 (org.apache.zookeeper.server.quorum.FastLeaderElection)
因为zookeeper集群在启动的时候,每一个结点都试图去链接集群中的其它结点,先启动的确定连不上后面还没启动的,因此上面日志前面部分的异常是能够忽略的。经过后面部分能够看到,集群在选出一个Leader后,最后稳定了。
其余节点也可能会出现相似的状况,属于正常。
9.zookeeper服务检查
[root@kafka1~]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 192.168.2.24:3888 0.0.0.0:* LISTEN 1959/java
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1959/java
[root@kafka2 ~]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 192.168.2.23:3888 0.0.0.0:* LISTEN 1723/java
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1723/java
[root@kafka3 ~]# netstat -nlpt | grep -E "2181|2888|3888"
tcp 0 0 192.168.2.24:3888 0.0.0.0:* LISTEN 950/java
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 950/java
tcp 0 0 192.168.2.24:2888 0.0.0.0:* LISTEN 950/java
#能够看出,若是哪台是Leader,那么它就拥有2888这个端口
ok. 这时候zookeeper集群已经启动起来了,下面启动kafka,也是依次按照顺序启动
[root@kafka1 ~]# nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & #kafka启动的命令
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-server-stop.sh #kafka中止的命令
注意,跟zookeeper服务同样,若是kafka有问题 nohup的日志文件会很是大,把磁盘占满,这个kafka服务一样能够经过本身些服务脚原本管理服务的启动与关闭。
此时三台上面的zookeeper及kafka都已经启动完毕,来检测如下吧
(1)创建一个主题
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic summer
#注意:factor大小不能超过broker数
(2)查看有哪些主题已经建立
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181 #列出集群中全部的topic
summer #已经建立成功
(3)查看summer这个主题的详情
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic summer
Topic:summer PartitionCount:1 ReplicationFactor:3 Configs:
Topic: summer Partition: 0 Leader: 2 Replicas: 2,4,3 Isr: 2,4,3
#主题名称:summer
#Partition:只有一个,从0开始
#leader :id为2的broker
#Replicas 副本存在于broker id为2,3,4的上面
#Isr:活跃状态的broker
(4)发送消息,这里使用的是生产者角色
[root@kafka1 ~]# /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.22:9092 --topic summer
This is a messages
welcome to kafka
(5)接收消息,这里使用的是消费者角色
[root@kafka2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.2.24:2181 --topic summer --from-beginning
This is a messages
welcome to kafka
若是可以像上面同样可以接收到生产者发过来的消息,那说明基于kafka的zookeeper集群就成功啦。
10,下面咱们将webserver1上面的logstash的输出改到kafka上面,将数据写入到kafka中
(1)修改webserver1上面的logstash配置,以下所示:各个参数能够到官网查询.
root@webserver1 etc]# cat logstash.conf
input { #这里的输入仍是定义的是从日志文件输入
file {
type => "system-message"
path => "/var/log/messages"
start_position => "beginning"
}
}
output {
#stdout { codec => rubydebug } #这是标准输出到终端,能够用于调试看有没有输出,注意输出的方向能够有多个
kafka { #输出到kafka
bootstrap_servers => "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092" #他们就是生产者
topic_id => "system-messages" #这个将做为主题的名称,将会自动建立
compression_type => "snappy" #压缩类型
}
}
[root@webserver1 etc]#
(2)配置检测
[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose
Configuration OK
[root@webserver1 etc]#
(2)启动Logstash,这里我直接在命令行执行便可
[root@webserver1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf
(3)验证数据是否写入到kafka,这里咱们检查是否生成了一个叫system-messages的主题
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181
summer
system-messages #能够看到这个主题已经生成了
#再看看这个主题的详情:
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic system-messages
Topic:system-messages PartitionCount:16 ReplicationFactor:1 Configs:
Topic: system-messages Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: system-messages Partition: 1 Leader: 3 Replicas: 3 Isr: 3
Topic: system-messages Partition: 2 Leader: 4 Replicas: 4 Isr: 4
Topic: system-messages Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: system-messages Partition: 4 Leader: 3 Replicas: 3 Isr: 3
Topic: system-messages Partition: 5 Leader: 4 Replicas: 4 Isr: 4
Topic: system-messages Partition: 6 Leader: 2 Replicas: 2 Isr: 2
Topic: system-messages Partition: 7 Leader: 3 Replicas: 3 Isr: 3
Topic: system-messages Partition: 8 Leader: 4 Replicas: 4 Isr: 4
Topic: system-messages Partition: 9 Leader: 2 Replicas: 2 Isr: 2
Topic: system-messages Partition: 10 Leader: 3 Replicas: 3 Isr: 3
Topic: system-messages Partition: 11 Leader: 4 Replicas: 4 Isr: 4
Topic: system-messages Partition: 12 Leader: 2 Replicas: 2 Isr: 2
Topic: system-messages Partition: 13 Leader: 3 Replicas: 3 Isr: 3
Topic: system-messages Partition: 14 Leader: 4 Replicas: 4 Isr: 4
Topic: system-messages Partition: 15 Leader: 2 Replicas: 2 Isr: 2
[root@kafka1 ~]#
能够看出,这个主题生成了16个分区,每一个分区都有对应本身的Leader,可是我想要有10个分区,3个副本如何办?仍是跟咱们上面同样命令行来建立主题就行,固然对于logstash输出的咱们也能够提早先定义主题,而后启动logstash 直接往定义好的主题写数据就行啦,命令以下:
[root@kafka1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME
好了,咱们将logstash收集到的数据写入到了kafka中了,在实验过程当中我使用while脚本测试了若是不断的往kafka写数据的同时停掉两个节点,数据写入没有任何问题。
那如何将数据从kafka中读取而后给咱们的es集群呢?那下面咱们在kafka集群上安装Logstash,安装步骤再也不赘述;三台上面的logstash 的配置以下,做用是将kafka集群的数据读取而后转交给es集群,这里为了测试我让他新建一个索引文件,注意这里的输入日志仍是messages,主题名称仍是“system-messages”
[root@kafka1 etc]# more logstash.conf
input {
kafka {
zk_connect => "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181" #消费者们
topic_id => "system-messages"
codec => plain
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
output {
elasticsearch {
hosts => ["192.168.2.18:9200","192.168.2.19:9200"]
index => "test-system-messages-%{+YYYY-MM}" #为了区分以前实验,我这里新生成的因此名字为“test-system-messages-%{+YYYY-MM}”
}
}
在三台kafka上面启动Logstash,注意我这里是在命令行启动的;
[root@kafka1 etc]# pwd
/usr/local/logstash/etc
[root@kafka1 etc]# /usr/local/logstash/bin/logstash -f logstash.conf
[root@kafka2 etc]# pwd
/usr/local/logstash/etc
[root@kafka2 etc]# /usr/local/logstash/bin/logstash -f logstash.conf
[root@kafka3 etc]# pwd
/usr/local/logstash/etc
[root@kafka3 etc]# /usr/local/logstash/bin/logstash -f logstash.conf
在webserver1上写入测试内容,即webserver1上面利用message这个文件来测试,我先将其清空,而后启动
[root@webserver1 etc]# >/var/log/messages
[root@webserver1 etc]# echo "我将经过kafka集群达到es集群哦^0^" >> /var/log/messages
#启动logstash,让其读取messages中的内容
下图为我在客户端写入到kafka集群的同时也将其输入到终端,这里写入了三条内容
而下面三张图侧能够看出,三台Logstash 很平均的从kafka集群当中读取出来了日志内容
再来看看咱们的es管理界面
ok ,看到了吧,
流程差很少就是下面 酱紫咯
因为篇幅较长,我将
4.Kibana部署;
5.Nginx负载均衡Kibana请求;
6.案例:nginx日志收集以及MySQL慢日志收集;
7.Kibana报表基本使用;
转自
企业日志大数据分析系统ELK+KAFKA实现http://mp.weixin.qq.com/s?__biz=MzIyMDA1MzgyNw==&mid=2651969453&idx=1&sn=1c4d7d7b55deea7300f3b3701fbcdc89&chksm=8c349f81bb431697da71338a862c9528b5cd1c176129424a8b70548ae9c44017289650ac4d61&scene=0#rd