在项目应用运行的过程当中,每每会产生大量的日志,咱们每每须要根据日志来定位分析咱们的服务器项目运行状况与BUG产生位置。通常状况下直接在日志文件中tailf、 grep、awk 就能够得到本身想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量过大、文本搜索太慢、如何多维度查询。这就须要对服务器上的日志收集汇总。常看法决思路是创建集中式日志收集系统,将全部节点上的日志统一收集,管理,访问。前端
通常大型系统每每是一种分布式部署的架构,不一样的服务模块部署在不一样的服务器上,问题出现时,大部分状况须要根据问题暴露的关键信息,定位到具体的服务器和服务模块,因此构建一套集中式日志系统,能够提升定位问题的效率。 一个完整的集中式日志系统,须要包含如下几个主要特色:java
ELK提供了一整套解决方案,而且都是开源软件,之间互相配合使用,完美衔接,高效的知足了不少场合的应用。是目前主流的一种日志系统。node
ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。其中后来新增了一个FileBeat。nginx
即ELK主要由Elasticsearch(搜索)、Logstash(收集与分析)和Kibana(展现)三部分组件组成;git
其中各组件说明以下:github
Filebeat:轻量级数据收集引擎。早期的ELK架构中使用Logstash收集、解析日志,可是Logstash对内存、cpu、io等资源消耗比较高。若是用它来对服务器进行日志收集,将加剧服务器的负载。相比 Logstash,Beats所占系统的CPU和内存几乎能够忽略不计,因此filebeat做为一个轻量级的日志收集处理工具(Agent),它能够用来替代Logstash,因为其占用资源少,因此更适合于在各个服务器上搜集日志后传输给Logstash,这也是官方推荐的一种作法。【收集日志】web
Logstash:数据收集处理引擎。支持动态的从各类数据源搜集数据,并对数据进行过滤、分析、丰富、统一格式等操做,而后存储以供后续使用。【对日志进行过滤、分析】
Elasticsearch:分布式搜索引擎。是基于Lucene的开源分布式搜索服务器,具备高可伸缩、高可靠、易管理等特色。能够用于全文检索、结构化检索和分析,并能将这三者结合起来。【搜集、分析、存储数据】正则表达式
Kibana:可视化平台。它可以搜索、展现存储在 Elasticsearch 中索引数据。使用它能够很方便的用图表、表格、地图展现和分析数据。【图形化展现日志】redis
结合以上,常见的日志系统架构图以下:docker
如上图所示,日志文件分别由filebeat在服务器上进行收集,收集的日志文件汇总到logstash上并对文件数据进行过滤、分析、丰富、统一格式等操做,而后发送到Elasticsearch,进一步对日志进行结构化检索和分析,并存储下来,最后由kibana进行展现。
这只是日志系统的一种最初级结构,生产环境中须要对此结构进行进一步的优化。
环境:CentOS7.5部署ELK 7版本
准备工做: CentOS7.5
elasticsearch7, logstash7, kibana7
关闭防火墙和SELinux并更新yum源(非必须):yum -y update
本次分布式部署的ELK版本为2019年五月份左右最新发布的版本,更新了许多新特性,后面将详细说明。在安装上面本次极大简化了安装步骤,能够源码,组件安装,本次采用YUM+插件docker安装,能达到相同的效果。
#!/bin/bash
# 安装Filebeat
rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
cat >/etc/yum.repos.d/elk-elasticsearch.repo<<EOF
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
yum -y install filebeat
systemctl enable filebeat
systemctl start filebeat
1. java环境(7版本自带Java)
yum -y install java
2. 导入elasticsearch PGP key文件
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
3. 配置yum源
vim /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
4. yum -y install elasticsearch logstash kibana
##########关于安装环境,也能够参照如下二种方式,也比较简便#################
----即经过RPM包的方式,此处给出Filebeat的方式,其余组件雷同
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.0.0-x86_64.rpm
sudo rpm -vi filebeat-7.0.0-x86_64.rpm
----Logstash实例另外一种方式(优先采用)
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.rpm
yum install -y java
yum install -y logstash-7.0.0.rpm
##服务启动前必看,因为配置文件在下一章节给出,此处是前提准备,等配置完成后再启动##
在其安装代码的plugins目录,即/usr/share/elasticsearch/plugins,须要增长中文分词插件。
[root@192-168-108-35 plugins]# ll
total 3192
[root@192-168-108-35 plugins]# pwd
/usr/share/elasticsearch/plugins
[root@192-168-108-35 plugins]# wget https://codeload.github.com/medcl/elasticsearch-analysis-ik/tar.gz/v7.0.0
下载后重启时须要删除插件源文件,不然报错以下:
Caused by: java.nio.file.FileSystemException: /usr/share/elasticsearch/plugins/v7.0.0/plugin-descriptor.properties: Not a directory
因此安装完成后必须删除源文件v7.0.0
-- 首先在安装完毕后会生成不少文件,包括配置文件日志文件等等,下面几个是最主要的配置文件路径
/etc/elasticsearch/elasticsearch.yml # els的配置文件
/etc/elasticsearch/jvm.options # JVM相关的配置,内存大小等等
/etc/elasticsearch/log4j2.properties # 日志系统定义
/usr/share/elasticsearch # elasticsearch 默认安装目录
/var/lib/elasticsearch # 数据的默认存放位置
-- 建立用于存放数据与日志的目录
数据文件会随着系统的运行飞速增加,因此默认的日志文件与数据文件的路径不能知足咱们的需求,那么手动建立日志与数据文件路径。
mkdir -p /data/elkdata
mkdir -p /data/elklogs
-- JVM配置 (7.0版本针对此作出优化,能够基本保障溢出问题,但最好设置一下)
因为Elasticsearch是Java开发的,因此能够经过/etc/elasticsearch/jvm.options配置文件来设定JVM的相关设定。若是没有特殊需求按默认便可。
不过其中仍是有两项最重要的-Xmx1g与-Xms1gJVM的最大最小内存。若是过小会致使Elasticsearch刚刚启动就马上中止。太大会拖慢系统自己。
vim /etc/elasticsearch/jvm.options # JVM最大、最小使用内存
-Xms1g
-Xmx1g
-- 使用ROOT帐户执行命令
elasticsearch的相关配置已经完成,下面须要启动elasticsearch集群。可是因为安全的考虑,elasticsearch不容许使用root用户来启动,因此须要建立一个新的用户,并为这个帐户赋予相应的权限来启动elasticsearch集群。
建立ES运行用户
# 建立用户组 group add elk
# 建立用户并添加至用户组 useradd elk -g elk
# (可选)更改用户密码
passwd elasticsearch
同时修改ES目录权限, 如下操做都是为了赋予es用户操做权限,
-- 安装源码文件目录
[root@192-168-108-35 share]# chown -R elk :elk /usr/share/elasticsearch/
[root@192-168-108-35 share]# chown -R elk :elk /var/log/elasticsearch/
[root@192-168-108-35 share]# chown -R elk :elk /data
-- 运行常见的报错信息
[1]文件数目不足
# 修改系统配置文件属性
# vim /etc/security/limits.conf 添加
elk soft memlock unlimited
elk hard memlock unlimited
elk soft nofile 65536
elk hard nofile 131072
退出用户从新登陆,使配置生效
[2]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
解决办法:
在 /etc/sysctl.conf文件最后添加一行
vm.max_map_count=262144
便可永久修改
前台启动服务
# 需切换为es用户
su elk
# 启动服务(当前的路径为:/usr/share/elasticsearch/)
./bin/elasticsearch
后台运行ES
能够加入-p 命令 让es在后台运行, -p 参数 记录进程ID为一个文件
# 设置后台启动
./bin/elasticsearch -p ./elasticsearch-pid -d
通常状况下,直接执行一下命令便可
./bin/elasticsearch -d
结束进程
# 查看运行的pid,并查杀 cat /tmp/elasticsearch-pid && echo
kill -SIGTERM {pid}
# 暴力结束进程(另外一种方式)
kill -9 `ps -ef |grep elasticsearch|awk '{print $2}' `
验证一下服务是否正常
curl -i "http://192.168.60.200:9200"
【1】使用docker的集成好的elasticsearch-head
# docker run -p 9100:9100 mobz/elasticsearch-head:5
docker容器下载成功并启动之后,运行浏览器打开http://localhost:9100/
【2】使用git安装elasticsearch-head
# yum install -y npm
# git clone git://github.com/mobz/elasticsearch-head.git
# cd elasticsearch-head
# npm install
# npm run start
检查端口是否起来
netstat -antp |grep 9100
浏览器访问测试是否正常
http://IP:9100/
【注意】因为elasticsearch-head:5镜像对elasticsearch的7版本适配性未经检测。这里也推荐推荐另一个镜像lmenezes/cerebro,下载后执行
docker run -d -p 9000:9000 lmenezes/cerebro
就能够在9000端口查看了。
最后,filebeat logstash kibana能够在配置文件后,正常启动,若是须要切换用户的话,也能够参照上面。本次这三个组件所有默认用root用户启动。因为分开部署,与ES互不影响。
1.zookeeper集群环境,本次采用Kafka自带的Zookeeper环境。
kafka是依赖于zookeeper注册中心的一款分布式消息对列,因此须要有zookeeper单机或者集群环境。
2.三台服务器:
192.168.108.200 ELK-kafka-cluster
192.168.108.165 ELK-kafka-salve1
192.168.108.103 ELK-kafka-salve2
3.下载kafka安装包
在 http://kafka.apache.org/downloads 中下载,目前最新版本的kafka已经到2.2.0,这里下载的是kafka_2.11-2.2.0.tgz
1.上传压缩包到三台服务器解压缩到/opt/目录下
tar -zxvf kafka_2.11-2.2.0.tgz -C /opt/
ls -s kafka_2.11-2.2.0 kafka
2.修改 server.properties (/opt/kafka/config目录下)
############################# Server Basics #############################
######################## Socket Server Settings ########################
listeners=PLAINTEXT://192.168.108.200:9092
advertised.listeners=PLAINTEXT://192.168.108.200:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/var/log/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
######################## Internal Topic Settings #########################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
######################### Log Retention Policy ########################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true
######################## Group Coordinator Settings ##########################
group.initial.rebalance.delay.ms=0
3.拷贝两份到192.168.108.165和192.168.108.103
[root@192.168.108.165 config]# cat server.properties
listeners=PLAINTEXT://192.168.108.165:9092
advertised.listeners=PLAINTEXT://192.168.108.165:9092
[root@k8s-n3 config]# cat server.properties
listeners=PLAINTEXT://192.168.108.103:9092
advertised.listeners=PLAINTEXT://192.168.108.103:9092
而后添加环境变量 在/etc/profile 中添加
export ZOOKEEPER_HOME=/opt/kafka
export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile 重载生效
4.修改zookeeper.properties:
一、设置链接参数,添加以下配置
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
二、设置broker Id的服务地址
server.0=192.168.108.200:2888:3888
server.1=192.168.108.165:2888:3888
server.2=192.168.108.103:2888:3888
总结就在Kafka源码目录的/kafka/config/zookeeper.properties文件设置以下
而后在zookeeper数据目录添加id配置(dataDir=/opt/zookeeper)
在zookeeper.properties的数据目录中建立myid文件
在各台服务的zookeeper数据目录添加myid文件,写入服务broker.id属性值,如这里的目录是/usr/local/zookeeper/data
第一台broker.id为0的服务到该目录下执行:echo 0 > myid
[root@192-168-108-165 kafka]# cd /opt/zookeeper/
[root@192-168-108-165 zookeeper]# ls
myid version-2
[root@192-168-108-165 zookeeper]# cat myid
1
[root@192-168-108-165 zookeeper]#
集群启动:cd /opt/kafka
先分别启动zookeeper
kafka使用到了zookeeper,所以你要首先启动一个zookeeper服务,若是你没有zookeeper服务。kafka中打包好了一个简洁版的单节点zookeeper实例。
kafka启动时先启动zookeeper,再启动kafka;关闭时相反,先关闭kafka,再关闭zookeeper
前台启动:bin/zookeeper-server-start.sh config/zookeeper.properties
[root@192-168-108-200 ~]# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 5197 root 98u IPv6 527171 0t0 TCP *:eforward (LISTEN)
能够看到已经启动
后台启动:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
再分别启动kafka
前台启动:bin/kafka-server-start.sh config/server.properties
[root@192-168-108-200 ~]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 5519 root 157u IPv6 528511 0t0 TCP 192-168-108-200:XmlIpcRegSvc (LISTEN)
java 5519 root 167u IPv6 528515 0t0 TCP 192-168-108-200:XmlIpcRegSvc->192.168.108.191:56656 (ESTABLISHED)
java 5519 root 169u IPv6 528516 0t0 TCP 192-168-108-200:XmlIpcRegSvc->192.168.108.187:49368 (ESTABLISHED)
java 5519 root 176u IPv6 528096 0t0 TCP 192-168-108-200:48062->192-168-108-200:XmlIpcRegSvc (ESTABLISHED)
java 5519 root 177u IPv6 528518 0t0 TCP 192-168-108-200:XmlIpcRegSvc->192-168-108-200:48062 (ESTABLISHED)
java 5519 root 178u IPv6 528097 0t0 TCP 192-168-108-200:XmlIpcRegSvc->192.168.108.187:49370 (ESTABLISHED)
后台启动:bin/kafka-server-start.sh -daemon config/server.properties
服务启动脚本
cd /opt/kafka
kill -9 `ps -ef |grep kafka|awk '{print $2}' `
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
/opt/kafka/bin
1.建立topic:
kafka-topics.sh --create --zookeeper 192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181 --replication-factor 3 --partitions 3 --topic test
2.显示topic
kafka-topics.sh --describe --zookeeper 192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181 --topic test
[root@192-168-108-200 bin]# kafka-topics.sh --describe --zookeeper 192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181 --topic tyun
Topic:tyun PartitionCount:3 ReplicationFactor:3 Configs:
Topic: tyun Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: tyun Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: tyun Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
PartitionCount:partition个数
ReplicationFactor:副本个数
Partition:partition编号,从0开始递增
Leader:当前partition起做用的broker.id
Replicas: 当前副本数据所在的broker.id,是一个列表,排在最前面的其做用
Isr:当前kakfa集群中可用的broker.id列表
3.列出topic
kafka-topics.sh --list --zookeeper 192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181
test
建立 producer(生产者);
kafka-console-producer.sh --broker-list 192.168.108.200:9092 --topic test
hello
建立 consumer(消费者)
kafka-console-consumer.sh --bootstrap-server 192.168.108.200:9092 --topic test --from-beginning
hello
4.查看写入kafka集群中的消息(重要命令,判断日志是否写入Kafka的重要依据)
bin/kafka-console-consumer.sh --bootstrap-server 192.168.108.200:9092 --topic tiops --from-beginning
5.删除 Topic----命令标记删除后,再次删除对应的数据目录
bin/kafka-topics.sh --delete --zookeeper master:2181,slave1:2181,slave2:2181 --topic topic_name
若 delete.topic.enable=true
直接完全删除该 Topic。
若 delete.topic.enable=false
若是当前 Topic 没有使用过即没有传输过信息:能够完全删除。
若是当前 Topic 有使用过即有过传输过信息:并无真正删除 Topic 只是把这个 Topic 标记为删除(marked for deletion),重启 Kafka Server 后删除。
注:delete.topic.enable=true 配置信息位于配置文件 config/server.properties 中(较新的版本中无显式配置,默认为 true)。
整个系统一共含有10台主机(filebeat部署在客户端,不计算在内),其中Logstash有四台,Elasticsearch有二台,Kafka集群三台,kibana一台并配置Nginx代理。
架构解释:
(1)首先用户经过nginx代理访问ELK日志统计平台,这里的Nginx能够设置界面密码。
(2)Nginx将请求转发到kibana
(3)kibana到Elasticsearch中去获取数据,这里的Elasticsearch是两台作的集群,日志数据会随机保存在任意一台Elasticsearch服务器。
(4)Logstash1从Kafka中取出数据并发送到Elasticsearch中。
(5)Kafka服务器作日志数据的持久化保存,避免web服务器日志量过大的时候形成的数据收集与保存不一致而致使日志丢失,其中Kafka能够作集群,而后再由Logstash服务器从Kafka持续的取出数据。
(6)logstash2从Filebeat取出的日志信息,并放入Kafka中进行保存。
(7)Filebeat在客户端进行日志的收集。
注1:【Kafka的加入缘由与做用】
整个架构加入Kafka,是为了让整个系统更好的分层,Kafka做为一个消息流处理与持久化存储软件,可以帮助咱们在主节点上屏蔽掉多个从节点之间不一样日志文件的差别,负责管理日志端(从节点)的人能够专一于向 Kafka里生产数据,而负责数据分析聚合端的人则能够专一于从 Kafka内消费数据。因此部署时要把Kafka加进去。
并且使用Kafka进行日志传输的缘由还在于其有数据缓存的能力,而且它的数据可重复消费,Kafka自己具备高可用性,可以很好的防止数据丢失,它的吞吐量相对来讲比较好而且使用普遍。能够有效防止日志丢失和防止logsthash挂掉。综合来讲:它均衡了网络传输,从而下降了网络闭塞,尤为是丢失数据的可能性,
注2:【双层的Logstash做用】
这里为何要在Kafka前面增长二台logstash呢?是由于在大量的日志数据写入时,容易致使数据的丢失和混乱,为了解决这一问题,增长二台logstash能够经过类型进行汇总分类,下降数据传输的臃肿。
若是只有一层的Logstash,它将处理来自不一样客户端Filebeat收集的日志信息汇总,而且进行处理分析,在必定程度上会形成在大规模日志数据下信息的处理混乱,并严重加深负载,因此有二层的结构进行负载均衡处理,而且职责分工,一层汇聚简单分流,一层分析过滤处理信息,而且内层都有二台Logstash来保障服务的高可用性,以此提高整个架构的稳定性。
接下来分别说明原理与各个组件之间的交互(配置文件)。
这里为了方便记忆,把此处的Logstash称为Logstash-collect,首先看Filebeat的配置文件:
其中,filebeat配置输出到logstash:5044端口,在logstash上启动5044端口做为logstash与filebeat的通讯agent,而logstash自己服务起在9600端口
[root@192-168-108-191 logstash]# ss -lnt
State Recv-Q Send-Q Local Address:Port Peer Address:Port
LISTEN 0 128 :::5044 :::*
LISTEN 0 50 ::ffff:192.168.108.191:9600 :::*
vim /etc/filebeat/filebeat.yml # 对几个重要配置进行设置以下
- type: log
# Change to true to enable this input configuration.
enabled: true
# Paths that should be crawled and fetched. Glob based paths.
paths:
- /var/log/*.log
- /var/log/tiops/**/*.log # 即tiops平台的全部日志位置, 指定数据的输入路径为/tiops/**/*.log结尾的全部文件,注意/tiops/子目录下的日志不会被读取,孙子目录下的日志能够
#- c:\programdata\elasticsearch\logs\*
fields:
service: filebeat
multiline: # 多行日志合并为一行,适用于日志中每一条日志占据多行的状况,好比各类语言的报错信息调用栈。
pattern: ‘^\[‘
negate: true
match: after
# Exclude lines. A list of regular expressions to match. It drops the lines that are
# 删除以DBG开头的行:
exclude_lines: ['^DBG']
# filebeat的日志可发送到logstash、elasticsearch、kibana,选择一个便可,本次默认选择logstash,其余二个能够注释
output.logstash:
# The Logstash hosts
hosts: ["192.168.108.191:5044", "192.168.108.87:5044"] # 发往二台Logstash-collect
loadbalance: true
worker: 2
#output.elasticsearch:
#hosts: ["http://192.168.108.35:9200"]
#username: "elk"
#password: ""
#setup.kibana:
#host: "192.168.108.182:5601"
注1:【Filebeat与Logstash-collect的日志消息流转】
注意事项:Logstash-collect怎么接收到Filebeat发送的消息,并再次发向下一级呢?
不只仅配置发往二台Logstash-collect的ip地址加端口,还要注意fields字段,能够理解为是一个Key,当Logstash-collect收到多个Key时,它能够选择其中一个或者多个Key来进行下一级发送,达到日志消息的转发。
在使用了6.2.3版本的ELK之后,若是使用if [type]配置,则匹配不到在filebeat里面使用document_type定义的字符串。由于6.0版本以上已经取消了document_type的定义。若是要实现以上的配置只能使用以下配置:
fields:
service: filebeat
service : filebeat都是本身定义的,定义完成后使用Logstash的if 判断,条件为if [fields][service] == "filebeat".就能够了,具体能够看下面的转发策略。
注2:【Filebeat负载平衡主机的输出】
这里还有一点须要说明一下,就是关于Filebeat与Logstash-collect链接的负载均衡设置。
logstash是一个无状态的流处理软件。logstash怎么集群配置只能横向扩展,而后本身用配置管理工具分发,由于他们内部并无交流的。
Filebeat提供配置选项,可使用它来调整负载平衡时发送消息到多个主机。要启用负载均衡,您指定loadbalance的值为true。
output.logstash:
# The Logstash hosts
hosts: ["192.168.108.191:5044", "192.168.108.87:5044"] # 发往二台Logstash-collect
loadbalance: true
worker: 2
loadbalance: false # 消息只是往一个logstash里发,若是这个logstash挂了,就会自动将数据发到另外一个logstash中。(主备模式)
loadbalance: true # 若是为true,则将数据均分到各个logstash中,挂了就不发了,往存活的logstash里面发送。
即logstash地址若是为一个列表,若是loadbalance开启,则负载到里表中的服务器,当一个logstash服务器不可达,事件将被分发到可到达的logstash服务器(双活模式)
loadbalance选项可供 Redis,Logstash, Elasticsearch输出。Kafka的输出能够在其自身内部处理负载平衡。
同时每一个主机负载平衡器还支持多个workers。默认是1。若是你增长workers的数量, 将使用额外的网络链接。workers参与负载平衡的总数=主机数量*workers。
在这个小节,配置整体结构,与日志数据流向以下图:
首先看/etc/logstash/logstash.yml文件,做为Logstash-collect的公共配置文件,咱们须要作一下的更改:
vim /etc/logstash/logstash.yml
注意上面配置文件的读取路径 /etc/logstash/conf.d,在这个文件夹里面,咱们能够设置Logstash的输入输出。logstash支持把配置写入文件/etc/logstash/conf.d/xxx.conf,而后经过读取配置文件来采集数据。
logstash收集日志基本流程: input–>codec–>filter–>codec–>output ,因此配置文件能够设置输入输出与过滤的基本格式以下:(这里暂时忽略filter,由于这一层的Logstash主要汇聚数据,暂不分析匹配)
关于codec => json # json处理
logstash最终会把数据封装成json类型,默认会添加@timestamp时间字段、host主机字段、type字段。原消息数据会整个封装进message字段。若是数据处理过程当中,用户解析添加了多个字段,则最终结果又会多出多个字段。也能够在数据处理过程当中移除多个字段,总之,logstash最终输出的数据格式是json格式。因此数据通过Logstash会增长额外的字段,能够选择过滤。
Logstash主要由 input,filter,output三个组件去完成采集数据。
input {
指定输入
}
filter {
}
output {
指定输出
}
解释说明以下:
input
input组件负责读取数据,能够采用file插件读取本地文本文件,stdin插件读取标准输入数据,tcp插件读取网络数据,log4j插件读取log4j发送过来的数据等等。本次用 beats插件读取Filebeat发送过来的日志消息。
filter
filter插件负责过滤解析input读取的数据,能够用grok插件正则解析数据,date插件解析日期,json插件解析json等等。
output
output插件负责将filter处理过的数据输出。能够用elasticsearch插件输出到es,redis插件输出到redis,stdout插件标准输出,kafka插件输出到kafka等等
在实际的Tiops平台日志处理流程中,配置以下:
二个Logstash-collect均采用此相同的配置
在/etc/logstash/conf.d目录下,建立filebeat-kafka.conf文件,内容以下:
input {
beats {
port => 5044 # logstash上启动5044端口做为logstash与filebeat的通讯agent
codec => json # json处理
}
}
#此处附加Redis做为缓存消息的配置,本次采用Kafka,因此此部分暂时注释
#output {
# if [fields][service] == "filebeat"{
# redis {
# data_type => "list"
# host => "192.168.108.200"
# db => "0"
# port => "6379"
# key => "tiops-tcmp"
# }
# }
#}
output {
if [fields][service] == "filebeat"{ # 参照第一部分,Logstash-collect怎么接收到Filebeat发送的消息,并再次发向下一级呢?这里就是取filebeat的Fields字段的Key值,进行向下发送
kafka {
bootstrap_servers => "192.168.108.200:9092,192.168.108.165:9092,192.168.108.103:9092" # kafka集群配置,所有IP:端口都要加上
topic_id => "tiops" # 设置Kafka的topic,这样发送到kafka中时,kafka就会自动建立该topic
}
}
}
在这个小节,配置整体结构与日志数据流向以下图:
在上面第二部分,咱们已经成功把数据从Logstash-collect发送到Kafka,而Kafka自己的输入输出并无配置。而是Logstash-collect指定了一个topic,这并不表明kafka不须要配置参数,只是kafka自己不须要配置pull/push目标参数,它被动接受
别的生产者发送过来的数据,由于Logstash-collect指定了Kafka集群的IP地址和端口。那么数据将会被发送到Kafka中,那么Kafka如何处理这些日志数据,那么这时候就须要Kafka配置消息处理参数了。
这个咱们如今暂时先给出参数,至于具体缘由涉及到MQ的高可用、消息过时策略、如何保证消息不重复消费、如何保证消息不丢失、如何保证消息按顺序执行以及消息积压在消息队列里怎么办等问题,咱们以后详细分析并解决。
如下是Kafka集群中,某一台的配置文件,在Kafka源码目录的/kafka/config/server.properties文件设置以下:(加粗的为重要配置文件)
同时kafka是依赖于zookeeper注册中心的一款分布式消息队列,因此须要有zookeeper单机或者集群环境。本次采用Kafka自带的Zookeeper环境。
在Kafka源码目录的/kafka/config/zookeeper.properties文件设置以下
其余注意事项,请看以前的Kafka集群安装步骤,这里主要给出集群某一节点的配置文件参数。
到目前为止,咱们配置了kafka的参数,对消息进行了处理(持久化...),接下来就须要对Kafka中的消息进行下一步的传输,即传送到Logstash-grok。
这里须要注意的是:并非Kafka主动把消息发送出去的,固然Kafka也支持这种操做,可是在这里,采用的是Logstash-grok做为消费者,主动拉取Kafka中的消息,来进行消费。
因此就须要像第二步Logstash-collect那样进行输入输出设置,这里Logstash-grok不只仅配置输入输出,最重要的是它的过滤filter做用。
在/etc/logstash/conf.d目录下,建立logstash-es.conf文件,这里给出Logstash-grok中一个的内容以下(二个Logstash-grok配置同样):
#input {
# redis {
# data_type => "list"
# host => "192.168.108.200"
# db => "0"
# port => "6379"
# key => "tiops-tcmp"
# # password => "123456"
# }
#}
input{
kafka{
bootstrap_servers => ["192.168.108.200:9092,192.168.108.165:9092,192.168.108.103:9092"] # 配置拉取的kafka集群消息地址
group_id => "tyun" # 设置 group_id,对于这个消费组而言会有一个消费 offset,消费掉是 auto_commit 部分控制的,这个参数会按期将你消费的 offset 提交到 kafka,下次启动的时候已经消费过的就不会重复消费了;
# 若是想从新消费,能够换一个 group_id 便可
auto_offset_reset => "earliest"
consumer_threads => "3" # 多个实例的consumer_threads数之和应该等于topic分区数近似达到logstash多实例的效果。
decorate_events => "false"
topics => ["tiops"] # kafka topic 名称,获取指定topic内的消息
type => "log"
codec => json
}
}
filter { # 此处Logstash最主要的过滤做用,能够自定义正则表达式,选择性输出消息
grok {
match => {
#截取
"message" => "(?<LOG>(?<=architecture=x86_64}|containerized=true})(.*)/?)"
}
}
}
output { # 过滤后的消息,发送到ES集群
elasticsearch {
hosts => ["192.168.108.35:9200","192.168.108.194:9200"]
codec => json
index => "tiops-%{+YYYY.MM.dd}" # 自定义索引名称
}
在这个小节,配置整体结构与日志数据流向以下图:
ElasticSearch集群
在第三步中,咱们已经将Logstash-grok过滤后的消息,发送到了ES集群,这里的ES集群被动接受发送过来的消息,即ES集群自己不须要配置输入源,已经由其余组件发送决定了。
ES的配置将集中体如今集群自身的配置上。
Elasticsearch 能够横向扩展至数百(甚至数千)的服务器节点,同时能够处理PB级数据
Elasticsearch 天生就是分布式的,而且在设计时屏蔽了分布式的复杂性。
ES功能:(1)分布式的搜索引擎和数据分析引擎
(2)全文检索,结构化检索,数据分析
(3)对海量数据进行近实时的处理
安装后的ES集群,配置在/etc/elasticsearch/elasticsearch.yml文件,集群中,各个节点配置大致相同,不一样之处下面有红字标出,主要就是节点名称与网络监听地址,须要每一个节点自行按实际填写。
可见集群配置中最重要的两项是node.name与network.host,每一个节点都必须不一样。其中node.name是节点名称主要是在Elasticsearch本身的日志加以区分每个节点信息。
discovery.seed_hosts是集群中的节点信息,可使用IP地址、可使用主机名(必须能够解析)。
当ElasticSearch的节点启动后,它会利用多播(multicast)(或者单播),若是用户更改了配置)寻找集群中的其它节点,并与之创建链接。(节点发现)
这里没有配置分片的数目,在ES的7版本中,默认为1,这个包括replicas可后来自定义设置。
在这个小节,配置整体结构与日志数据流向以下图:
Kibana是一个开源的分析和可视化平台,和Elasticsearch一块儿工做时,就能够用Kibana来搜索,查看,并和存储在Elasticsearch索引中的数据进行交互。
能够轻松地执行高级数据分析,而且以各类图标、表格和地图的形式可视化数据。
同时Kibana使得理解大量数据变得很容易。它简单的、基于浏览器的界面使你可以快速建立和共享动态仪表板,实时显示Elasticsearch查询的变化。
在第四步中,咱们已经将ES集群中的数据进行了处理,创建了索引(至关于MySQL的database概念),而且分片、副本等都存在,并且处理后的数据更加方便于检索。
Kibana的配置在/etc/kibana/kibana.yml文件中,以下:
能够看出,这里的Kibana会主动去ES集群中去拉取数据,最后在浏览器Web端进行实时展现。
这时,咱们打开浏览器界面以下,看看最终的结果:
注:Nginx代理配置,能够经过自定义域名访问并设置密码,提高安全性。(可选)
upstream kibana_server {
server 192.168.108.182:5601 weight=1 max_fails=3 fail_timeout=60;
}
server {
listen 80;
server_name www.kibana.com;
auth_basic "Restricted Access";
auth_basic_user_file /etc/nginx/conf.d/htpasswd.users;
location / {
proxy_pass http://kibana_server;
proxy_http_version 1.1;
}
}
在这个小节,配置整体结构与日志数据流向以下图:
再将日志流向的配置文件输入输出标注一下: