Logstash初探

前言
  • logstash是一个开源带实时管道处理能力的数据收集处理引擎,不仅仅可以收集转换结构化数据,对非结构化数据也有强大的分析处理能力。
    在这里插入图片描述
pipeline三板斧
  • logstash的管道包含三个部分,输入,过滤,输出,复杂的pipeline可能会有多个input输入,filter和多个ouput输出,可以使用命令-e参数行定义变量配置,使用配置文件定义复杂配置,通过参数f指定使用某个配置文件,或者某个目录下所有配置,若出现多个-f,则默认只会使用最后一个配置,input,生成logstash中所用的数据结构Event,filter,对Event进行过滤及修改, output,输出event,其中Event是logstash内部数据的表现形式。

  • 在pipeline管道中每一个input输入拥有自己的线程,默认情况下,input会将转换后的event输出到内存中队列(也可以配置输出到磁盘),每一个pipeline的worker线程,批量的从队列中取出事件处理。pipeline的worker线程和批量取多少事件,是可以配置的。默认情况下,logstash使用的是内存队列,如果进程非正常结束,有可能会丢失数据。

  • 实际上input是可以有多个,每个input有一个编码器codec,queue负责把流入的数据分发到不同的Pipeline中,每个Pipeline有filter和output和batcher,如图中有3个Pipeline,batcher的作用是从queue中批量的取数据。batcher当event的数目或者时间达到了一定的条件就会到filter中,然后再到output,到了output之后会有发送一个ACK给Queue来告诉Queue这些Logstash Event已经处理完了。注意这里的Worker threads是同步进行的,也就是说线程越多,那么处理数据的能力就越强。

配置文件
  • pipeline configuration files,定义pipeline管道处理。
  • settings files,定义控制logstash启动和执行的配置。
  • pipeline.workers,pipeline中的worker线程数配置,默认是cpu的核数。
  • pipeline.batch.size,配置每个worker线程每次收集event的数量,配置越大越有效率,但同理,内存开销也会增大,可以在jvm.options中配置增加堆空间。
  • queue.type,用作内部event数据的缓冲,默认是内存队列,可以配置persisted使之持久化到磁盘,并且ack确认后即删除。
缓冲队列
  • 默认情况下logstash使用的是内存队列,如果主机发生宕机或者进程意外终止,则在内存中的数据会丢失。持久化队列的特性是会将消息持久化到本地磁盘上,一旦队列快满了,则会减缓/停止input的输入。其中工作模式为:input → queue → filter + output
  • 持久化队列的好处是它提供了类似于redis或kafka特性的数据缓存,至少一次运输保证消息不会丢失,重启logstash后也会尝试将未传送的持久化数据进行发送,直到成功。它的缺点是,在input阶段若没有请求响应协议或是磁盘故障则无法保证数据完整性。
#默认memory,设置为持久化队列
queue.type:persisted
#数据文件存储位置,默认:path.data/queue
path.queue
#持久化队列的最大容量,默认1g,需要注意主机磁盘容量是否充足,如果仅仅是为了保证数据不丢失,则可以将这个值设置小一些
queue.max_bytes

#logstash提交队列event数据到磁盘的操作称为checkpoint,可以设置如下属性为1,表示按一个event为单位落盘,在需要考虑磁盘开销的情况下,并不建议设置为1。
queue.checkpoint.writes
  • 引用字段,除了input模块,在filter和output模块中可以引用字段,可以层级引用,如:[response][status]
自监控api
curl -XGET 'localhost:9600/?pretty'
curl -XGET 'localhost:9600/_node/pipelines?pretty'
curl -XGET 'localhost:9600/_node/jvm?pretty'