filebeat 源码分析

因为业务须要,咱们要对 beats 进行二次开发。因此最近我在看它的实现。这篇文章就是对此的一段总结。node

beats是知名的ELK日志分析套件的一部分。它的前身是logstash-forwarder,用于收集日志并转发给后端(logstash、elasticsearch、redis、kafka等等)。filebeat是beats项目中的一种beats,负责收集日志文件的新增内容。 虽然标题是《Filebeat源码分析》,不过因为filebeat依赖于公共库 libbeat,本文会花一半的篇幅跟它打交道。libbeat 集合了各个 beat 会用到的内容,包括公共的配置,输出的管理等等。每一个beat专一于本身的收集工做,而后转发给libbeat进一步处理和输出。git

每一个 beat 的构建是独立的。从 filebeat 的入口文件filebeat/main.go能够看到,它向libbeat传递了名字、版本和构造函数来构造自身。跟着走到libbeat/beater/beater.go,咱们能够看到程序的启动时的主要工做都是在这里完成的,包括命令行参数的处理、通用配置项的解析,以及最为重要的:调用象征一个beat的生命周期的若干方法。github

每一个 beat 都实现了Beater接口,其中包含SetupConfigRunCleanupStop五个接口方法。redis

type Beater interface {
        Config(*Beat) error
        Setup(*Beat) error
        Run(*Beat) error
        Cleanup(*Beat) error
        Stop()
}

当libbeat用传进来的构造函数构造了一个beater时,它会依Config -> Setup -> Run -> Cleanup的顺序调用这几个方法,并将Stop注册到信号处理函数中。json

Beat是在libbeat的构造入口处定义的结构体,其功能就是存储构造过程当中的中间产物,好比解析出的配置会存储在Beat.Config,根据配置中建立的Publisher也会存在Beat.Publisher里面。后端

咱们能够跳回filebeat/,在beat/filebeat.go看到接口的具体实现。Config方法里面,filebeat只会读取跟本身名字相关的配置。Setup方法则基本没干什么。CleanupStop方法天然只是作些收尾工做。重头戏在于Run方法。api

Run方法中,filebeat逐个建立了spooler
因为业务须要,咱们要对 beats 进行二次开发。因此最近我在看它的实现(对应版本是v5.0.0-alpha4)。本文是对此的一段总结。async

beats是知名的ELK日志分析套件的一部分。它的前身是logstash-forwarder,用于收集日志并转发给后端(logstash、elasticsearch、redis、kafka等等)。filebeat是beats项目中的一种beat,负责收集日志文件的新增内容。 虽然标题是《filebeat源码分析》,不过因为filebeat依赖于公共库libbeat,本文会花一半的篇幅跟它打交道。libbeat集合了各个beat会用到的内容,包括公共的配置,输出的管理等等。每一个beat专一于本身的收集工做,而后转发给libbeat进一步处理和输出。elasticsearch

beat的构造

每一个 beat 的构建是独立的。从 filebeat 的入口文件filebeat/main.go能够看到,它向libbeat传递了名字、版本和构造函数来构造自身。跟着走到libbeat/beater/beater.go,咱们能够看到程序的启动时的主要工做都是在这里完成的,包括命令行参数的处理、通用配置项的解析,以及最为重要的:调用象征一个beat的生命周期的若干方法。函数

每一个 beat 都实现了Beater接口,其中包含SetupConfigRunCleanupStop五个接口方法。

type Beater interface {
        Config(*Beat) error
        Setup(*Beat) error
        Run(*Beat) error
        Cleanup(*Beat) error
        Stop()
}

当libbeat用传进来的构造函数构造了一个beater时,它会依Config -> Setup -> Run -> Cleanup的顺序调用这几个方法,并将Stop注册到信号处理函数中。CleanupStop其实都是作收尾工做的,只是前者是在beat退出时调用,后者是在收到SIGINT和SIGTERM时调用。

Beat是在libbeat的构造入口处定义的结构体,其功能就是存储构造过程当中的中间产物,好比解析出的配置会存储在Beat.Config,根据配置中建立的Publisher也会存在Beat.Publisher里面。

日志收集逻辑

咱们能够跳回filebeat/,在beat/filebeat.go看到接口的具体实现。Config方法里面,filebeat只会读取跟本身名字相关的配置。Setup方法则基本没干什么。CleanupStop方法天然只是作些收尾工做。重头戏在于Run方法。

Run方法中,filebeat逐个建立了registrarpublisherspoolercrawler四个组件。收集到的数据在它们间的流动方向,恰好跟建立顺序相反。

crawler负责具体的日志采集工做。它会根据配置文件启动多个prospector,每一个prospector处理一类日志文件类型,有着一组独立的配置。prospector会启动一个prospectorer干主要的活。根据默认配置,这个prospectorer会是ProspectorLog类型的。ProspectorLog类型的prospectorer会扫描目标路径下匹配的文件,根据registry里存储的状态判断每一个文件。若是以前处理过,调用harvestExisingFile;不然调用harvestNewFile。前者会判断对应的harvester是否还在运行。这两个函数都涉及到一个Harvester的建立。随便一提,registry是registrar建立以后传递给crawler的,里面是文件的处理状态记录。

对于ProspectorLog来讲,它在建立Harvester时调用的是harvester/log.go中的Harvest方法。该方法首先建立一个LineReader。其次从LogFileReader开始,根据配置,一层层套上LineEncoder、JSONProcessor、Multiline等装饰器。每一个装饰器负责本身相关的文本处理。而后,不停地调用readline从该reader中读取内容,根据读到的内容填充FileEvent的值,把FileEvent发送给spooler。注意FileEvent并不必定含有具体的日志内容,它也有可能只包含文件相关的状态信息。FileEvent也不只仅有具体的日志内容,它还包含读取事件以及其余配置相关的值。当readline时发生错误或者EOF,harvester会随之退出。prospector会每隔scan_frequency(默认10秒)以后重启对应的prospectorer,而后继续建立出harvester,继续收集的工做。本来我觉得filebeat会使用inotify这样的API,只有在文件发生变更时才去读新数据。看来它只是按期去读取上一次offset以后的数据。

以上三部分的代码分别位于crawlerprospectorharvester目录下。三者关系总结以下:

crawler-prospector-harvester

另外,harvester里面有三个目录,reader、processor、encoding,顾名思义,主要是处理一些文本层级上的细节。

如今咱们随着FileEvent(如下简称为事件)来到spooler里面。一进来,就看到事件们在这里排起了长龙。spooler接到事件后,不急着发出去,而是排进队列中。若是队列(长度取决于spool_size,默认2048)已满,调用flush方法把事件刷到publisher里面。此外配置的flush时间idle_timeout(默认5s)到时后,也会调用flush方法。

大热天的排得很焦急,终于到publisher了。publisher从spooler中接收事件,转换其类型。@timestamp和其余跟日志内容无关的tag就是在这一步打入到发送数据中的,详情看input/event.go。它接着调用PublishEvents把数据发送出去,并注册了通知函数和标志位Guaranteed。正是因为这个Guaranteed标志位,filebeat的数据会在发送时一直重试到成功为此。当数据被发送时,发送方调用通知函数,publisher知道能够把这个事件划入到“已确认”的队列中。又是一个队列!事件们领了确认表格,此次它们要排队进入registrar。

publisher的已确认队列每秒刷一次,就像游乐园的栅栏同样,一打开就有一批新的事件挤进registrar。registrar根据每一个事件的文件状态更新记录,并在处理了最后一个事件后以json格式写入到文件registry中。这个文件的内容以下:

{"/root/fnlek/logs/error.log":{"source":"/root/fnlek/logs/error.log","offset":0,"FileStateOS":{"inode":798434,"device":2049}},"/root/fnlek/test.log":{"source":"/root/fnlek/test.log","offset":211670,"FileStateOS":{"inode":798467,"device":2049}},"/root/fnlek/test1.log":{"source":"/root/fnlek/test1.log","offset":0,"FileStateOS":{"inode":798464,"device":2049}},"/root/fnlek/test2.log":{"source":"/root/fnlek/test2.log","offset":0,"FileStateOS":{"inode":798611,"device":2049}}...

正是靠这个文件作持久化,filebeat启动时才能继续前一次的工做(记得讨论prospector时提到的registry吗)。虽然不能保证每条日志仅被发送一次,但至少保证每条日志都有机会被发送。

filebeat

filebeat的内容到此结束,接下来又转到libbeat了。

日志发送逻辑

前情提要:filebeat/publisher把数据经过PublishEvents发送出去。

如今轮到libbeat/publisher干活了。每一个PublishEvents背后,都有一个默默无闻的Client进行具体的发送工做。这个Client由libbeat/publisher/publish.goConnect函数建立的,让咱们进入libbeat/publisher/client.go瞧瞧。咱们接着能够看到,这些事件(已经被打包成消息了)会被同一目录下的async.gopublish方法所发送。

真正负责发送的类型是outputWorker。不过这些worker会被makeAsyncOutput函数根据flush_intervalmax_bulk_size配置包装成BulkWorker。而后待发送的消息会被丢到一个chan message队列里面去,待flush_interval(默认1s)时间到或者消息数超过maxBulkSize(默认50)才被发出去。以后会有一次根据消息内部的events长度,对发送的消息数的切割,使得每次发送都不超过max_bulk_size。还记得吗,因为spool_size默认值是2048,不考虑超时的状况下,filebeat的一次PublishEvents至关于发送端的41次发送。

中间省略若干转发不提……

历经艰难险阻,最终会调用到outputs/mode/mode.go的PublishEvents方法,若是不涉及load banlancer,会进入同一目录下的single/single.go。如今转给实现了mode.ConnectionMode的具体Client去调用PublishEvents方法。“具体”是哪个Client呢?

各输出插件会在init的时候向outputsPlugins表注册本身的插件名和对应的初始化函数。libbeat/publisher初始化时会经过InitOutputs读取outputsPlugins表,具体读取到那个取决于output配置的值。读取到的值会用来建立具体的outputWorker,每一个outputWorker会有一个实现了mode.ConnectionMode的Client。

若是当时调用的PublishEvent,走的流程大概也是这样,不过中间就不须要有队列了。

让咱们挑一个输出插件——elasticsearch看看。代码位于outputs/elasticsearch,第一眼看上去就跟elasticsearch的客户端同样。

elasticsearch/
├── api.go
├── api_integration_test.go
├── api_mock_test.go
├── api_test.go
├── bulkapi.go
├── bulkapi_integration_test.go
├── bulkapi_mock_test.go
├── client.go
├── client_integration_test.go
├── client_test.go
├── config.go
├── enc.go
├── json_read.go
├── output.go
├── output_test.go
├── topology.go
├── url.go
└── url_test.go

elasticsearch插件的PublishEvents方法定义于outputs/elasticsearch/client.go。很简单,先判断是否处于已链接状态,若是不是,调用Connect方法。接着拼接数据,以POST /_bulk请求发送。若是配置了gzip压缩,可能会用GzipEncoder处理下。具体的HTTP/HTTPS发送逻辑由go内置的http包完成。

有趣的是,elasticsearch客户端实现的Connect方法会发送一个HEAD /的请求。尽管HTTP/HTTPS确实不须要提早创建链接,不过用HEAD /来探活意义也不大吧?

发送了请求并不表明成功——还得收到确认无误的响应才行。libbeat在outputs/mode/single/single.gopublish方法实现中实现了重传的机制。若是没有使用Guaranteed标志位,发送重试的次数取决于max_retries的设置(默认为3)。另外,若是部分数据发送成功,会将重试计数器重置为0。举个例子,若是POST /_bulk请求发送的数据有一部分得到了确认,下一次重试时将不会包含这些数据,并且重试的次数会从零算起。一旦发送成功——抑或重试屡次均告失败,通知函数会被调用,告知数据处理完毕。转了一圈,如今咱们又回到filebeat/publisher了。

阅读代码,我得出一个有趣的结论,为了不频繁输出对被收集者的影响,filebeat里面许多地方都用到了队列来实现批处理,结果致使其输出更像是脉冲式的:一段时间内可能会进行连续多个输出。若是你想避免该行为,能够调小队列长度和刷队列的时间。固然这么一来,输出会更加频繁(但更平滑),整体消耗的资源会更多。

相关文章
相关标签/搜索