做为运维工程师,咱们天天须要对服务器进行故障排除,那么最早能帮助咱们定位问题的就是查看服务器日志,经过日志能够快速的定位问题。javascript
目前咱们说的日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员能够经过日志了解服务器软硬件信息、检查配置过程当中的错误及错误发生的缘由。常常须要分析日志能够了解服务器的负荷,性能安全性,从而及时采起措施纠正错误。并且日志被分散的储存不一样的设备上。css
若是你管理数上百台服务器,咱们登陆到每台机器的传统方法查阅日志。这样是否是感受很繁琐和效率低下。当务之急咱们使用集中化的日志管理,例如:开源的syslog,将全部服务器上的日志收集汇总。html
集中化管理日志后,日志的统计和检索又成为一件比较麻烦的事情,通常咱们使用find、grep、awk和wc等Linux命令能实现检索和统计,可是对于要求更高的查询、排序和统计等要求和庞大的机器数量依然使用这样的方法不免有点力不从心。前端
今天给你们分享的开源实时日志分析ELK平台可以完美的解决咱们上述的问题,ELK由ElasticSearch、Logstash和Kiabana三个开源工具组成。java
ELK=elasticsearch+Logstash+kibana node
1) Elasticsearch是个开源分布式搜索引擎,它的特色有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等,ELK官网:https://www.elastic.co/mysql
2) Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。工做方式为C/S架构,Client端安装在须要收集日志的主机上,Server端负责将收到的各节点日志进行过滤、修改等操做在一并发往Elasticsearch服务器。linux
3) Kibana 也是一个开源和免费的工具,它Kibana能够为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,能够帮助您汇总、分析和搜索重要数据日志;nginx
4) FileBeat是一个轻量级日志采集器,Filebeat属于Beats家族的6个成员之一,早期的ELK架构中使用Logstash收集、解析日志而且过滤日志,可是Logstash对CPU、内存、IO等资源消耗比较高,相比Logstash,Beats所占系统的CPU和内存几乎能够忽略不计;es6
5) Logstash和Elasticsearch是用Java语言编写,而Kibana使用node.js框架,在配置ELK环境要保证系统有JAVA JDK开发库;
6) Redis 负责消息队列机制,存储数据,即便远端Logstash server因故障中止运行,数据将会先被存储下来,从而避免数据丢失。
今天带你们一块儿来学习EKL企业分布式实时日志平台的构建.
若是使用Filebeat,Logstash从FileBeat获取日志文件。Filebeat做为Logstash的输入input将获取到的日志进行处理,将处理好的日志文件输出到Elasticsearch进行处理,
1) ELK工做流程
经过logstash收集客户端APP的日志数据,将全部的日志过滤出来,存入Elasticsearch 搜索引擎里,而后经过Kibana GUI在WEB前端展现给用户,用户须要能够进行查看指定的日志内容。
下图为EKL企业分布式实时日志平台结构图,若是没有使用Filebeat,Logstash收集日志,进行过滤处理,而且将数据发往elasticsearch,结构图以下:
同时也能够加入redis通讯队列:
图一:
图二:
2) 加入Redis队列后工做流程
Logstash包含Index和Agent(shipper) ,Agent负责客户端监控和过滤日志,而Index负责收集日志并将日志交给ElasticSearch,ElasticSearch将日志存储本地,创建索引、提供搜索,kibana能够从ES集群中获取想要的日志信息。
安装ELK以前,须要先安装JDK (Java Development Kit) 是 Java 语言的软件开发工具包(SDK)),这里选择jdk-8u102-linux-x64.rpm ,bin文件安装跟sh文件方法同样,rpm -ivh jdk-8u102-linux-x64.rpm ,回车便可,默认安装到/usr/java/jdk1.8.0_102目录下。
配置java环境变量,参考博客篇:http://www.javashuo.com/article/p-rtvzuadn-eq.html
修改操做系统的内核优化,参考博客篇:https://www.cnblogs.com/zhangan/p/10956138.html#x3
注释:java环境与操做系统的内核优化必须作,否则启动程序会报错。
另外修改操做系统的内核配置文件sysctl.conf(2台),sysctl - p 生效
[root@es-master local]# vim /etc/sysctl.conf #在配置文件最后面添加以下内容 vm.max_map_count=262144 vm.swappiness=0
解释:max_map_count文件包含限制一个进程能够拥有的VMA(虚拟内存区域)的数量。虚拟内存区域是一个连续的虚拟地址空间区域。在进程的生命周期中,每当程序尝试在内存中映射文件,连接到共享内存段,或者分配堆空间的时候,这些区域将被建立。当进程达到了VMA上线但又只能释放少许的内存给其余的内核进程使用时,操做系统会抛出内存不足的错误。
swappiness,Linux内核参数,控制换出运行时内存的相对权重。swappiness参数值可设置范围在0到100之间。 低参数值会让内核尽可能少用交换,更高参数值会使内核更多的去使用交换空间。默认值为60,对于大多数操做系统,设置为100可能会影响总体性能,而设置为更低值(甚至为0)则可能减小响应延迟。
vm.swappiness=1;进行最少许的交换,而不由用交换。若是设置为 0 的话,则等同于禁用 swap
特别提醒:
Elasticsearch-7版本最低支持jdk版本为JDK1.11
Elasticsearch-7.3该版本内置了JDK,而内置的JDK是当前推荐的JDK版本。固然若是你本地配置了JAVA_HOME那么ES就是优先使用配置的JDK启动ES。(言外之意,你不安装JDK同样能够启动,我试了能够的。)
ES推荐使用LTS版本的JDK(这里只是推荐,JDK8就不支持),若是你使用了一些不支持的JDK版本,ES会拒绝启动。
因为咱们平常的代码开发都是使用的JDK1.8,并且可能JAVA_HOME配置成JDK1.8,那么解决方法咱们只需更改Elasticsearch的启动文件,使它指向Elasticsearch-7.3该版本内置了JDK,或者也能够参照jdk安装文档升级jdk高版本
修改启动配置文件
[root@localhost bin]# pwd
/data/elasticsearch/bin
[root@localhost bin]# vi elasticsearch
#配置es自带的jdk路径 export JAVA_HOME=/usr/local/elasticsearch-node1/jdk export PATH=$JAVA_HOME/bin:$PATH #添加jdk判断 if [ -x "$JAVA_HOME/bin/java" ]; then JAVA="/usr/local/elasticsearch-node1/jdk/bin/java" else JAVA=`which java` fi
分别下载ELK软件包:
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.3.1-linux-x86_64.tar.gz wget https://artifacts.elastic.co/downloads/logstash/logstash-7.3.1.tar.gz Wget https://artifacts.elastic.co/downloads/kibana/kibana-7.3.1-linux-x86_64.tar.gz wget http://117.128.6.27/cache/download.redis.io/releases/redis-4.0.14.tar.gz?ich_args2=522-03101307057974_3dd269048c70bdf875dc9acff1b71955_10001002_9c89622bd3c4f7d69f33518939a83798_c8a912a80ccc397fec935bfecdb1cca8 wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.3.1-linux-x86_64.tar.gz
|
1) ELK安装信息
192.168.111.128 Elasticsearch 192.168.111.129 Kibana 192.168.111.130 Logstash + redis |
2) 192.168.111.128上安装elasticsearch-node1
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.3.1-linux-x86_64.tar.gz tar xf elasticsearch-7.3.1.tar.gz -C /usr/local/ cd /usr/local/ mv elasticsearch-7.3.1 elasticsearch-node1 |
3) 配置
注意:因为elasticsearch启动的时候不能直接用root用户启动,因此须要建立普通用户
useradd elk chown -R elk:elk /usr/local/elasticsearch-node1 |
分别建立两个elasticsearch节点的数据目录和日志目录
mkdir -pv /usr/local/elasticsearch-node1/{data,logs} chown -R elk:elk /usr/local/elasticsearch-node1 |
修改elasticsearch节点的配置文件jvm.options
cd /usr/local/elasticsearch-node1/config/ vi jvm.options |
修改以下两个选项:
注意:
1. 最大值和最小值设置为同样的值,不然在系统使用的时候会因jvm值变化而致使服务暂停
2. 过多的内存,会致使用于缓存的内存越多,最终致使回收内存的时间也加长
3. 设置的内存不要超过物理内存的50%,以保证有足够的内存留给操做系统
4. 不要将内存设置超过32GB
修改elasticsearch节点的配置文件elasticsearch.yml
[root@es-master local]# vim elasticsearch-node1/config/elasticsearch.yml #修改如下项 #表示集群标识,同一个集群中的多个节点使用相同的标识 cluster.name: elasticsearch #节点名称 node.name: "elasticsearch-node1" #数据存储目录 path.data: /usr/local/elasticsearch-node1/data #日志目录 path.logs: /usr/local/elasticsearch-node1/logs #节点所绑定的IP地址,而且该节点会被通知到集群中的其余节点 network.host: 192.168.11.128 #绑定监听的网络接口,监听传入的请求,能够设置为IP地址或者主机名 network.bind_host: 192.168.111.128 #发布地址,用于通知集群中的其余节点,和其余节点通信,不设置的话默承认以自动设置。必须是一个存在的IP地址 network.publish_host: 192.168.111.128 #es7.x 以后新增的配置,初始化一个新的集群时须要此配置来选举master cluster.initial_master_nodes: ["192.168.111.128:9300"] #集群通讯端口 transport.tcp.port: 9300 #对外提供服务的http端口,默认为9200 http.port: 9200 #集群中主节点的初始列表,当主节点启动时会使用这个列表进行非主节点的监测 discovery.zen.ping.unicast.hosts: ["192.168.111.128:9300","192.168.111.128:9301"] #下面这个参数控制的是,一个节点须要看到的具备master节点资格的最小数量,而后才能在集群中作操做。官方推荐值是(N/2)+1; #其中N是具备master资格的节点的数量(咱们的状况是2,所以这个参数设置为1) #可是:但对于只有2个节点的状况,设置为2就有些问题了,一个节点DOWN掉后,确定连不上2台服务器了,这点须要注意 discovery.zen.minimum_master_nodes: 1 #集群ping过程的超时 discovery.zen.ping_timeout: 120s #客户端链接超时 client.transport.ping_timeout: 60s #cache缓存大小,10%(默认),可设置成百分比,也可设置成具体值,如256mb。 indices.queries.cache.size: 20% #索引期间的内存缓存,有利于索引吞吐量的增长。 indices.memory.index_buffer_size: 30% #开启了内存地址锁定,为了不内存交换提升性能。可是Centos6不支持SecComp功能,启动会报错,因此须要将其设置为false bootstrap.memory_lock: true bootstrap.system_call_filter: false #设置该节点是否具备成为主节点的资格以及是否存储数据。 node.master: true node.data: true #ElasticSearch 更改search线程池,search 线程设置太小致使程序崩溃 thread_pool.search.queue_size: 1000 #queue_size容许控制没有线程执行它们的挂起请求队列的初始大小。 thread_pool.search.size: 200 #size参数控制线程数,默认为核心数乘以5。 thread_pool.search.min_queue_size: 10 #min_queue_size设置控制queue_size能够调整到的最小量。 thread_pool.search.max_queue_size: 1000 #max_queue_size设置控制queue_size能够调整到的最大量。 thread_pool.search.auto_queue_frame_size: 2000 #auto_queue_frame_size设置控制在调整队列以前进行测量的操做数。它应该足够大,以便单个操做不会过分偏向计算。 thread_pool.search.target_response_time: 6s #target_response_time是时间值设置,指示线程池队列中任务的目标平均响应时间。若是任务一般超过此时间,则将调低线程池队列以拒绝任务。
4) 192.168.111.128上安装elasticsearch-node2
cd /usr/local/ cp -r elasticsearch-node1 elasticsearch-node2 |
修改vim /usr/local/elasticsearch-node2/config/elasticsearch.yml文件
[root@es-master local]# vim elasticsearch-node1/config/elasticsearch.yml #修改如下项 cluster.name: elasticsearch node.name: "elasticsearch-node2" path.data: /usr/local/elasticsearch-node2/data path.logs: /usr/local/elasticsearch-node2/logs network.host: 192.168.11.128 network.bind_host: 192.168.11.128 network.publish_host: 192.168.11.128 cluster.initial_master_nodes: ["192.168.11.128:9300"] transport.tcp.port: 9301 http.port: 9201 discovery.zen.ping.unicast.hosts: ["192.168.111.128:9300","192.168.111.128:9301"] discovery.zen.minimum_master_nodes: 1 discovery.zen.ping_timeout: 120s client.transport.ping_timeout: 60s indices.queries.cache.size: 20% indices.memory.index_buffer_size: 30% bootstrap.memory_lock: true bootstrap.system_call_filter: false node.master: true node.data: true thread_pool.search.queue_size: 1000 thread_pool.search.size: 200 thread_pool.search.min_queue_size: 10 thread_pool.search.max_queue_size: 1000 thread_pool.search.auto_queue_frame_size: 2000 thread_pool.search.target_response_time: 6s
5) 启动
首先切换为elk用户
su - elk /usr/local/elasticsearch-node1/bin/elasticsearch -d /usr/local/elasticsearch-node2/bin/elasticsearch -d |
启动后以下图:
使用curl测试节点是否能够正常访问,
curl 192.168.111.128:9200
curl 192.168.111.128:9201
查看状态
curl "http://127.0.0.1:9200/_status?pretty=true"
查看集群健康
curl "http://127.0.0.1:9200/_cat/health?v"
至此ES配置完毕,
下载解压安装:
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.3.1-linux-x86_64.tar.gz tar xf kibana-7.3.1-linux-x86_64.tar.gz -C /usr/local |
修改kibana的配置文件kibana.yml
[root@localhost local]# cd kibana/config #设置请求时长 |
启动kibana
[root@localhost config]# cd /usr/local//bin |
若是启动正常,在浏览器端访问http://192.168.111.129:5601,便可看到图形化操做工具
vi kibana.yml i18n.locale: "zh-CN" |
时区修改步骤:
进入kibana界面-》管理-》高级设置-》设置日期格式的时区
默认:Browser
可修改成:Asia/Shanghai
重启便可!
1、wget https://artifacts.elastic.co/downloads/logstash/logstash-7.3.1.tar.gz
解压logstash:
tar xzf logstash-7.3.1.tar.gz mv logstash-7.3.1 /usr/local/logstash/ |
二、logstash的优化pipelines.yml丶logstash.yml配置
能够优化的参数,可根据本身的硬件进行优化配置:
①pipeline线程数,官方建议是等于CPU内核数
②实际output时的线程数
③每次发送的事件数
④发送延时(单位是毫秒)
总结:
Logstash中的jvm.options配置文件:
Logstash是一个基于Java开发的程序,须要运行在JVM中,能够经过配置jvm.options来针对JVM进行设定。好比内存的最大最小、垃圾清理机制等等。JVM的内存分配不能太大不能过小,太大会拖慢操做系统。过小致使没法启动。默认以下:
wget http://117.128.6.27/cache/download.redis.io/releases/redis-4.0.14.tar.gz?ich_args2=522-03101307057974_3dd269048c70bdf875dc9acff1b71955_10001002_9c89622bd3c4f7d69f33518939a83798_c8a912a80ccc397fec935bfecdb1cca8 tar zxf redis-4.0.14.tar.gz -C /usr/local cd redis-3.0.5 make make install |
将/usr/local/redis/bin/目录加入至环境变量配置文件/etc/profile末尾,而后Shell终端执行source /etc/profile让环境变量生效。
export PATH=/usr/local/redis/bin:$PATH |
配置:
vim redis.conf #修改如下内容,不是必须的 bind 127.0.0.1 192.168.111.130 #两台主机分别改成本身的IP logfile "/usr/local/src/redis-3.0.3/redis.log" daemonize yes #启用守护模式 #requirepass "admin.123" #设置redis登陆密码 这个看本身需求能够不要 |
Redis作ELK缓冲队列的优化:
启动及中止Redis服务命令:
/usr/local/redis/bin/redis-server /usr/local/redis/redis.conf /usr/local/redis/bin/redis-cli -p 6379 shutdown |
查看redis内存使用状况:redis-cli -r 100 -i 1 info Memory |grep "used_memory_human"
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.3.1-linux-x86_64.tar.gz tar -zxvf filebeat-7.3.1-linux-x86_64.tar.gz -C /usr/local/ mv filebeat-7.3.1-linux-x86_64 filebeat |
启动方式以下:
./filebeat -e -c ../filebeat.yml
-c:配置文件位置
-path.logs:日志位置
-path.data:数据位置
-path.home:家位置
-e:关闭日志输出
-d 选择器:启用对指定选择器的调试。 对于选择器,能够指定逗号分隔的组件列表,也可使用-d“*”为全部组件启用调试.例如,-d“publish”显示全部“publish”相关的消息。
后台启动filebeat
nohup ./filebeat -e -c ../filebeat.yml >/dev/null 2>&1 & 将全部标准输出及标准错误输出到/dev/null空设备,即没有任何输出
nohup ./filebeat -e -c ./filebeat.yml > filebeat.log &
目录/usr/local/filebeat/config/filebeat.yml:
1) filebeat客户端日志采集-存入Redis缓存数据库;
filebeat.yml文件内容:
filebeat.inputs:- type: log - type: log enabled: true paths: - /var/log/mysqld.log encoding: utf-8 tail_files: true exclude_lines: ["invoke:method=execute","profile"] fields: log_topic: mysql close_older: 24h scan_frequency: 3s harvester_buffer_size: 32768 max_bytes: 10485760 idle_timeout: 5s multiline.pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}' multiline.negate: true multiline.match: after processors: - drop_fields: #删除字段,再也不kibana里面展现,默认状况kibana里面会自动展现这些beat字段 fields: ["type","prospector","source","input","beat","version","@version","offset"] output.redis: hosts: ["192.168.111.130:6379"] key: "filebeat" db: 0 datatype: list loadbalance: true
主要配置项说明:
enabled:true 表明开启这个配置节
paths: 监控指定目录下的文件,支持模糊搜索
tail_files: true :第一次读取的是最新内容,不须要整个文件读取 (可选)
exclude_lines: 指定正则表达式,用来指定不要匹配的行,在输入中排除符合正则表达式列表的那些行,能够有几个。(可选)
close_older: 若是一个文件在某个时间段内没有发生过更新则关闭监控的文件handle。默认1h,change只会在下一次scan才会被发现(可选)
scan_frequency: Filebeat以多快的频率去prospector指定的目录下面检测文件更新好比是否有新增文件若是设置为0s则Filebeat会尽量快地感知更新占用的CPU会变高。默认是10s。
harvester_buffer_size: 每一个harvester监控文件时使用的buffer的大小。(可选)
max_bytes: 日志文件中增长一行算一个日志事件max_bytes限制在一第二天志事件中最多上传的字节数多出的字节会被丢弃。The default is 10MB.(可选)
idle_timeout: 后台刷新超时时间,超过定义时间后强制发送,无论spool_size后台事件计数阈值是否达到,默认5秒。
fields: 增长fields额外字段,附加的可选字段,以向output添加额外的信息。output里面可使用这个变量
multiline: 多行日志监控,下面配置的意思是:不以[开头的行都合并到上一行的末尾(正则写的很差,忽略忽略)
pattern:正则表达式
negate:true 或 & false;默认是false,匹配pattern的行合并到上一行;true,不匹配pattern的行合并到上一行
match:after 或 before,合并到上一行的末尾或开头
drop_fields: #删除字段,再也不kibana里面展现,默认状况kibana里面会自动展现这些beat字段
output.redis: 配置host指定redis地址
loadbalance: true 每一条message都随机负载均衡到redis
而后启动Agent:
nohup ./filebeat -e -c ../config/filebeat.yml > filebeat.log &
2) Redis数据-存入ES;
index.conf文件内容:
input { redis { host => "192.168.111.130" #redis地址 port => "6379" #redis端口号 data_type => "list" #logstash redis插件工做方式 key => "filebeat" #监听的键值 #add_field => { "[@metadata][myid]" => "mysql-111.111" } #增长一个字段,用于标识和判断,与下方output输出相对应 ,es6.0版本以上就不支持type类型了 db => 0 #redis数据库的编号,通常是16个,默认登陆后是0,能够经过命令选择。若是应用系统选择使用了不一样的数据库,经过配置这个参数从指定的数据库中读取信息。 threads => 10 #启用线程数量,启用线程数越多效率也快 batch_count => 1 #返回的事件数量,此属性仅在list模式下起做用。 codec => "json" #输出josn格式 } } output { stdout { codec => json_lines #输出json格式 } if [fields][log_type] == "mysql" { # 当有多个输入源的时候,可根据不一样的标识,指定不一样的输出地址 elasticsearch { hosts => ["192.168.111.128:9200"] #若是是集群须要把其他节点加进来 index => "logstash_%{[fields][log_type]}-%{+YYYY.MM.dd.HH}" ## 输出es的日志索引格式。 } } } |
注意:YYYY.MM.dd.HH 是以小时为单位切割索引文件
而后启动logstash:
../bin/logstash -f index.conf
查看启动进程:
注意:全部指定新建索引名称必须是小写,不然会报错 "Could not index event to Elasticsearch"
目录/usr/local/filebeat/config/filebeat.yml:
3) filebeat客户端日志采集-存入Redis缓存数据库;
filebeat.yml文件内容:
filebeat.inputs: - type: log enabled: true paths: - /usr/local/nginx/logs/access.log tail_files: true exclude_lines: ["invoke:method=execute","profile"] fields: log_topic: nginx_access close_older: 24h scan_frequency: 3s idle_timeout: 5s - type: log enabled: true paths: - /usr/local/nginx/logs/error.log tail_files: true exclude_lines: ["invoke:method=execute","profile"] fields: log_topic: nginx_error close_older: 24h scan_frequency: 3s idle_timeout: 5s processors: - drop_fields: #删除字段,再也不kibana里面展现,默认状况kibana里面会自动展现这些beat字段 fields: ["type","prospector","source","input","beat","version","@version","offset"] output.redis: hosts: ["192.168.111.130:6379"] key: "filebeat" db: 0 datatype: list loadbalance: true
而后启动Agent:
../bin/logstash -f agent.conf
4) Redis数据-存入ES;
index.conf文件内容:
input { stdout {
|
而后启动index:
../bin/logstash -f index.conf
查看启动进程:
浏览器访问kibana-WEB:
http://192.168.111.129:5601
如上配置能够正常收集单台服务器的日志,如何批量收集其余服务器的日志信息呢,方法步骤是什么呢?
能够基于SHELL脚本将配置完毕的logstash文件夹同步至其余服务器,或者经过ansible、saltstack服务器都可。
例如收集Nginx日志,index.conf和agent.conf内容保持不变便可:
第一种密码访问方式:x-pack密钥
从 6.8.0 和 7.1.0 版本开始, Elastic Stack安全功能免费提供。用户如今可以对网络流量进行加密、建立和管理用户、定义可以保护索引和集群级别访问权限的角色,而且使用 Spaces 为 Kibana 提供全面保护。
配置 TLS 和身份验证
咱们要作的第一件事是生成证书,经过这些证书便能容许节点安全地通讯。您可使用企业 CA 来完成这一步骤,可是在此演示中,咱们将会使用一个名为 elasticsearch-certutil 的命令,经过这一命令,您无需担忧证书一般带来的任何困扰,便能完成这一步。
第 1步:在 Elasticsearch 主节点上配置 TLS
使用 cd 命令更改至 master 目录,而后运行下列命令:
bin/elasticsearch-certutil cert -out config/elastic-certificates.p12 -pass ""
接下来,打开文件 config/elasticsearch.yaml。将下列代码行粘贴到文件末尾,每一个节点都要加。
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
保存文件,如今咱们即可以启动主节点了
第 2 步:Elasticsearch 集群密码
一旦主节点开始运行,即可觉得集群设置密码了。运行命令 bin/elasticsearch-setup-passwords auto。这将会为不一样的内部堆栈用户生成随机密码。或者,您也能够跳过 auto 参数,改成经过 interactive 参数手动定义密码。请记录这些密码,咱们很快就会再次用到这些密码。
注意点:在主节点生成p12文件、配置重启,生成密码完成前,千万不能操做其余节点。
第 3 步:在 Elasticsearch 节点上配置 TLS
在第1步运行bin/elasticsearch-certutil cert -out config/elastic-certificates.p12 -pass ""
该命令的时候 将会在config下生成elastic-certificates.p12文件,将此文件cp -a到其余两个节点的config目录,而后重启节点。
咱们将看到其加入集群。并且,若是看一下主节点的终端窗口,咱们会看到有一条消息显示已有一个节点加入集群。如今,咱们的两节点集群便开始运行了。
注意点:必定要注意p12文件的权限属主
第 4 步:在 Kibana 中实现安全性
要作的最后一件事是为 Kibana 配置用户密码。咱们能够从以前 setup-passwords 命令的输出内容中找到密码。打开 config/kibana.yml 文件。找到相似下面的代码行
#elasticsearch.username: "user"
#elasticsearch.password: "pass"
对 username 和 password 字段取消注释,方法是删除代码行起始部分的 # 符号。将 "user" 更改成 "kibana",而后将 "pass" 更改成 setup-passwords 命令告诉咱们的任何 Kibana 密码。保存文件,而后咱们即可经过运行 bin/kibana启动 Kibana 了。
登陆后能够在此页面建立用户及控制用户权限,还能够对超级用户进行密码修改
而后相应的logstash收集过滤日志也须要配置用户密码,否则输出日志到elasticsearch失败,写法以下:
output { stdout { codec => json_lines } if [fields][log_topic] == "dispatcher-connector_zone2" { elasticsearch { hosts => ["192.168.115.98:9200","192.168.115.87:9200","192.168.115.126:9200"] user => "elastic" password => "Ericss0n" index => "logstash_%{[fields][log_topic]}-%{+YYYY.MM.dd}" } } }
第二种方式:Apache的密码认证进行安全配置。经过访问Nginx转发kibana服务器地址。
当咱们安装完ES、Kibana启动进程,能够直接在浏览器访问,这样不利于数据安全,接下来咱们利用Apache的密码认证进行安全配置。经过访问Nginx转发只ES和kibana服务器。
Kibana服务器安装Nginx:
yum install openssl openssl-devel pcre-devel pcre zlib zlib-devel –y wget -c http://nginx.org/download/nginx-1.8.1.tar.gz tar -xzf nginx-1.8.1.tar.gz cd nginx-1.8.1 useradd www ;./configure --user=www --group=www --prefix=/usr/local/nginx --with-pcre --with-pcre-jit --with-http_sub_module --with-http_stub_status_module --with-http_ssl_module --with-http_flv_module --with-http_realip_module --with-http_spdy_module --with-http_gunzip_module --with-http_gzip_static_module make make install #自此Nginx安装完毕 /usr/local/nginx/sbin/nginx -t 检查nginx配置文件是否正确,返回OK即正确。 |
修改Nginx.conf配置文件代码以下:
#user www www; worker_processes 2; worker_cpu_affinity 00000001 00000010; error_log /opt/ericsson/nginx/logs/error.log crit; pid /opt/ericsson/nginx/nginx.pid; worker_rlimit_nofile 102400; events { use epoll; worker_connections 102400; multi_accept on; } http { include mime.types; log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$request_time"'; default_type application/octet-stream; access_log logs/access.log main ; sendfile on; tcp_nopush on; tcp_nodelay on; keepalive_timeout 60; gzip on; gzip_min_length 1k; gzip_buffers 4 16k; gzip_http_version 1.1; gzip_comp_level 4; gzip_types text/plain application/x-javascript text/css application/xml; gzip_vary on; client_max_body_size 10m; client_body_buffer_size 128k; proxy_connect_timeout 90; proxy_send_timeout 90; proxy_read_timeout 90; proxy_buffer_size 4k; proxy_buffers 4 32k; proxy_busy_buffers_size 64k; large_client_header_buffers 4 4k; client_header_buffer_size 4k; open_file_cache max=102400 inactive=20s; open_file_cache_valid 30s; open_file_cache_min_uses 1; upstream kibana_web1 { server 172.31.15.228:5601 weight=1 max_fails=2 fail_timeout=30s; }
server { listen 8080; server_name 172.31.15.202; location / { auth_basic "ELK Kibana Monitor Center"; auth_basic_user_file /opt/ericsson/nginx/html/.htpasswd; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_pass http://kibana_web1; } } } |
修改kibana配置文件监听IP为127.0.0.1:
重启kibana和Nginx服务,经过Nginx 80端口访问以下:
添加Nginx权限认证:
Nginx.conf配置文件location /中加入以下代码:
auth_basic "ELK Kibana Monitor Center"; auth_basic_user_file /usr/local/nginx/html/.htpasswd; |
经过Apache加密工具htpasswd生成用户名和密码:
须要安装:yum -y install httpd-tools
htpasswd -c /usr/local/nginx/html/.htpasswd admin
重启Nginx web服务,访问以下:
用户名和密码正确,便可登陆成功,
1,普通正则表达式以下:
. 任意一个字符 * 前面一个字符出现0次或者屡次 [abc] 中括号内任意一个字符 [^abc] 非中括号内的字符 [0-9] 表示一个数字 [a-z] 小写字母 [A-Z] 大写字母 [a-zA-Z] 全部字母 [a-zA-Z0-9] 全部字母+数字 [^0-9] 非数字 ^xx 以xx开头 xx$ 以xx结尾 \d 任何一个数字 \s 任何一个空白字符 |
2,扩展正则表达式,在普通正则符号再进行了扩展
扩展正则表达式,在普通正则符号再进行了扩展 ? 前面字符出现0或者1次 + 前面字符出现1或者屡次 {n} 前面字符匹配n次 {a,b} 前面字符匹配a到b次 {,b} 前面字符匹配0次到b次 {a,} 前面字符匹配a或a+次 (string1|string2) string1或string2 |
简单提取IP
114.114.114.114 255.277.277.277
1-3个数字.1-3个数字.1-3个数字.1-3个数字
[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}
1.13 logstash之multiline插件,匹配多行日志
在外理日志时,除了访问日志外,还要处理运行时日志,该日志大都用程序写的,一个异常的日志是多行的,咱们目的是要把有这样的日志合并成一条
安装插件命令是 # logstash-plugin install logstash-filter-multiline
# logstash-plugin install logstash-filter-multiline
Validating logstash-filter-multiline
Installing logstash-filter-multiline
Installation successfu |
在logstash filter中使用multiline插件(不推荐):
不推荐的缘由:
在filter中,加入如下代码:
filter {
multiline { }
}
在logstash input中使用multiline插件(没有filebeat时推荐):
multiline字段属性:
对于multiline插件来讲,有三个设置比较重要:negate , pattern 和 what
codec =>multiline { charset=>... #可选 字符编码(UTF-8) max_bytes=>... #可选 bytes类型 设置最大的字节数 max_lines=>... #可选 number类型 设置最大的行数,默认是500行 pattern=>... #必选 string类型 设置匹配的正则表达式 patterns_dir=>... #可选 array类型 能够设置多个正则表达式 negate=>... #必选 boolean类型 默认false不显示,可设置ture what=>... #必选 向前previous , 向后 next auto_flush_interval => 30 # 若是在规定时候内没有新的日志事件就不等待后面的日志事件 }
下面看看这两个例子:
1.一个java的报错:
input能够设置以下:
input { file { type => "TC" start_position => "end" path => "/opt/ericsson/csp/logs/TC.log" codec => multiline { pattern => "^\[" negate => true what => "previous" charset => "UTF-8" } } }
说明:区配以"["开头的行,若是不是,那确定是属于前一行的
2.好比一个java应用产生的异常日志是这样:
input能够设置以下:
input { file { type => "tsc-remote-status" start_position => "end" path => "/opt/qweq/logs/tsc-remote-status.log" codec => multiline { pattern => "^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}" negate => true what => "previous" charset => "UTF-8" } } }
说明:目的匹配以 “2017-11-15 14:32:03” 这种时间格式开头的日志,若是不是,那确定是属于前一行的
1,条件判断
使用条件来决定filter和output处理特定的事件。logstash条件相似于编程语言。条件支持if、else if、else语句,能够嵌套。
比较操做有:
==
, !=
, <
, >
, <=
, >=
=~(匹配正则)
, !~(不匹配正则)
in(包含)
, not in(不包含)
布尔操做:
and(与)
, or(或)
, nand(非与)
, xor(非或)
一元运算符:
!(取反)
()
(复合表达式), !()
(对复合表达式结果取反)2,经常使用的过滤器为:
mutate插件:使用最频繁的操做,能够对字段进行各类操做,好比重命名、删除、替换、更新等,主要参数以下:
filter { mutate { convert => { "local_port" => "integer" "bytes" => "integer" "latency_ms" => "integer" } rename => { "[host][name]" => "host" } remove_field => ["type","prospector","source","input","beat","version","@version","offset"] } }
drop插件:丢弃一部分events不进行处理,匹配筛选多余日志。
filter { #dispatcher if([message]=~ "DEBUG") or ([message]=~ "traceLogger") or ([message]=~ "^20.*\ WARN\ .*\ by\ regular\ expression.*"){ drop{} } #tsc if([message]=~ ".*\ The\ HTTP\ headers\ are.*") or ([message]=~ "with Queue depth: 0$") or ([message]=~ "%$") or ([message]=~ "execute...$"){ drop{} } #tc if([message]=~ "\[]$") or ([message]=~ "invoke:method=execute") { drop{} } #等等 }
date:时间处理
处理2条不一样格式时间日志示例以下:
2019-09-16 21:59:18.131 [TSC_MESSAGE] [INFO] [c.e.c.t.m.r.RemoteControllMessageResourceImpl] [csp] [Add message] [LB37722Z5KH038111] [OPERATION_SUCCESS] []
192.168.112.24 - - [16/Sep/2019:21:58:16 +0800] "GET /services/serviceconnection/ HTTP/1.1" 200 1836
date使用:
filter { grok { match => {"message" => "%{TIMESTAMP_ISO8601:access_time}"} match => {"message" => "%{IP:ip}\s*%{DATA:a}\s*\[%{HTTPDATE:access_time}\]"} } date { # 这个才是真正的access发生时间,识别为记录的@timestamp timezone => "Asia/Shanghai" match => [ "access_time","ISO8601","dd/MMM/yyyy:HH:mm:ss Z"] } }
注:match => [ "access_time", "ISO8601"] } 注意:时区偏移量只须要用一个字母 Z 便可。
ISO8601 - 应解析任何有效的ISO8601时间戳 如:2019-09-16 21:59:18.131
dd/MMM/yyyy:HH:mm:ss Z - 应解析为16/Sep/2019:21:58:16 +0800的时间格式
date介绍:
就是将匹配日志中时间的key替换为@timestamp的时间,由于@timestamp的时间是日志送到logstash的时间,并非日志中真正的时间。
Grok插件:
详细语法参照博客篇: http://www.javashuo.com/article/p-hjufbcdv-ew.html
语法解释:
%{HOSTNAME},匹配请求的主机名
%{TIMESTAMP_ISO8601:time},表明时间戳
%{LOGLEVEL},表明日志级别
%{URIPATHPARAM},表明请求路径
%{DATA},表明任意数据
%{INT},表明字符串整数数字大小
%{NUMBER}, 能够匹配整数或者小数
%{IP}, 匹配ip
%{WORD}, 匹配请求的方式
%{GREEDYDATA},匹配全部剩余的数据
(?([\S+]*)),自定义正则
\s*或者\s+,表明多个空格
\S+或者\S*,表明多个字符
大括号里面:xxx,至关于起别名
(?<class_info>([\S+]*)), 自定义正则匹配多个字符
举例1,在日志文件中原生日志时间是这样子的:2019-03-19 13:08:07.782
重点是后面的”.782“,后面附加以毫秒为单位的。
那么grok插件中能够这样子定义匹配的规则:
grok { match => {"message" => "%{TIMESTAMP_ISO8601:access_time}"} }
举例2,操做以下:
[2019-08-22 12:25:51.441] [TSC_IHU] [ERROR] [c.e.c.t.i.t.s.IhuTsaUplinkServiceImpl] Activation/Bind uplink, query UserSession by Token failure!
grok { match => {"message" => "\[%{TIMESTAMP_ISO8601:time}\]\s*%{DATA:name}\s*\[%{LOGLEVEL:level}\]\s*%{GREEDYDATA:data}"} }
举例3,操做以下:
2019-09-12 14:16:36.320+08:00 INFO 930856f7-c78f-4f12-a0f1-83a2610b2dfc DispatcherConnector ip-192-168-114-244 [Mqtt-Device-1883-worker-18-1] com.ericsson.sep.dispatcher.api.transformer.v1.MessageTransformer {"TraceID":"930856f7-c78f-4f12-a0f1-83a2610b2dfc","clientId":"5120916600003466K4GA1059","username":"LB37622Z3KX609880"}
grok { match => {"message" => "%{TIMESTAMP_ISO8601:access_time}\s*%{LOGLEVEL:level}\s*%{UUID:uuid}\s*%{WORD:word}\s*%{HOSTNAME:hostname}\s*\[%{DATA:work}\]\s*(?<api>([\S+]*))\s*(?<TraceID>([\S+]*))\s*%{GREEDYDATA:message_data}"} }
举例4,操做以下:
192.168.125.138 - - [12/Sep/2019:14:10:58 +0800] "GET /backend/services/ticketRemind/query?cid=&msgType=1&pageSize=100&pageIndex=1&langCode=zh HTTP/1.1" 200 91
grok { match => {"message" => "%{IP:ip}\s*%{DATA:a}\s*\[%{HTTPDATE:access_time}\]\s*%{DATA:b}%{WORD:method}\s*%{URIPATH:url}%{URIPARAM:param}\s*%{URIPROTO:uri}%{DATA:c}%{NUMBER:treaty}%{DATA:d}\s*%{NUMBER:status}\s*%{NUMBER:latency_millis}"} }
举例5,操做以下:
[08/Nov/2019:11:40:24 +0800] tc-com.g-netlink.net - - 192.168.122.58 192.168.122.58 192.168.125.135 80 GET 200 /geelyTCAccess/tcservices/capability/L6T7944Z0JN427155 ?pageIndex=1&pageSize=2000&vehicleType=0 21067 17
grok { match => { "message"=> "\[%{HTTPDATE:access_time}\] %{DATA:hostname} %{DATA:username} %{DATA:fwd_for} %{DATA:remote_hostname} %{IP:remote_ip} %{IP:local_ip} %{NUMBER:local_port} %{DATA:method} %{DATA:status} %{DATA:uri} %{DATA:query} %{NUMBER:bytes} %{NUMBER:latency_ms}"} }
grok同时处理多种日志不一样行的写法:
filter { if [fields][log_topic] == "dispatcher-connector" { grok { match => {"message" => "%{TIMESTAMP_ISO8601:time}\s*%{LOGLEVEL:level}\s*%{UUID:uuid}\s*%{WORD:word}\s*%{HOSTNAME:hostname}\s*\[%{DATA:work}\]\s*(?<api>([\S+]*))\s*(?<TraceID>([\S+]*))\s*%{GREEDYDATA:message_data}"} match => {"message" => "%{TIMESTAMP_ISO8601:time}\s*%{LOGLEVEL:level}\s*%{WORD:word}\s*%{HOSTNAME:hostname}\s*(?<TraceID>([\S+]*))\s*(?<api>([\S+]*))\s*(?<brackets>([\S+]*))\s*%{GREEDYDATA:message_data}"} remove_field =>["message","time","level","uuid","word","hostname","brackets","work","api","TraceID"] } } }
1,ES数据按期删除
#/bin/bash #es-index-clear #只保留15天内的日志索引 LAST_DATA=`date -d "-7 days" "+%Y.%m.%d"` #删除上个月份全部的索引(根据本身的索引格式编写) curl -XDELETE 'http://ip:port/*-'${LAST_DATA}''
crontab -e添加定时任务:天天的凌晨清除索引。
0 1 * * * /search/odin/elasticsearch/scripts/es-index-clear.sh
2,ES按期清理cache
为避免fields data占用大量的jvm内存,能够经过按期清理的方式来释放缓存的数据。释放的内容包括field data, filter cache, query cache
curl -XPOST "ip:port/_cache/clear"
curl -XPOST -uelastic:Ericss0n "ip:port/_cache/clear"
crontab -e添加定时任务:天天的凌晨清除缓存
3,ES索引写入优化
调小索引副本数,经过增大refresh间隔周期,同时不设置副原本提升写性能。
curl -XPUT 'ip:9200/_all/_settings' -H 'Content-Type: application/json' -d '{"index":{"refresh_interval":"120s","number_of_replicas":"0"},"translog.durability": "async","translog.flush_threshold_size":"1024mb","translog.sync_interval": "120s","merge.scheduler.max_thread_count":"1","merge.policy.floor_segment":"10mb"}'
curl -XPUT 'ip:9200/_all/_settings' -uelastic:Ericss0n -H 'Content-Type: application/json' -d '{"index":{"refresh_interval":"120s","number_of_replicas":"0"},"translog.durability": "async","translog.flush_threshold_size":"1024mb","translog.sync_interval": "120s","merge.scheduler.max_thread_count":"1","merge.policy.floor_segment":"10mb"}'
crontab -e添加定时任务:天天的凌晨执行
4,修改集群分片数,默认只容许1000个分片,难免后期分片数不足丢失数据
curl -XPUT 'ip:9200/_cluster/settings' -uelastic:Ericss0n -H "Content-Type: application/json" -d '{"transient":{"cluster":{"max_shards_per_node":10000}}}'