B站视频地址:Logstash如何成为镇得住场面的数据管道nginx
公众号视频地址:Logstash如何成为镇得住场面的数据管道正则表达式
知乎视频地址:Logstash如何成为镇得住场面的数据管道json
首先咱们延续上一期视频中日志采集架构的案例,Filebeat采集日志并推送Kafka消息队列进行分发,再由Logstash消费日志消息,并将日志数据最终落地在Elasticsearch集群索引当中,Kafka做为消息队列分发服务须要将收集到的日志消息继续分发下去,最终数据落地在Elasticsearch集群索引当中。ruby
那么链接整个过程的主角Logstash是如何工做的,就是咱们今天讲解的重点。架构
Logstash工做过程分为三个部分:Input输入、Filter过滤、Output输出,它们共同协做造成了完整的Logstash数据管道传输机制elasticsearch
咱们先从一个最简单的例子演示开始,看看Logstash是怎么输入和输出的,这一次先跳过filter过滤环节。分布式
下面查看已经预置好的一个配置文件01-kafka-elastic-nginx.confide
首先是input输入配置点,从Kafka订阅消息,Kafka集群地址与filebeat中都指向了一个地址,其余配置咱们先略过,后续Kafka专题再说大数据
下来看到要订阅的Topic主题TestT3,咱们先不用json格式解码消息,默认就是纯文本的方式spa
同样的,这一步先略过过滤环节,直接看看output输出配置点,目标是给Elasticsearch输出数据,并指定了elasticsearch集群的三个节点
输出环节建立须要写入的elasticsearch日志索引,咱们先按照默认的filebeat采集时间,进行日期格式化,按照每一个小时创建一个索引,这块会有时间问题,一下子再说。
让数据输出到终端,方便咱们调试结果。
经过演示中最简单的配置方式,这时候的Logstash已经成为链接Kafka和Elastisearch之间的数据管道了!
好,接下来咱们将全部系统运行起来,并生成一条nginx请求日志,看看管道各个阶段的数据变化。
首先nginx日志数据被filebeat采集,是一条典型的无结构的文本日志数据,你们注意红色标注的时间是2021年2月21日13时
接着这条日志数据经过Kafka进入到了Logstash管道的输入阶段,
Logstash为这条日志生成了更为很是庞大的Json数据,里面包括了全部被采集主机的信息,以及nginx日志,实际上这些原始信息并无被良好的进行数据清洗与结构化
最后数据被写入到Elastisearch一个按小时划分的索引当中,对应时间为2021年2月21日5时
咱们发现Logstash对原始数据在没有任何处理的状况下,会很不方便未来数据的使用;
此次咱们利用Logstash json解码器让管道从新再来一次,
接下来咱们进入Logstash中对应的配置文件,并找到input输入点的codec配置,删掉注释,打开Logstash对输入数据的json解码方式·。
咱们看看再次进入管道中的日志数据,Logstash首先对原始日志数据进行Json解析
这时候咱们再看Json解析后的数据,是否是就清晰多了,filebeat采集到的本地机器数据、以及红色框中Nginx HTTP日志数据、以及其余标签数据都进行了字段分离
作到这一步其实仍是不够好,为何呢?一方面由于咱们依然但愿将Nginx HTTP的日志数据也进行结构化处理,
另外一个方面,Filebeat传递给Logstash的系统时间是慢了8个小时的UTC时间标准,反而Nginx日志中的时间是咱们本地的北京时间标准,所以咱们但愿用Nginx日志时间做为建立Elasticsearch日志索引的惟一依据
这时候咱们就要使用Logstash的过滤机制了,咱们继续进入Logstash对应的配置中,删掉过滤配置中的注释,让Logstash过滤最经常使用插件grok、date、ruby、mutate起做用
grok插件是专业处理非结构化数据的能手,经过自定义的Nginx日志正则表达式,就能实现Nginx日志的结构化解析
date插件用于处理时间问题,咱们经过date插件将nginx日志中的时间转换成Logstash时间对象,并赋给一个新的临时时间字段indextime
ruby就是在过滤过程当中能够插入ruby脚本语言来进行程序级处理,咱们经过ruby语言对indextime时间格式化,生成一个精确到小时的字符串字段index.date,用于elasticsearch索引名称
mutate是最经常使用的能够对管道中数据字段进行操做的插件了,咱们的目的是删除临时时间字段indextime
最后咱们还须要将output输出中的索引生成方式修改一下,注释掉原来用filebeat生默认时间生成的索引,改为nginx日志时间生成的索引。
咱们从新运行Logstash,数据通过了Input解码、日志grok结构化处理、本地时间对象建立,并进行日期格式化,为了生成新的Elasticsearch索引字段,并对临时字段进行删除,最终通过Output输出阶段,建立Elasticsearch索引或写入日志数据
让咱们看看Elasticsearch最终保存的数据效果,index索引对应的时间来自过滤器建立的index.date字段,index.date字段又来自nginx日志中分离出的本地时间。这样咱们就不用再去修改Logstash的系统时间了
咱们看到菱形标注的字段数据就是由过滤器对nginx http日志进行结构化抽取的结果,
一样elasticsearch依然保存着nginx日志的原始数据以备不时之需
前往读字节的知乎——了解更多关于大数据的知识公众号“读字节” 分布式,大数据,软件架构的深度,专业解读