Filebeat 收集日志的那些事儿

 刘伟 360云计算 node

女主宣言golang






最近由于云原生日志收集的须要,咱们打算使用Filebeat做为容器日志收集工具,并对其进行二次开发,所以笔者将谈谈 Filebeat 收集日志的那些事儿。本文不涉及过具体的源码分析,但愿经过阅读您能够了解filebeat的基本使用方法和原理,姑且算是filebeat的入门吧。json

PS:丰富的一线技术、多元化的表现形式,尽在“360云计算”,点关注哦!网络

1多线程

前言并发

开源日志收集组件众多,之因此选择Filebeat,主要基于如下几点:app

  • 功能上能知足咱们的需求:收集磁盘日志文件,发送到Kafka集群;支持多行收集和自定义字段等;jvm

  • 性能上相比运行于jvm上的logstash和flume优点明显;ide

  • Filebeat基于golang 技术栈,二次开发对于咱们来讲有必定的技术积累;工具

  • 部署方便,没有第三方依赖;

2

Filebeat 能作什么

简单来讲Filebeat就是数据的搬运工,只不过除了搬运还能够对数据做一些深加工,为业务增长一些附加值。

  • Filebeat能够从多种不一样的上游input 中接受须要收集的数据,其中咱们最经常使用的就是 log input,即从日志中收集数据;

  • Filebeat对收集来的数据进行加工,好比:多行合并,增长业务自定义字段,json等格式的encode; 

  • Filebeat将加工好的数据发送到被称为output的下游,其中咱们最经常使用的就是 Elasticsearch 和 Kafka;

  • Filebeat具备ACK反馈确认机制,即成功发送到output后,会将当前进度反馈给input, 这样在进程重启后能够断点续传;

  • Filebeat在发送output失败后,会启动retry机制,和上一次ACK反馈确认机制一块儿,保证了每次消息至少发送一次的语义;

  • Filebeat在发送output时,因为网络等缘由发生阻塞,则在input上游端会减慢收集,自适应匹配下游output的状态。


图片

一图以蔽之。

3

Filebeat 背后的“老大”

说到Filebeat,它其实只是 beats 家族众多成员中的一个。除了Filebeat, 还有不少其余的beat小伙伴:

beat
功能
Filebeat 收集日志文件
Metricbeat 收集各类指标数据
Packetbeat 收集网络数据包
Auditbeat 收集审计数据
Heartbeat 收集服务运行状态监测数据
...
...

若是你愿意的话,你也能够按照beat的规范来写本身的beat。

能实现以上这些beat,都离不开beats家族真正的“老大”—— libbeat, 它是beat体系的核心库。咱们接下来看一下libbeat到底都作了些什么:

  • libbeat提供了publisher组件,用于对接input;

  • 收集到的数据在进入到libbeat后,首先会通过各类 processor的加工处理,好比过滤添加字段,多行合并等等;

  • input组件经过publisher组件将收集到的数据推送到publisher内部的队列;

  • libbeat自己实现了前面介绍过的多种output, 所以它负责将处理好的数据经过output组件发送出去;

  • libbeat自己封装了retry的逻辑;

  • libbeat负责将ACK反馈经过到input组件 ;

因而可知,大部分活儿都是libbeat来作,当“老大”不容易啊~。

input仅须要两件事:

  • 从不一样的介质中收集数据后投递给libbeat; 

  • 接收libbeat反馈回来的ACK, 做相应的持久化;

4

Filebeat 的简单使用示例

Filebeat自己的使用很简单,咱们只须要按需写好相应的input和output配置就行了。下面咱们以一个收集磁盘日志文件到Kafka集群的例子来说一下。

1. 配置inputs.d目录

在filebeat.yml添加以下配置,这样咱们能够将每一种等收集的路径写在单独的配置文件里,而后将这些配置文件统一放到inputs.d目录,方便管理

filebeat.config.inputs:enabled: truepath: inputs.d/*.yml

2. 在inputs.d目录下建立test1.yml,内容以下

  - type: log                       # Change to true to enable t    enabled: true                   # Paths that should be crawl    paths:                            - /home/lw/test/filebeat/*.log    fields:                       log_topic: lw_filebeat_t_2

这个配置说明会收集全部匹配/home/lw/test/filebeat/*.log的文件内容,而且咱们添加了一个自定义的filed:  log_topic: lw_filebeat_t_2, 这个咱们后面会讲到。

3. 在filebeat.yml中配置kafka output:

output.kafka:                                                                hosts: ["xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092", "xxx.xxx.xxx.xxx:9092"] version: 0.9.0.1                                                           topic: '%{[fields.log_topic]}'                                             partition.round_robin:                                                     reachable_only: true                                                     compression: none                                                          required_acks: 1                                                           max_message_bytes: 1000000                                                 codec.format:                                                                    string: '%{[host.name]}-%{[message]}'

其中:

  • hosts是kafka集群的broker list;

  • topic: '%{[fields.log_topic]}'  : 这项指定了咱们要写入kafka集群哪一个topic, 能够看到它实现上是引用了上面test.yml配置中咱们自定义的filed字段,经过这种方式咱们就能够将收集的不一样路径的数据写入到不一样的topic中,可是这个有个限制就是只能写到一个kafka集群,由于当前版本的filebeat不容许同时配置多个output。

  • codec.format: 指定了写入kafka集群的消息格式,咱们在从日志文件中读取的每行内容前面加上了当前机器的hostname。

启动就很简单了,filebeat和filebeat.yml, inputs.d都在同一目录下,而后 ./filebeat run就行了。


filebeat自己有不少全局的配置,每种input和output又有不少各自的配置,关乎日志收集的内存使用,是否是会丢失日志等方方面面,你们在使用时还须要仔细阅读,这里不赘述。


5

Log input 是如何从日志文件中收集日志的

input的建立:

  • 根据配置文件内容建立相应的Processors, 用于前面提到的对从文件中读取到的内容的加工处理;

  • 建立Acker, 用于持久化libbeat反馈回来的收集发送进度;

  • 使用libbeat提供的Pipeline.queue.Producer建立producer,用于将处理好的文件内容投递到libbeat的内部队列;

收集文件内容:

  • input会根据配置文件中的收集路径(正则匹配)来轮询是否有新文件产生,文件是否已通过期,文件是否被删除或移动;

  • 针对每个文件建立一个Harvester来逐行读取文件内容;

  • 将文件内容封装后经过producer发送到libbeat的内部队列;

image.png

处理文件重命名,删除,截断:

  • 获取文件信息时会获取文件的device id + indoe做为文件的惟一标识;

  • 前面咱们提过文件收集进度会被持久化,这样当建立Harvester时,首先会对文件做openFile, 以 device id + inode为key在持久化文件中查看当前文件是否被收集过,收集到了什么位置,而后断点续传;

  • 在读取过程当中,若是文件被截断,认为文件已经被同名覆盖,将从头开始读取文件;

  • 若是文件被删除,由于原文件已被打开,不影响继续收集,但若是设置了CloseRemoved, 则不会再继续收集;

  • 若是文件被重命名,由于原文件已被打开,不影响继续收集,但若是设置了CloseRenamed , 则不会再继续收集;

6

日志如何被发送

发送流程简述:

  • input将日志内容写入libbeat的内部队列后,剩下的事件就都交由libbeat来作了;

  • libbeat会建立consumer, 复现做libbeat的队列里消费日志event, 封装成Batch对象;

  • 针对每一个Batch对象,还会建立ack Channel, 用来将ACK反馈信息写入这个channel; 

  • Batch对象会被源源不断地写入一个叫workQueue的channel中;

  • 以kafka output为例,在创kafka output时首先会建立一个outputs.Group,它内部封装了一组kafka client, 同时启动一组goroutine; 

  • 上面建立的每一个goroutine都从workQueue队列里读取Batch对象,而后经过kafka client发送出去,这里至关于多线程并发读队列后发送;

  • 若kafka client发送成功,写入信息到ack channel, 最终会经过到input中;

  • 若kafka client发送失败,启动重试机制;

image.png

重试机制:

  • 以kafka output为例,若是msg发送失败,经过读取 ch <-chan *sarama.ProducerError能够获取到全部发送失败的msg;

  • 针对ErrInvalidMessage, ErrMessageSizeTooLarge 和 ErrInvalidMessageSize这三种错误,无需重发;

  • 被发送的 event都会封装成 Batch, 这里重发的时候也是调用Batch.RetryEevnts; 

  • 最后会调用到retryer.retry将须要从新的events再次写入到上图中黄色所示的 workQueue中,从新进入发送流程;

  • 关于重发次数,能够设置max retries, 但从代码中看这个max retries不起做用,目前会一直重试,只不过在重发次数减小到为0时,会挑选出设置了Guaranteed属性的event来发送; 

  • 若是重发的events数量过多,会暂时阻塞住从正常发送流程向workQueue中写入数据,优先发送须要重发的数据;

7

后记

 在本文里,咱们没有深刻到源码层次,为了讲清filebeat运做的原理,咱们也忽略了一些实现细节,后续将会从源码层面做进一步剖析。
相关文章
相关标签/搜索