Elasticsearch + Logstash + Kibaba安装

    ELK是指Elasticsearch + Logstash + Kibaba三个组件的组合。Elasticsearch主要充当一个全文检索和分析引擎,Logstash是一款分布式日志收集系统,Kibana能够为这个平台提供可视化的Web界面。node

1.安装包准备

首先从官网(https://www.elastic.co/cn/downloads)下载安装包linux

elasticsearch-6.2.3.tar.gzbootstrap

 kibana-6.2.3-linux-x86_64.tar.gzvim

logstash-6.2.3.tar.gz浏览器

jdk-8u151-linux-x64.tar.gzbash

2.安装JDK

因为elk官方指定使用oracle的jdk8,若是操做系统自带有openjdk能够先进行卸载而后再安装jdk。oracle

jdk安装包解压ssh

tar -zxvf jdk-8u151-linux-x64.tar.gz

配置路径,在/etc/profile文件配置curl

vim  /etc/profile

添加配置elasticsearch

export JAVA_HOME=/home/jdk1.8.0_151
export PATH=$PATH:$JAVA_HOME/bin

是配置生效

source /etc/profile

3.系统环境

修改/etc/sysctl.conf 和/etc/security/limits.conf 配置

vi /etc/sysctl.conf
#添加配置
vm.max_map_count = 655360
#使配置生效
sysctl -p  
vi /etc/security/limits.conf
#添加配置
*    soft    nofile    65536
*    hard    nofile    131072
*    soft    nproc    65536
*    hard    nproc    131072

4.新建用户

 

5.安装elasticsearch

解压elasticsearch-6.2.3.tar.gz到/home/eselk/elk目录下

tar -zxvf elasticsearch-6.2.3.tar.gz

进入到elasticsearch的配置目录(/home/eselk/elk/elasticsearch-6.2.3/config
)下修改配置文件elasticsearch.yml

cluster.name: es_cluster      #集群的名称
node.name: secms-elk1         #节点名称
path.data: /home/eselk/elk/data     #数据存储的目录(多个目录使用逗号分隔)
path.logs: /home/eselk/elk/logs     #日志路径
network.host: 0.0.0.0
http.port: 9200                     #端口默认9200
bootstrap.mlockall: true            #锁住内存,使内存不会分配至交换区(swap)

 

启动es,查看是否启动成功

./bin/elasticsearch -d

执行命令

curl  http://10.118.213.223:9200

能够看到输出

{
  "name" : "secms-elk1",
  "cluster_name" : "es_cluster",
  "cluster_uuid" : "PetmCJcwQX-RAwVdWiVC-Q",
  "version" : {
    "number" : "6.2.3",
    "build_hash" : "c59ff00",
    "build_date" : "2018-03-13T10:06:29.741383Z",
    "build_snapshot" : false,
    "lucene_version" : "7.2.1",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

es安装成功,将es安装复制到到其余机子上面,修改配置文件elasticsearch.yml中node.name节点名字以后启动各个节点,es系统会自动将cluster name相同的机器组成一个集群。

6.安装Logstash

解压logstash-6.2.3.tar.gz 到/home/eselk/elk目录下

tar -zxvf logstash-6.2.3.tar.gz

编辑配置文件,这里在logstash安装目录下新建文件logstash-simple.conf,文件内容

input{
    #接收kafka中数据
	kafka{
		bootstrap_servers => "10.118.213.175:9092,10.118.213.219:9092"  #kafka集群
		topics=>["sentinel_logs","siem-alarm","siem-audit","siem-ddos","siem-fw","siem-ips","siem-risk","siem-syflog","siem-waf"]  #接收主题中数据
		codec => plain
		consumer_threads => 4
		decorate_events => true
	}
}
output{
    #输出数据到es中
	elasticsearch{
		hosts => ["10.118.213.223:9200"]
		index => "all-log"
	}
}

保存文件以后启动logstash

./bin/logstash -f logstash-simple.conf > /dev/null &

查看logstash进程,确认logstash是否启动成功

ps -ef|grep logstash

下面咱们在另外一个机子上面也安装logstash,用来接收用户操做命令道es中,按照一样步骤解压logstash到指定目录,而后新建配置文件logstash-simple.conf文件内容

input {
	 #读取文件中数据
     file {
		path=> [ "/var/log/history.log" ]
		type=>"history_log"
  	}
}
output {
	elasticsearch {
		hosts => "10.118.213.223:9200"
		index => "ssh_opt"
	 }
}

按照前面步骤启动logstash。

7.安装Kibana

解压kibana-6.2.3-linux-x86_64.tar.gz到指定目录/home/eselk/elk

修改conf中配置文件

#访问端口
server.port: 5601
#全部站点均可以访问
server.host: "0.0.0.0"
#指到es集群master节点
elasticsearch.url: "http://10.118.213.223:9200"
#kibana是一个小系统,本身也须要存储数据(将kibana的数据保存到.kibana的索引中,会在ES里面建立一个.kibana)
kibana.index: ".kibana"

启动kibaba

./bin/kibana > /dev/null &

能够经过http://10.118.213.223:5601/在浏览器中访问

  

若是咱们要查找logstash发送过来的数据,须要先配置index

新建index匹配规则

一样操做再新建收集用户操做命令的index匹配规则ssh_opt*,这样能够在Discover页面查询搜索数据。如今无论是all-log*或者ssh_opt*都是不能匹配到数据的,由于咱们尚未发送数据到logstash。

1.咱们经过kaka客户端程序向kafuka主题中写入日志数据,而后查询all-log*匹配规则能够发现查询到接收到的数据。

2.收集用户操做命令数据,咱们首先要将用户的操做命令写入/var/log/history.log目录下,而后再经过logstash进行收集。

在主机/etc/bashrc添加配置

vim /etc/bashrc

在文件末尾添加以下内容(注意先贴到记事本中,下面的指令只有两行):

export HISTORY_FILE="/var/log/history.log"
export PROMPT_COMMAND='{ thisHistID=`history 1|awk "{print\\$1}"`;lastCommand=`history 1| awk "{\\$1=\"\" ;print}"`;user=`id $(whoami)`;whoStr=(`who -u am i`);realUser=${whoStr[0]};logMonth=${whoStr[2]};logDay=${whoStr[3]};logTime=${whoStr[4]};pid=${whoStr[6]};ip=${whoStr[7]};if [ ${thisHistID}x != ${lastHistID}x ];then echo -E `date "+%Y/%m/%d %H:%M:%S"` $user\($realUser\)@$ip[PID:$pid][LOGIN:$logMonth $logDay $logTime] --- $lastCommand ;lastHistID=$thisHistID;fi; } >> $HISTORY_FILE'

执行source /etc/bashrc使配置生效。新建history.log文件并赋权

[root@rhserver ~]# cd /var/log
[root@rhserver ~]# touch history.log
[root@rhserver ~]# chmod 777 history.log

再执行一些命令查看ssh_opt*匹配到数据刚执行的测试命令

相关文章
相关标签/搜索