强大的功能,丰富的插件,让logstash在数据处理的行列中出类拔萃html
一般日志数据除了要入ES提供实时展现和简单统计外,还须要写入大数据集群来提供更为深刻的逻辑处理,前边几篇ELK的文章介绍过利用logstash将kafka的数据写入到elasticsearch集群,这篇文章将会介绍如何经过logstash将数据写入HDFSnode
本文全部演示均基于logstash 6.6.2版本nginx
logstash默认不支持数据直接写入HDFS,官方推荐的output插件是webhdfs
,webhdfs使用HDFS提供的API将数据写入HDFS集群web
插件安装比较简单,直接使用内置命令便可json
# cd /home/opt/tools/logstash-6.6.2 # ./bin/logstash-plugin install logstash-output-webhdfs
HDFS集群内经过主机名进行通讯因此logstash所在的主机须要配置hadoop集群的hosts信息bootstrap
# cat /etc/hosts 192.168.107.154 master01 192.168.107.155 slave01 192.168.107.156 slave02 192.168.107.157 slave03
若是不配置host信息,可能会报下边的错api
[WARN ][logstash.outputs.webhdfs ] Failed to flush outgoing items
kafka里边的源日志格式能够参考这片文章:ELK日志系统之使用Rsyslog快速方便的收集Nginx日志ruby
logstash的配置以下:架构
# cat config/indexer_rsyslog_nginx.conf input { kafka { bootstrap_servers => "10.82.9.202:9092,10.82.9.203:9092,10.82.9.204:9092" topics => ["rsyslog_nginx"] codec => "json" } } filter { date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "time_local" } ruby { code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))" } ruby { code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))" } } output { webhdfs { host => "master01" port => 50070 user => "hadmin" path => "/logs/nginx/%{index.date}/%{index.hour}.log" codec => "json" } stdout { codec => rubydebug } }
logstash配置文件分为三部分:input、filter、output运维
input指定源在哪里,咱们是从kafka取数据,这里就写kafka集群的配置信息,配置解释:
filter能够对input输入的内容进行过滤或处理,例如格式化,添加字段,删除字段等等
output指定处理过的日志输出到哪里,能够是ES或者是HDFS等等,能够同时配置多个,webhdfs主要配置解释:
webhdfs还有一些其余的参数例如compression
,flush_size
,standby_host
,standby_port
等可查看官方文档了解详细用法
# bin/logstash -f config/indexer_rsyslog_nginx.conf
由于logstash配置中开了stdout
输出,因此能在控制台看到格式化的数据,以下:
{ "server_addr" => "172.18.90.17", "http_user_agent" => "Mozilla/5.0 (iPhone; CPU iPhone OS 10_2 like Mac OS X) AppleWebKit/602.3.12 (KHTML, like Gecko) Mobile/14C92 Safari/601.1 wechatdevtools/1.02.1902010 MicroMessenger/6.7.3 Language/zh_CN webview/ token/e7b92168159736c30401a55589317d8c", "remote_addr" => "172.18.101.0", "status" => 200, "http_referer" => "https://ops-coffee.cn/wx02935bb29080a7b4/devtools/page-frame.html", "upstream_response_time" => "0.056", "host" => "ops-coffee.cn", "request_uri" => "/api/community/v2/news/list", "request_time" => 0.059, "upstream_status" => "200", "@version" => "1", "http_x_forwarded_for" => "192.168.106.100", "time_local" => 2019-03-18T11:03:45.000Z, "body_bytes_sent" => 12431, "@timestamp" => 2019-03-18T11:03:45.984Z, "index.date" => "20190318", "index.hour" => "19", "request_method" => "POST", "upstream_addr" => "127.0.0.1:8181" }
查看hdfs发现数据已经按照定义好的路径正常写入
$ hadoop fs -ls /logs/nginx/20190318/19.log -rw-r--r-- 3 hadmin supergroup 7776 2019-03-18 19:07 /logs/nginx/20190318/19.log
至此kafka到hdfs数据转储完成
logstash在处理数据时会自动生成一个字段@timestamp
,默认状况下这个字段存储的是logstash收到消息的时间,使用的是UTC时区,会跟国内的时间差8小时
咱们output到ES或者HDFS时一般会使用相似于rsyslog-nginx-%{+YYYY.MM.dd}
这样的变量来动态的设置index或者文件名,方便后续的检索,这里的变量YYYY
使用的就是@timestamp
中的时间,由于时区的问题生成的index或者文件名就差8小时不是很准确,这个问题在ELK架构中由于所有都是用的UTC时间且最终kibana展现时会自动转换咱们无需关心,但这里要生成文件就须要认真对待下了
这里采用的方案是解析日志中的时间字段time_local
,而后根据日志中的时间字段添加两个新字段index.date
和index.hour
来分别标识日期和小时,在output的时候使用这两个新加的字段作变量来生成文件
logstash filter配置以下:
filter { # 匹配原始日志中的time_local字段并设置为时间字段 # time_local字段为本地时间字段,没有8小时的时间差 date { match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"] target => "time_local" } # 添加一个index.date字段,值设置为time_local的日期 ruby { code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))" } # 添加一个index.hour字段,值设置为time_local的小时 ruby { code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))" } }
output的path中配置以下
path => "/logs/nginx/%{index.date}/%{index.hour}.log"
在没有指定codec的状况下,logstash会给每一条日志添加时间和host字段,例如:
源日志格式为
ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000
通过logstash处理后多了时间和host字段
2019-03-19T06:28:07.510Z %{host} ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000
若是不须要咱们能够指定最终的format只取message,解决方法为在output中添加以下配置:
codec => line { format => "%{message}" }
文章未完,所有内容请关注公众号【运维咖啡吧】或我的网站https://ops-coffee.cn查看,运维咖啡吧专一于原创精品内容分享,感谢您的支持