1. Command-line input and output
1) Command-line output:
/application/elk/logstash/bin/logstash -e 'input { stdin{} } output { stdout{} }'
Notes:
a) stdin{} — standard input
b) stdout{} — standard output
2) Display events in JSON-like form; note that in Logstash configs the equals sign is written as =>:
/application/elk/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug} }'
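Typing e.g. hello then prints an event roughly like the following (timestamp and host name here are illustrative):
{
    "@timestamp" => 2017-09-20T08:00:00.000Z,
      "@version" => "1",
          "host" => "elk-node1",
       "message" => "hello"
}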
3) Output to ES
a) To write to ES with a custom index name that embeds the current date, the manage_template => true parameter must be enabled; with the Logstash default logstash-%{+YYYY.MM.dd} it can be left off. This problem cost a whole afternoon.
/application/elk/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.30.41:9200"] index => "wohaoshuai-%{+YYYY.MM.dd}" manage_template => true } }'
Without manage_template => true the following error is raised:
[406] {"error":"Content-Type header [text/plain; charset=ISO-8859-1] is not supported","status":406} {:class=>"Elasticsearch::Transport::Transport::Errors::NotAcceptable", :level=>:error}
b) If the index is just a fixed custom name, the manage_template parameter is not needed.
/application/elk/logstash/bin/logstash -e 'input { stdin{ } } output { elasticsearch { hosts => ["192.168.30.41:9200"] index => "wohaoshuaitest"} }'
4) Output to both ES and the screen:
/application/elk/logstash/bin/logstash -e 'input { stdin{ } } output { stdout { codec => rubydebug } elasticsearch { hosts => ["192.168.30.41:9200"] index => "wohaoshuaitest"} }'
5) To delete an index and re-collect the files from scratch, the corresponding sincedb records must be removed:
rm -rf /application/elk/logstash/data/plugins/inputs/file/.sincedb_*
6) Nginx log format definition:
log_format access_log_json '{"user_ip":"$http_x_real_ip","lan_ip":"$remote_addr","log_time":"$time_iso8601","user_req":"$request","http_code":"$status","body_bytes_sent":"$body_bytes_sent","req_time":"$request_time","user_ua":"$http_user_agent"}';
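To make Nginx actually write this format, reference it from an access_log directive; the log path below is an assumption chosen to match the Logstash config in section 7.7:
access_log /var/log/nginx/access_log_json.log access_log_json;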
7) filter
a) grok: parses and filters the events we take in.
It splits out fields by regular-expression matching, and grok ships with a set of predefined patterns; for Logstash 5.6.1 the pattern files live under /application/elk/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns.
A simple grok example:
The config below matches the line: 55.3.244.1 GET /index.html 15824 0.043
input {
file {
path => "/var/log/http.log"
}
}
filter {
grok {
# match the message against Logstash's predefined patterns; each captured piece is stored in a field named after the text following the colon
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
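For the sample line above, the parsed event gains fields roughly like the following (shown in rubydebug style; exact formatting may differ):
"client" => "55.3.244.1",
"method" => "GET",
"request" => "/index.html",
"bytes" => "15824",
"duration" => "0.043"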
b) For collecting HTTP logs, the bundled Apache patterns are sufficient; the file is /application/elk/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/httpd.
[Figure: the httpd pattern file] In the screenshot, pattern 1 (the combined-log pattern) references the match fields of pattern 2 (the common-log pattern) and then references the QS pattern twice; QS is defined in grok-patterns in the same directory.
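The entries in question look roughly like this (quoted from memory of logstash-patterns-core 4.x; verify against your local copy):
HTTPD_COMMONLOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
HTTPD_COMBINEDLOG %{HTTPD_COMMONLOG} %{QS:referrer} %{QS:agent}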
c) Grok debugger: http://grokdebug.herokuapp.com (requires a proxy to reach from China)
2. Company architecture design
1) Run one Kibana on every ES node.
2) Each Kibana connects to its own local ES.
3) A front-end Nginx doing load balancing + ip_hash + authentication + ACL, sketched below.
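A minimal sketch of that front end; the IPs, Kibana port, allowed subnet, and htpasswd path are all assumptions:
upstream kibana {
ip_hash;
server 192.168.30.41:5601;
server 192.168.30.42:5601;
}
server {
listen 80;
# authentication: the htpasswd file is assumed to exist
auth_basic "kibana";
auth_basic_user_file /etc/nginx/htpasswd;
# ACL: only an assumed office subnet may connect
allow 192.168.30.0/24;
deny all;
location / {
proxy_pass http://kibana;
}
}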
3. rsyslog notes
1) The system log configuration file is /etc/rsyslog.conf.
2) A leading - before a file path in the config means writes are buffered rather than flushed to the file immediately; this trick appears in many system-tuning guides (see the excerpt at the end of this section).
3) To turn on forwarding of system logs:
a) Point the forwarding rule at the Logstash server (@@ forwards over TCP; a single @ would use UDP):
sed -i 's/#\*\.\* @@remote-host:514/*.* @@192.168.30.42:514/g' /etc/rsyslog.conf
b) systemctl restart rsyslog
4) To generate a system log entry by hand:
logger hehe
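For reference, the relevant part of /etc/rsyslog.conf ends up looking roughly like this (exact rules vary by distribution):
# buffered write: the leading "-" lets rsyslog cache entries instead of flushing each one
mail.* -/var/log/maillog
# forward everything to the central server; @@ = TCP, a single @ = UDP
*.* @@192.168.30.42:514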
4. TCP log collection
1) Ways to send a message to a TCP port (install nc first):
yum install -y nc
a) Method 1:
echo "wohaoshuai" | nc 192.168.56.12 6666
b) Method 2:
nc 192.168.30.42 6666 < /etc/resolv.conf
c) Method 3: via the TCP pseudo-device:
echo "wohaoshuai" > /dev/tcp/192.168.30.42/6666
5. Architecture for collecting HTTP logs
6. Requirements and approach for log collection with ELK
1) Requirements analysis:
a) Access logs: Apache, Nginx, and Tomcat access logs (file input + filter)
b) Error logs: error log plus Java logs; ingest as-is, but Java exceptions need multiline handling
c) System logs: /var/log/*, via syslog/rsyslog
d) Runtime logs: written by the application (file input, JSON format)
e) Network logs: firewall, switch, and router logs, via syslog
2) Standardization: where logs live (/application/logs), what format they use (JSON), naming rules (access_log, error_log, runtime_log), and how they are rotated: access and error logs are split by day or by hour via crontab, runtime_log likewise; all raw text is rsynced to a NAS (file server), then anything older than three days is deleted.
3) Tooling: how to build the collection schemes with Logstash (see section 7).
4) If Redis lists are used as the ELK stack's message queue, monitor the length of every list key with llen key_name.
a) Pick a threshold that fits your situation, e.g. alert once it exceeds 100,000, as in the sketch below.
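A minimal monitoring sketch; the threshold, DB number, key name, and alert action are assumptions:
# check the queue length and complain when it backs up
LEN=$(redis-cli -h 192.168.30.42 -p 6379 -n 6 llen apache-accesslog)
if [ "$LEN" -gt 100000 ]; then
echo "redis list apache-accesslog length is $LEN" # replace with your alerting command
fi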
7. Corresponding Logstash config files
1) stdin debugging
input{
stdin{}
}
filter{
}
output{
#elasticsearch plugin
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "log-%{+YYYY.MM.dd}"
manage_template => true
}
stdout{
codec => rubydebug
}
}
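Any config file in this section can be run by saving it to disk and starting Logstash with -f instead of -e; the file path here is an assumption:
/application/elk/logstash/bin/logstash -f /application/elk/logstash/conf/stdin.conf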
2) The file plugin
input{
file{
path => ["/var/log/messages","/var/log/secure"]
#type => "system-log"
start_position => "beginning"
}
}
filter{
}
output{
#elasticsearch plugin
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "system-log-%{+YYYY.MM.dd}"
manage_template => true
}
stdout{
codec => rubydebug
}
}
3) Routing on the type field
input{
file{
path => ["/var/log/messages","/var/log/secure"]
type => "system-log"
start_position => "beginning"
}
file{
path => ["/application/elk/elasticsearch/logs/elk-elasticsearch.log"]
type => "es-log"
start_position => "beginning"
}
}
filter{
}
output{
#elasticsearch plugin
if [type] == "system-log"
{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "system-log-%{+YYYY.MM.dd}"
manage_template => true
}
}
if [type] == "es-log"
{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "es-log-%{+YYYY.MM.dd}"
manage_template => true
}
}
stdout{
codec => rubydebug
}
}
4) Collecting every log under a directory
input{
file{
path => ["/var/log/messages","/var/log/secure"]
type => "system-log"
start_position => "beginning"
}
file{
path => ["/application/elk/elasticsearch/logs/elk-elasticsearch.log"]
type => "es-log"
start_position => "beginning"
}
file{
path => ["/application/elk/elasticsearch/logs/**/*.log"]
type => "docker-log"
start_position => "beginning"
}
}
filter{
}
output{
#elasticsearch plugin
if [type] == "system-log"
{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "system-log-%{+YYYY.MM.dd}"
manage_template => true
}
}
if [type] == "es-log"
{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "es-log-%{+YYYY.MM.dd}"
manage_template => true
}
}
if [type] == "docker-log"
{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "docker-log-%{+YYYY.MM.dd}"
manage_template => true
}
}
stdout{
codec => rubydebug
}
}
5) Multiline matching and merging
input{
stdin {
codec => multiline
{
pattern => "^\[" # each line is tested against this regex
negate => true # true inverts the match: lines that do NOT match are the ones merged
what => "previous" # merge with the previous line; the other value, "next", merges with the following line
}
}
}
filter{
}
output{
stdout{
codec => rubydebug
}
}
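Fed the lines below on stdin (a made-up Java-style trace), the two indented lines do not match ^\[ and are therefore merged into the event opened by the first [ line; the second [ line starts a new event:
[2017-09-20 10:00:00] something failed
  at com.example.Foo.bar(Foo.java:42)
  at com.example.Main.main(Main.java:10)
[2017-09-20 10:00:01] next event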
6) Putting it all together and writing to ES
input{
file{
path => ["/var/log/messages","/var/log/secure"]
type => "system-log"
start_position => "beginning"
}
file{
path => ["/application/elk/elasticsearch/logs/elk-elasticsearch.log"]
type => "es-log"
start_position => "beginning"
}
file{
path => ["/application/elk/elasticsearch/logs/containers/**/*.log"]
type => "docker-log"
start_position => "beginning"
codec => multiline
{
pattern => "^\{" # each line is tested against this regex
negate => true # true inverts the match: lines that do NOT match are the ones merged
what => "previous" # merge with the previous line; the other value, "next", merges with the following line
}
}
}
filter{
}
output{
#elasticsearch plugin
if [type] == "system-log"
{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "system-log-%{+YYYY.MM.dd}"
manage_template => true
}
}
if [type] == "es-log"
{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "es-log-%{+YYYY.MM.dd}"
manage_template => true
}
}
if [type] == "docker-log"
{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "docker-log-%{+YYYY.MM.dd}"
manage_template => true
}
}
stdout{
codec => rubydebug
}
}
7) Collect Nginx logs in JSON and ship them to ES; the Nginx log format is defined in section 1.6 above.
input{
file{
path => ["/var/log/nginx/access_log_json.log"]
start_position => "beginning"
codec => "json"
type => "nginx-log"
}
}
filter{
}
output{
if [type] == "nginx-log"
{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "nginx-log-%{+YYYY.MM.dd}"
manage_template => true
}
}
stdout{
codec => rubydebug
}
}
8) Collecting system logs (binding port 514 requires running Logstash as root)
input{
syslog{
type => "system-syslog"
port => 514
}
}
filter{
}
output{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "system-syslog-%{+YYYY.MM}"
}
stdout{
codec => rubydebug
}
}
9) Collecting TCP logs
input{
tcp{
type => "tcp"
port => "6666"
mode => "server" # the other value is "client"
}
}
filter{
}
output{
stdout{
codec => rubydebug
}
}
10) Using grok in filter to match and extract fields
input{
stdin{
# sample input: 55.3.244.1 GET /index.html 15824 0.043
}
}
filter{
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
output{
stdout{
codec => rubydebug
}
}
11) Matching HTTP logs with Logstash's built-in patterns
input{
file {
type => "http-log"
path => "/var/log/httpd/access_log"
start_position => beginning
}
}
filter{
grok {
match => {"message" => "%{HTTPD_COMBINEDLOG}" }
}
}
output{
if [type] == "http-log"
{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "http-log-%{+YYYY.MM.dd}"
manage_template => true
}
}
stdout{
codec => rubydebug
}
}
12) Pushing stdin input into Redis
input{
stdin{}
}
output{
redis{
host => "192.168.30.42"
port => "6379"
db => "6"
data_type => "list"
key => "demo"
}
stdout{
codec => rubydebug
}
}
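You can confirm the events landed in Redis with redis-cli; llen shows the queue length and lindex peeks at an entry:
redis-cli -h 192.168.30.42 -p 6379 -n 6 llen demo
redis-cli -h 192.168.30.42 -p 6379 -n 6 lindex demo 0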
13) Collecting HTTP logs into Redis
input{
file {
type => "http-log"
path => "/var/log/httpd/access_log"
start_position => beginning
}
}
output{
redis{
host => "192.168.30.42"
port => "6379"
db => "6"
data_type => "list"
key => "apache-accesslog"
}
stdout{
codec => rubydebug
}
}
14) Reading logs from Redis into ES
input{
redis{
host => "192.168.30.42"
port => "6379"
db => "6"
data_type => "list"
key => "apache-accesslog"
}
}
filter{
grok {
match => {"message" => "%{HTTPD_COMBINEDLOG}" }
}
}
output{
elasticsearch{
hosts => ["192.168.30.41:9200"]
index => "redis-log-%{+YYYY.MM.dd}"
manage_template => true
}
stdout{
codec => rubydebug
}
}