ELK是指Elasticsearch + Logstash + Kibaba三个组件的组合。Elasticsearch主要充当一个全文检索和分析引擎,Logstash是一款分布式日志收集系统,Kibana能够为这个平台提供可视化的Web界面。node
首先从官网(https://www.elastic.co/cn/downloads)下载安装包linux
elasticsearch-6.2.3.tar.gzbootstrap
kibana-6.2.3-linux-x86_64.tar.gzvim
logstash-6.2.3.tar.gz浏览器
jdk-8u151-linux-x64.tar.gzbash
因为elk官方指定使用oracle的jdk8,若是操做系统自带有openjdk能够先进行卸载而后再安装jdk。oracle
jdk安装包解压ssh
tar -zxvf jdk-8u151-linux-x64.tar.gz
配置路径,在/etc/profile文件配置curl
vim /etc/profile
添加配置elasticsearch
export JAVA_HOME=/home/jdk1.8.0_151 export PATH=$PATH:$JAVA_HOME/bin
是配置生效
source /etc/profile
修改/etc/sysctl.conf 和/etc/security/limits.conf 配置
vi /etc/sysctl.conf #添加配置 vm.max_map_count = 655360 #使配置生效 sysctl -p
vi /etc/security/limits.conf #添加配置 * soft nofile 65536 * hard nofile 131072 * soft nproc 65536 * hard nproc 131072
解压elasticsearch-6.2.3.tar.gz到/home/eselk/elk目录下
tar -zxvf elasticsearch-6.2.3.tar.gz
进入到elasticsearch的配置目录(/home/eselk/elk/elasticsearch-6.2.3/config
)下修改配置文件elasticsearch.yml
cluster.name: es_cluster #集群的名称 node.name: secms-elk1 #节点名称 path.data: /home/eselk/elk/data #数据存储的目录(多个目录使用逗号分隔) path.logs: /home/eselk/elk/logs #日志路径 network.host: 0.0.0.0 http.port: 9200 #端口默认9200 bootstrap.mlockall: true #锁住内存,使内存不会分配至交换区(swap)
启动es,查看是否启动成功
./bin/elasticsearch -d
执行命令
curl http://10.118.213.223:9200
能够看到输出
{ "name" : "secms-elk1", "cluster_name" : "es_cluster", "cluster_uuid" : "PetmCJcwQX-RAwVdWiVC-Q", "version" : { "number" : "6.2.3", "build_hash" : "c59ff00", "build_date" : "2018-03-13T10:06:29.741383Z", "build_snapshot" : false, "lucene_version" : "7.2.1", "minimum_wire_compatibility_version" : "5.6.0", "minimum_index_compatibility_version" : "5.0.0" }, "tagline" : "You Know, for Search" }
es安装成功,将es安装复制到到其余机子上面,修改配置文件elasticsearch.yml中node.name节点名字以后启动各个节点,es系统会自动将cluster name相同的机器组成一个集群。
解压logstash-6.2.3.tar.gz 到/home/eselk/elk目录下
tar -zxvf logstash-6.2.3.tar.gz
编辑配置文件,这里在logstash安装目录下新建文件logstash-simple.conf,文件内容
input{ #接收kafka中数据 kafka{ bootstrap_servers => "10.118.213.175:9092,10.118.213.219:9092" #kafka集群 topics=>["sentinel_logs","siem-alarm","siem-audit","siem-ddos","siem-fw","siem-ips","siem-risk","siem-syflog","siem-waf"] #接收主题中数据 codec => plain consumer_threads => 4 decorate_events => true } } output{ #输出数据到es中 elasticsearch{ hosts => ["10.118.213.223:9200"] index => "all-log" } }
保存文件以后启动logstash
./bin/logstash -f logstash-simple.conf > /dev/null &
查看logstash进程,确认logstash是否启动成功
ps -ef|grep logstash
下面咱们在另外一个机子上面也安装logstash,用来接收用户操做命令道es中,按照一样步骤解压logstash到指定目录,而后新建配置文件logstash-simple.conf文件内容
input { #读取文件中数据 file { path=> [ "/var/log/history.log" ] type=>"history_log" } } output { elasticsearch { hosts => "10.118.213.223:9200" index => "ssh_opt" } }
按照前面步骤启动logstash。
解压kibana-6.2.3-linux-x86_64.tar.gz到指定目录/home/eselk/elk
修改conf中配置文件
#访问端口 server.port: 5601 #全部站点均可以访问 server.host: "0.0.0.0" #指到es集群master节点 elasticsearch.url: "http://10.118.213.223:9200" #kibana是一个小系统,本身也须要存储数据(将kibana的数据保存到.kibana的索引中,会在ES里面建立一个.kibana) kibana.index: ".kibana"
启动kibaba
./bin/kibana > /dev/null &
能够经过http://10.118.213.223:5601/在浏览器中访问
若是咱们要查找logstash发送过来的数据,须要先配置index
新建index匹配规则
一样操做再新建收集用户操做命令的index匹配规则ssh_opt*,这样能够在Discover页面查询搜索数据。如今无论是all-log*或者ssh_opt*都是不能匹配到数据的,由于咱们尚未发送数据到logstash。
1.咱们经过kaka客户端程序向kafuka主题中写入日志数据,而后查询all-log*匹配规则能够发现查询到接收到的数据。
2.收集用户操做命令数据,咱们首先要将用户的操做命令写入/var/log/history.log目录下,而后再经过logstash进行收集。
在主机/etc/bashrc添加配置
vim /etc/bashrc
在文件末尾添加以下内容(注意先贴到记事本中,下面的指令只有两行):
export HISTORY_FILE="/var/log/history.log" export PROMPT_COMMAND='{ thisHistID=`history 1|awk "{print\\$1}"`;lastCommand=`history 1| awk "{\\$1=\"\" ;print}"`;user=`id $(whoami)`;whoStr=(`who -u am i`);realUser=${whoStr[0]};logMonth=${whoStr[2]};logDay=${whoStr[3]};logTime=${whoStr[4]};pid=${whoStr[6]};ip=${whoStr[7]};if [ ${thisHistID}x != ${lastHistID}x ];then echo -E `date "+%Y/%m/%d %H:%M:%S"` $user\($realUser\)@$ip[PID:$pid][LOGIN:$logMonth $logDay $logTime] --- $lastCommand ;lastHistID=$thisHistID;fi; } >> $HISTORY_FILE'
执行source /etc/bashrc使配置生效。新建history.log文件并赋权
[root@rhserver ~]# cd /var/log [root@rhserver ~]# touch history.log [root@rhserver ~]# chmod 777 history.log
再执行一些命令查看ssh_opt*匹配到数据刚执行的测试命令