ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并做为Apache许可条款下的开放源码发布,是第二流行的企业搜索引擎。设计用于云计算中,可以达到实时搜索,稳定,可靠,快速,安装使用方便。php
LogStash由JRuby语言编写,基于消息(message-based)的简单架构,并运行在Java虚拟机(JVM)上。不一样于分离的代理端(agent)或主机端(server),LogStash可配置单一的代理端(agent)与其它开源软件结合,以实现不一样的功能。java
Logstash是一个彻底开源的工具,他能够对你的日志进行收集、分析,并将其存储供之后使用(如,搜索),您可使用它。说到搜索,logstash带有一个web界面,搜索和展现全部日志。node
两台虚拟机:
hostname:linux-node1和linux-node2
ip地址:192.168.56.11和192.168.56.22mysql
[root@linux-node2 ~]# cat /etc/redhat-release
CentOS Linux release 7.1.1503 (Core)
[root@linux-node2 ~]# uname -a
Linux linux-node2 3.10.0-229.el7.x86_64 #1 SMP Fri Mar 6 11:36:42 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
[root@linux-node2 ~]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.56.11 linux-node1.oldboyedu.com linux-node1
192.168.56.12 linux-node2.oldboyedu.com linux-node2
下载并安装GPG keylinux
[root@linux-node2 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
添加yum仓库nginx
[root@linux-node2 ~]# vim /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-2.x]
name=Elasticsearch repository for 2.x packages
baseurl=http://packages.elastic.co/elasticsearch/2.x/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
安装elasticsearchgit
[root@hadoop-node2 ~]# yum install -y elasticsearch
下载并安装GPG keygithub
[root@linux-node2 ~]# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
添加yum仓库web
[root@linux-node2 ~]# vim /etc/yum.repos.d/logstash.repo
[logstash-2.1]
name=Logstash repository for 2.1.x packages
baseurl=http://packages.elastic.co/logstash/2.1/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
安装logstash正则表达式
[root@linux-node2 ~]# yum install -y logstash
安装kibana
[root@linux-node2 ~]#cd /usr/local/src
[root@linux-node2 ~]#wget https://download.elastic.co/kibana/kibana/kibana-4.3.1-linux-x64.tar.gz
tar zxf kibana-4.3.1-linux-x64.tar.gz
[root@linux-node1 src]# mv kibana-4.3.1-linux-x64 /usr/local/
[root@linux-node2 src]# ln -s /usr/local/kibana-4.3.1-linux-x64/ /usr/local/kibana
安装Redis,nginx和java
[root@linux-node2 ~]#yum install -y redis nginx java
修改elasticsearch配置文件,并受权
[root@linux-node1 src]# grep -n '^[a-Z]' /etc/elasticsearch/elasticsearch.yml
17:cluster.name: chuck-cluster 判别节点是不是统一集群
23:node.name: linux-node1 节点的hostname
33:path.data: /data/es-data 数据存放路径
37:path.logs: /var/log/elasticsearch/ 日志路径
43:bootstrap.mlockall: true 锁住内存,使内存不会再swap中使用
54:network.host: 0.0.0.0 容许访问的ip
58:http.port: 9200 端口
[root@linux-node1 ~]# mkdir -p /data/es-data
[root@linux-node1 src]# chown elasticsearch.elasticsearch /data/es-data/
启动elasticsearch
[root@linux-node1 src]# systemctl start elasticsearch
[root@linux-node1 src]# systemctl enable elasticsearch
ln -s '/usr/lib/systemd/system/elasticsearch.service' '/etc/systemd/system/multi-user.target.wants/elasticsearch.service'
[root@linux-node1 src]# systemctl status elasticsearch
elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; enabled)
Active: active (running) since Thu 2016-01-14 09:30:25 CST; 14s ago
Docs: http://www.elastic.co
Main PID: 37954 (java)
CGroup: /system.slice/elasticsearch.service
└─37954 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConc...
Jan 14 09:30:25 linux-node1 systemd[1]: Starting Elasticsearch...
Jan 14 09:30:25 linux-node1 systemd[1]: Started Elasticsearch.
[root@linux-node1 src]# netstat -lntup|grep 9200
tcp6 0 0 :::9200 :::* LISTEN 37954/java
访问9200端口,会把信息显示出来
查看当前索引和分片状况,稍后会有插件展现
[root@linux-node1 src]# curl -i -XGET 'http://192.168.56.11:9200/_count?pretty' -d '{
"query" {
"match_all": {}
}
}'
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 95
{
"count" : 0, 索引0个
"_shards" : { 分区0个
"total" : 0,
"successful" : 0, 成功0个
"failed" : 0 失败0个
}
}
使用head插件显示索引和分片状况
[root@linux-node1 src]# /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head
在插件中添加一个index-demo/test的索引,提交请求
发送一个GET(固然可使用其余类型请求)请求,查询上述索引id
在基本查询中查看所建索引
将linux-node1的配置文件拷贝到linux-node2中,并修改配置文件并受权
配置文件中cluster.name的名字必定要一致,当集群内节点启动的时候,默认使用组播(多播),寻找集群中的节点
[root@linux-node1 src]# scp /etc/elasticsearch/elasticsearch.yml 192.168.56.12:/etc/elasticsearch/elasticsearch.yml
[root@linux-node2 elasticsearch]# sed -i '23s#node.name: linux-node1#node.name: linux-node2#g' elasticsearch.yml
[root@linux-node2 elasticsearch]# mkdir -p /data/es-data
[root@linux-node2 elasticsearch]# chown elasticsearch.elasticsearch /data/es-data/
启动elasticsearch
[root@linux-node2 elasticsearch]# systemctl enable elasticsearch.service
ln -s '/usr/lib/systemd/system/elasticsearch.service' '/etc/systemd/system/multi-user.target.wants/elasticsearch.service'
[root@linux-node2 elasticsearch]# systemctl start elasticsearch.service
[root@linux-node2 elasticsearch]# systemctl status elasticsearch.service
elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; enabled)
Active: active (running) since Thu 2016-01-14 02:56:35 CST; 4s ago
Docs: http://www.elastic.co
Process: 38519 ExecStartPre=/usr/share/elasticsearch/bin/elasticsearch-systemd-pre-exec (code=exited, status=0/SUCCESS)
Main PID: 38520 (java)
CGroup: /system.slice/elasticsearch.service
└─38520 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConc...
Jan 14 02:56:35 linux-node2 systemd[1]: Starting Elasticsearch...
Jan 14 02:56:35 linux-node2 systemd[1]: Started Elasticsearch.
在linux-node2配置中添加以下内容,使用单播模式(尝试了使用组播,可是不生效)
[root@linux-node1 ~]# grep -n "^discovery" /etc/elasticsearch/elasticsearch.yml
79:discovery.zen.ping.unicast.hosts: ["linux-node1", "linux-node2"]
[root@linux-node1 ~]# systemctl restart elasticsearch.service
在浏览器中查看分片信息,一个索引默认被分红了5个分片,每份数据被分红了五个分片(能够调节分片数量),下图中外围带绿色框的为主分片,不带框的为副本分片,主分片丢失,副本分片会复制一份成为主分片,起到了高可用的做用,主副分片也可使用负载均衡加快查询速度,可是若是主副本分片都丢失,则索引就是完全丢失。
[root@linux-node1 bin]# /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf
从下图能够看出节点的负载,cpu适应状况,java对内存的使用(heap usage),磁盘使用,启动时间
除此以外,kopf插件还提供了REST API 等,相似kopf插件的还有bigdesk,可是bigdesk目前还不支持2.1!!!安装bigdesk的方法以下
/usr/share/elasticsearch/bin/plugin install lukas-vlcek/bigdesk
当第一个节点启动,它会组播发现其余节点,发现集群名字同样的时候,就会自动加入集群。随便一个节点都是能够链接的,并非主节点才能够链接,链接的节点起到的做用只是汇总信息展现
最初能够自定义设置分片的个数,分片一旦设置好,就不能够改变。主分片和副本分片都丢失,数据即丢失,没法恢复,能够将无用索引删除。有些老索引或者不经常使用的索引须要按期删除,不然会致使es资源剩余有限,占用磁盘大,搜索慢等。若是暂时不想删除有些索引,能够在插件中关闭索引,就不会占用内存了。
启动一个logstash,-e:在命令行执行;input输入,stdin标准输入,是一个插件;output输出,stdout:标准输出
[root@linux-node1 bin]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }' Settings: Default filter workers: 1
Logstash startup completed
chuck ==>输入
2016-01-14T06:01:07.184Z linux-node1 chuck ==>输出
www.chuck-blog.com ==>输入
2016-01-14T06:01:18.581Z linux-node1 www.chuck-blog.com ==>输出
使用rubudebug显示详细输出,codec为一种编解码器
[root@linux-node1 bin]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug} }'
Settings: Default filter workers: 1
Logstash startup completed
chuck ==>输入
{
"message" => "chuck",
"@version" => "1",
"@timestamp" => "2016-01-14T06:07:50.117Z",
"host" => "linux-node1"
} ==>使用rubydebug输出
上述每一条输出的内容称为一个事件,多个相同的输出的内容合并到一块儿称为一个事件(举例:日志中连续相同的日志输出称为一个事件)!
使用logstash将信息写入到elasticsearch
[root@linux-node1 bin]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.56.11:9200"] } }'
Settings: Default filter workers: 1
Logstash startup completed
maliang
chuck
chuck-blog.com
www.chuck-bllog.com
在elasticsearch中查看logstash新加的索引
在elasticsearch中写一份,同时在本地输出一份,也就是在本地保留一份文本文件,也就不用在elasticsearch中再定时备份到远端一份了。此处使用的保留文本文件三大优点:1)文本最简单 2)文本能够二次加工 3)文本的压缩比最高
[root@linux-node1 bin]# /opt/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.56.11:9200"] } stdout{ codec => rubydebug } }'
Settings: Default filter workers: 1
Logstash startup completed
www.google.com
{
"message" => "www.google.com",
"@version" => "1",
"@timestamp" => "2016-01-14T06:27:49.014Z",
"host" => "linux-node1"
}
www.elastic.co
{
"message" => "www.elastic.co",
"@version" => "1",
"@timestamp" => "2016-01-14T06:27:58.058Z",
"host" => "linux-node1"
}
使用logstash启动一个配置文件,会在elasticsearch中写一份
[root@linux-node1 ~]# cat normal.conf
input { stdin { } }
output {
elasticsearch { hosts => ["localhost:9200"] }
stdout { codec => rubydebug }
}
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f normal.conf
Settings: Default filter workers: 1
Logstash startup completed
123
{
"message" => "123",
"@version" => "1",
"@timestamp" => "2016-01-14T06:51:13.411Z",
"host" => "linux-node1
input {
file {
path => "/var/log/messages"
type => "syslog"
}
file {
path => "/var/log/apache/access.log"
type => "apache"
}
}
path => ["/var/log/messages","/var/log/*.log"]
path => ["/data/mysql/mysql.log"]
ssl_enable => true
my_bytes => "1113" # 1113 bytes
my_bytes => "10MiB" # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes
match => {
"field1" => "value1"
"field2" => "value2"
...
}
port => 33
my_password => "password"
sincedb_path:记录logstash读取位置的路径
start_postion :包括beginning和end,指定收集的位置,默认是end,从尾部开始
add_field 加一个域
discover_internal 发现间隔,每隔多久收集一次,默认15秒
[root@linux-node1 ~]# cat system.conf
input {
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
}
output {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f system.conf
此处把上个system日志和这个error(java程序日志)日志,放在一块儿。使用if判断,两种日志分别写到不一样索引中.此处的type(固定的就是type,不可更改)不能够和日志格式的任何一个域(能够理解为字段)的名称重复,也就是说日志的域不能够有type这个名称。
[root@linux-node1 ~]# cat all.conf
input {
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
file {
path => "/var/log/elasticsearch/chuck-cluster.log"
type => "es-error"
start_position => "beginning"
}
}
output {
if [type] == "system" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
if [type] == "es-error" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
}
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f all.conf
以at.org开头的内容都属于同一个事件,可是显示在不一样行,这样的日志格式看起来很不方便,因此须要把他们合并到一个事件中
官方文档提供
input {
stdin {
codec => multiline {
` pattern => "pattern, a regexp"
negate => "true" or "false"
what => "previous" or "next"`
}
}
}
regrxp:使用正则,什么状况下把多行合并起来
negate:正向匹配和反向匹配
what:合并到当前行仍是下一行
在标准输入和标准输出中测试以证实多行收集到一个日志成功
[root@linux-node1 ~]# cat muliline.conf
input {
stdin {
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
output {
stdout {
codec => "rubydebug"
}
}
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f muliline.conf
Settings: Default filter workers: 1
Logstash startup completed
[1
[2
{
"@timestamp" => "2016-01-15T06:46:10.712Z",
"message" => "[1",
"@version" => "1",
"host" => "linux-node1"
}
chuck
chuck-blog.com
123456
[3
{
"@timestamp" => "2016-01-15T06:46:16.306Z",
"message" => "[2\nchuck\nchuck-bloh\nchuck-blog.com\n123456",
"@version" => "1",
"tags" => [
[0] "multiline"
],
"host" => "linux-node1"
继续将上述实验结果放到all.conf的es-error索引中
[root@linux-node1 ~]# cat all.conf
input {
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
file {
path => "/var/log/elasticsearch/chuck-clueser.log"
type => "es-error"
start_position => "beginning"
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
output {
if [type] == "system" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
if [type] == "es-error" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
}
[root@linux-node1 ~]# grep '^[a-Z]' /usr/local/kibana/config/kibana.yml
server.port: 5601 kibana端口
server.host: "0.0.0.0" 对外服务的主机
elasticsearch.url: "http://192.168.56.11:9200" 和elasticsearch练习
kibana.index: ".kibana 在elasticsearch中添加.kibana索引
一个screen,并启动kibana
[root@linux-node1 ~]# screen
[root@linux-node1 ~]# /usr/local/kibana/bin/kibana
使用crtl +a+d退出screen
使用浏览器打开192.168.56.11:5601
在kibana中添加一个es-error索引
能够看到默认的字段
选择discover查看
验证error的muliline插件生效
在这里使用codec的json插件将日志的域进行分段,使用key-value的方式,使日志格式更清晰,易于搜索,还能够下降cpu的负载
更改nginx的配置文件的日志格式,使用json
[root@linux-node1 ~]# sed -n '15,33p' /etc/nginx/nginx.conf
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
log_format json '{ "@timestamp": "$time_local", '
'"@fields": { '
'"remote_addr": "$remote_addr", '
'"remote_user": "$remote_user", '
'"body_bytes_sent": "$body_bytes_sent", '
'"request_time": "$request_time", '
'"status": "$status", '
'"request": "$request", '
'"request_method": "$request_method", '
'"http_referrer": "$http_referer", '
'"body_bytes_sent":"$body_bytes_sent", '
'"http_x_forwarded_for": "$http_x_forwarded_for", '
'"http_user_agent": "$http_user_agent" } }';
# access_log /var/log/nginx/access_json.log main;
access_log /var/log/nginx/access.log json;
启动nginx
[root@linux-node1 ~]# nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
[root@linux-node1 ~]# nginx
[root@linux-node1 ~]# netstat -lntup|grep 80
tcp 0 0 0.0.0.0:80 0.0.0.0:* LISTEN 43738/nginx: master
tcp6 0 0 :::80 :::* LISTEN 43738/nginx: master
日志格式显示以下
使用logstash将nginx访问日志收集起来,继续写到all.conf中
将nginx-log加入kibana中并显示
前文中已经使用文件file的形式收集了系统日志/var/log/messages,可是实际生产环境是须要使用syslog插件直接收集
修改syslog的配置文件,把日志信息发送到514端口上
[root@linux-node1 ~]# vim /etc/rsyslog.conf
90 *.* @@192.168.56.11:514
将system-syslog放到all.conf中,启动all.conf
[root@linux-node1 ~]# cat all.conf
input {
syslog {
type => "system-syslog"
host => "192.168.56.11"
port => "514"
}
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
file {
path => "/var/log/nginx/access_json.log"
codec => json
start_position => "beginning"
type => "nginx-log"
}
file {
path => "/var/log/elasticsearch/chuck-cluster.log"
type => "es-error"
start_position => "beginning"
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
output {
if [type] == "system" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
if [type] == "es-error" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "es-error-%{+YYYY.MM.dd}"
}
}
if [type] == "nginx-log" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "nginx-log-%{+YYYY.MM.dd}"
}
}
if [type] == "system-syslog" {
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "system-syslog-%{+YYYY.MM.dd}"
}
}
}
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f all.conf
在elasticsearch插件中就可见到增长的system-syslog索引
编写tcp.conf
[root@linux-node1 ~]# cat tcp.conf
input {
tcp {
host => "192.168.56.11"
port => "6666"
}
}
output {
stdout {
codec => "rubydebug"
}
}
使用nc对6666端口写入数据
[root@linux-node1 ~]# nc 192.168.56.11 6666 </var/log/yum.log
将信息输入到tcp的伪设备中
[root@linux-node1 ~]# echo "chuck" >/dev/tcp/192.168.56.11/6666
数据源Datasource把数据写到input插件中,output插件使用消息队列把消息写入到消息队列Message Queue中,Logstash indexing Instance启动logstash使用input插件读取消息队列中的信息,Fliter插件过滤后在使用output写入到elasticsearch中。
若是生产环境中不适用正则grok匹配,能够写Python脚本从消息队列中读取信息,输出到elasticsearch中
修改redis的配置文件并启动redis
[root@linux-node1 ~]# vim /etc/redis.conf
37 daemonize yes
65 bind 192.168.56.11
[root@linux-node1 ~]# systemctl start redis
[root@linux-node1 ~]# netstat -lntup|grep 6379
tcp 0 0 192.168.56.11:6379 0.0.0.0:* LISTEN 45270/redis-server
编写redis.conf
[root@linux-node1 ~]# cat redis-out.conf
input{
stdin{
}
}
output{
redis{
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list" # 数据类型为list
key => "demo"
}
启动配置文件输入信息
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf
Settings: Default filter workers: 1
Logstash startup completed
chuck
chuck-blog
使用redis-cli链接到redis并查看输入的信息
[root@linux-node1 ~]# redis-cli -h 192.168.56.11
192.168.56.11:6379> info #输入info查看信息
# Server
redis_version:2.8.19
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:c0359e7aa3798aa2
redis_mode:standalone
os:Linux 3.10.0-229.el7.x86_64 x86_64
arch_bits:64
multiplexing_api:epoll
gcc_version:4.8.3
process_id:45270
run_id:83f428b96e87b7354249fe42bd19ee8a8643c94e
tcp_port:6379
uptime_in_seconds:1111
uptime_in_days:0
hz:10
lru_clock:10271973
config_file:/etc/redis.conf
# Clients
connected_clients:2
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0
# Memory
used_memory:832048
used_memory_human:812.55K
used_memory_rss:5193728
used_memory_peak:832048
used_memory_peak_human:812.55K
used_memory_lua:35840
mem_fragmentation_ratio:6.24
mem_allocator:jemalloc-3.6.0
# Persistence
loading:0
rdb_changes_since_last_save:0
rdb_bgsave_in_progress:0
rdb_last_save_time:1453112484
rdb_last_bgsave_status:ok
rdb_last_bgsave_time_sec:0
rdb_current_bgsave_time_sec:-1
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok
# Stats
total_connections_received:2
total_commands_processed:2
instantaneous_ops_per_sec:0
total_net_input_bytes:164
total_net_output_bytes:9
instantaneous_input_kbps:0.00
instantaneous_output_kbps:0.00
rejected_connections:0
sync_full:0
sync_partial_ok:0
sync_partial_err:0
expired_keys:0
evicted_keys:0
keyspace_hits:0
keyspace_misses:0
pubsub_channels:0
pubsub_patterns:0
latest_fork_usec:9722
# Replication
role:master
connected_slaves:0
master_repl_offset:0
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
# CPU
used_cpu_sys:1.95
used_cpu_user:0.40
used_cpu_sys_children:0.00
used_cpu_user_children:0.00
# Keyspace
db6:keys=1,expires=0,avg_ttl=0
192.168.56.11:6379> select 6 #选择db6
OK
192.168.56.11:6379[6]> keys * #选择demo这个key
1) "demo"
192.168.56.11:6379[6]> LINDEX demo -2 #查看消息
"{\"message\":\"chuck\",\"@version\":\"1\",\"@timestamp\":\"2016-01-18T10:21:23.583Z\",\"host\":\"linux-node1\"}"
192.168.56.11:6379[6]> LINDEX demo -1 #查看消息
"{\"message\":\"chuck-blog\",\"@version\":\"1\",\"@timestamp\":\"2016-01-18T10:25:54.523Z\",\"host\":\"linux-node1\"}"
为了下一步写input插件到把消息发送到elasticsearch中,多在redis中写入写数据
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf
Settings: Default filter workers: 1
Logstash startup completed
chuck
chuck-blog
a
b
c
d
e
f
g
h
i
j
k
l
m
n
o
p
q
r
s
t
u
v
w
x
y
z
查看redis中名字为demo的key长度
192.168.56.11:6379[6]> llen demo
(integer) 28
编写redis-in.conf
[root@linux-node1 ~]# cat redis-in.conf
input{
redis {
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "demo"
}
}
output{
elasticsearch {
hosts => ["192.168.56.11:9200"]
index => "redis-demo-%{+YYYY.MM.dd}"
}
}
启动配置文件
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf
Settings: Default filter workers: 1
Logstash startup completed
不断刷新demo这个key的长度(读取很快,刷新必定要速度)
192.168.56.11:6379[6]> llen demo
(integer) 28
192.168.56.11:6379[6]> llen demo
(integer) 28
192.168.56.11:6379[6]> llen demo
(integer) 19 #能够看到redis的消息正在写入到elasticsearch中
192.168.56.11:6379[6]> llen demo
(integer) 7 #能够看到redis的消息正在写入到elasticsearch中
192.168.56.11:6379[6]> llen demo
(integer) 0
在elasticsearch中查看增长了redis-demo
编写shipper.conf做为redis收集logstash配置文件
[root@linux-node1 ~]# cp all.conf shipper.conf
[root@linux-node1 ~]# vim shipper.conf
input {
syslog {
type => "system-syslog"
host => "192.168.56.11"
port => "514"
}
tcp {
type => "tcp-6666"
host => "192.168.56.11"
port => "6666"
}
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
file {
path => "/var/log/nginx/access_json.log"
codec => json
start_position => "beginning"
type => "nginx-log"
}
file {
path => "/var/log/elasticsearch/chuck-cluster.log"
type => "es-error"
start_position => "beginning"
codec => multiline {
pattern => "^\["
negate => true
what => "previous"
}
}
}
output {
if [type] == "system" {
redis {
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "system"
}
}
if [type] == "es-error" {
redis {
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "es-error"
}
}
if [type] == "nginx-log" {
redis {
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "nginx-log"
}
}
if [type] == "system-syslog" {
redis {
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "system-syslog"
}
}
if [type] == "tcp-6666" {
redis {
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "tcp-6666"
}
}
}
在redis中查看keys
192.168.56.11:6379[6]> select 6
OK
192.168.56.11:6379[6]> keys *
1) "system"
2) "nginx-log"
3) "tcp-6666"
编写indexer.conf做为redis发送elasticsearch配置文件
[root@linux-node1 ~]# cat indexer.conf
input {
redis {
type => "system-syslog"
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "system-syslog"
}
redis {
type => "tcp-6666"
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "tcp-6666"
}
redis {
type => "system"
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "system"
}
redis {
type => "nginx-log"
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "nginx-log"
}
redis {
type => "es-error"
host => "192.168.56.11"
port => "6379"
db => "6"
data_type => "list"
key => "es-error"
}
}
output {
if [type] == "system" {
elasticsearch {
hosts => "192.168.56.11"
index => "system-%{+YYYY.MM.dd}"
}
}
if [type] == "es-error" {
elasticsearch {
hosts => "192.168.56.11"
index => "es-error-%{+YYYY.MM.dd}"
}
}
if [type] == "nginx-log" {
elasticsearch {
hosts => "192.168.56.11"
index => "nginx-log-%{+YYYY.MM.dd}"
}
}
if [type] == "system-syslog" {
elasticsearch {
hosts => "192.168.56.11"
index => "system-syslog-%{+YYYY.MM.dd}"
}
}
if [type] == "tcp-6666" {
elasticsearch {
hosts => "192.168.56.11"
index => "tcp-6666-%{+YYYY.MM.dd}"
}
}
}
启动shipper.conf
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf
Settings: Default filter workers: 1
因为日志量小,很快就会所有被发送到elasticsearch,key也就没了,因此多写写数据到日志中
[root@linux-node1 ~]# for n in `seq 10000` ;do echo $n >>/var/log/elasticsearch/chuck-cluster.log;done
[root@linux-node1 ~]# for n in `seq 10000` ;do echo $n >>/var/log/nginx/access_json.log;done
[root@linux-node1 ~]# for n in `seq 10000` ;do echo $n >>/var/log/messages;done
查看key的长度看到key在增加
(integer) 2481
192.168.56.11:6379[6]> llen system
(integer) 2613
192.168.56.11:6379[6]> llen system
(integer) 2795
192.168.56.11:6379[6]> llen system
(integer) 2960
启动indexer.conf
[root@linux-node1 ~]# /opt/logstash/bin/logstash -f indexer.conf
Settings: Default filter workers: 1
Logstash startup completed
查看key的长度看到key在减少
192.168.56.11:6379[6]> llen nginx-log
(integer) 9680
192.168.56.11:6379[6]> llen nginx-log
(integer) 9661
192.168.56.11:6379[6]> llen nginx-log
(integer) 9661
192.168.56.11:6379[6]> llen system
(integer) 9591
192.168.56.11:6379[6]> llen system
(integer) 9572
192.168.56.11:6379[6]> llen system
(integer) 9562
kibana查看nginx-log索引
前文学习了input和output插件,在这里学习fliter插件
filter插件有不少,在这里就学习grok插件,使用正则匹配日志里的域来拆分。在实际生产中,apache日志不支持jason,就只能使用grok插件匹配;mysql慢查询日志也是没法拆分,只能石油grok正则表达式匹配拆分。
在以下连接,github上有不少写好的grok模板,能够直接引用
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
在装好的logstash中也会有grok匹配规则,直接能够引用,路径以下
[root@linux-node1 patterns]# pwd
/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns
[root@linux-node1 ~]# cat grok.conf
input {
stdin {}
}
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
output {
stdout {
codec => "rubydebug"
}
}
启动logstash,并根据官方文档提供输入,可获得拆分结果以下显示
倒入生产中mysql的slow日志,示例格式以下:
# Time: 160108 15:46:14
# User@Host: dev_select_user[dev_select_user] @ [192.168.97.86] Id: 714519
# Query_time: 1.638396 Lock_time: 0.000163 Rows_sent: 40 Rows_examined: 939155
SET timestamp=1452239174;
SELECT DATE(create_time) as day,HOUR(create_time) as h,round(avg(low_price),2) as low_price
FROM t_actual_ad_num_log WHERE create_time>='2016-01-07' and ad_num<=10
GROUP BY DATE(create_time),HOUR(create_time);
使用multiline处理,并编写slow.conf
[root@linux-node1 ~]# cat mysql-slow.conf
input{
file {
path => "/root/slow.log"
type => "mysql-slow-log"
start_position => "beginning"
codec => multiline {
pattern => "^# User@Host:"
negate => true
what => "previous"
}
}
}
filter {
# drop sleep events
grok {
match => { "message" =>"SELECT SLEEP" }
add_tag => [ "sleep_drop" ]
tag_on_failure => [] # prevent default _grokparsefailure tag on real records
}
if "sleep_drop" in [tags] {
drop {}
}
grok {
match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]
}
date {
match => [ "timestamp", "UNIX" ]
remove_field => [ "timestamp" ]
}
}
output {
stdout{
codec => "rubydebug"
}
}
执行该配置文件,查看grok正则匹配结果
系统日志 rsyslog logstash syslog插件 访问日志 nginx logstash codec json 错误日志 file logstash file+ mulitline 运行日志 file logstash codec json 设备日志 syslog logstash syslog插件 debug日志 file logstash json or mulitline
1)路径固定标准化 2)格式尽可能使用json
系统日志开始->错误日志->运行日志->访问日志