Fluentd Log Collection

Fluentd is a highly extensible open-source log collector. Its td-agent distribution can also ship data to Treasure Data's cloud big-data service.

Architecture

Six plugin types

  • Input
  • Parser
  • Filter
  • Output
  • Formatter
  • Buffer

Workflow: Events -> Inputs -> Filters -> Buffer -> Matches and Labels

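JSON structure of an Event

The input plugin declared in a <source> section turns each incoming log entry into an event with the following structure:

  • tag: where the event came from; used for message routing
  • time: when the event occurred (parsed according to time_key and time_format in the <source> section)
  • record: the log content as JSON (each named capture group in the format regex becomes one JSON field)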
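
To make the tag/time/record triple concrete, the following is a minimal Python sketch of what an input plugin conceptually does with one nginx error-log line. It is not Fluentd's implementation: the function name `to_event`, the sample line, and the choice to treat timestamps as UTC are assumptions made for illustration, and the pattern is the nginx error-log regex from the configuration in this article rewritten in Python's `(?P<name>...)` syntax.

```python
import json
import re
from datetime import datetime, timezone

# Python spelling of the nginx error-log pattern; Fluentd (Ruby) writes
# named groups as (?<name>...), Python needs (?P<name>...).
LINE_RE = re.compile(
    r"^(?P<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) "
    r"\[(?P<log_level>\w+)\] (?P<pid>\d+).(?P<tid>\d+): (?P<message>.*)$"
)


def to_event(tag, line, time_key="time", time_format="%Y/%m/%d %H:%M:%S"):
    """Turn one raw log line into a (tag, time, record) triple (hypothetical helper)."""
    match = LINE_RE.match(line)
    if match is None:
        return None
    # Every named capture group becomes one field of the record.
    record = match.groupdict()
    # The time_key capture is parsed with time_format and moved out of the record.
    parsed = datetime.strptime(record.pop(time_key), time_format)
    # Assumption: timestamps are treated as UTC to keep the result deterministic.
    epoch = int(parsed.replace(tzinfo=timezone.utc).timestamp())
    return tag, epoch, record


event = to_event("nginx.error", "2024/01/02 03:04:05 [error] 12#34: upstream timed out")
print(json.dumps(event))
```

The record keeps `log_level`, `pid`, `tid` and `message`, while the `time` capture ends up in the event's time slot rather than in the record.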

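Installation and configuration

Prerequisites

  • Synchronize the system clock with NTP
  • Raise the file descriptor limit (ulimit -n)
  • Tune kernel parameters for high-load environments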
    net.core.somaxconn = 1024
    net.core.netdev_max_backlog = 5000
    net.core.rmem_max = 16777216
    net.core.wmem_max = 16777216
    net.ipv4.tcp_wmem = 4096 12582912 16777216
    net.ipv4.tcp_rmem = 4096 12582912 16777216
    net.ipv4.tcp_max_syn_backlog = 8096
    net.ipv4.tcp_slow_start_after_idle = 0
    net.ipv4.tcp_tw_reuse = 1
    net.ipv4.ip_local_port_range = 10240 65535

Main configuration example

## Configuration path: `/etc/fluentd/fluentd.conf`
## A new configuration takes effect only after a restart, e.g. fluentd -c alert-email.conf

<source>
  @type tail
  format nginx
  path /var/log/nginx/access.log #make sure the Fluentd daemon has read permission
  pos_file /var/log/fluentd/tmp/access.log.pos  #file that records the last read position
  tag nginx.access
  @label @mainstream
</source>

<source>
  @type http
  format json
  port 8888
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10s
  #in http mode, the tag is taken from the endpoint path of the HTTP request
</source>

#parse single-line nginx error log entries
<source>
    @type tail
    format /^(?<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?<log_level>\w+)\] (?<pid>\d+).(?<tid>\d+): (?<message>.*)$/
    tag nginx.error
    path /var/log/nginx/error.log
</source>
#parse multi-line nginx error log entries
<source>
    @type tail
    tag nginx.error
    path /var/log/nginx/error.log

    format multiline
    format_firstline /^\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} \[\w+\] (?<pid>\d+).(?<tid>\d+): /
    format1 /^(?<time>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?<log_level>\w+)\] (?<pid>\d+).(?<tid>\d+): (?<message>.*)/
    multiline_flush_interval 3s
</source>


<filter info.**>
  @type blackhole_plugin  #simply discard the events
</filter>

<filter **.**>
  @type grep
  input_key code
  exclude ^200$  #two-line exclude syntax (input_key + exclude)
  tag filtered.log  #reset the tag
</filter>

<filter filtered.log>
    @type record_modifier  #modifier plugin (lighter and faster than record_transformer)
    server_host "#{Socket.gethostname}"  #adds a server_host field (value obtained by evaluating embedded Ruby: "#{expression}")
</filter>

<filter filtered.log>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"  #adds a hostname field (value obtained by evaluating embedded Ruby: "#{expression}")
  </record>
</filter>

<match filtered.log>
  @type grep
  regexp1 code ^4\d\d$  #single-line regexp match syntax
  exclude1 action logout  #single-line exclude syntax
  add_tag_prefix my
</match>

<match my.filtered.log>
  @type stdout
</match>


# a label groups a separate processing pipeline and ties it to the sources that reference it
<label @mainstream>
  <match docker.**>
    @type file
    @id   output_docker1
    path         /fluentd/log/docker.*.log
    symlink_path /fluentd/log/docker.log
    append       true
    time_slice_format %Y%m%d
    time_slice_wait   1m
    time_format       %Y%m%dT%H%M%S%z
  </match>
  <match **>
    @type file
    @id   output1
    path         /fluentd/log/data.*.log
    symlink_path /fluentd/log/data.log
    append       true
    time_slice_format %Y%m%d
    time_slice_wait   10m
    time_format       %Y%m%dT%H%M%S%z
  </match>
</label>
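
The http source above derives the tag from the request path, so a client only needs to POST a record to `/<tag>` on port 8888. The sketch below shows one way to do that from Python using the form-encoded `json=` body that Fluentd's HTTP input accepts. The helper names `build_request` and `send`, the tag `app.login`, and the sample record are assumptions made for illustration; the host and port mirror the configuration above.

```python
import json
import urllib.parse
import urllib.request


def build_request(tag, record, host="localhost", port=8888):
    """Build a POST request whose path becomes the Fluentd tag (hypothetical helper)."""
    url = f"http://{host}:{port}/{tag}"
    # The record travels as a form field named "json".
    body = urllib.parse.urlencode({"json": json.dumps(record)}).encode("utf-8")
    return urllib.request.Request(url, data=body, method="POST")


def send(tag, record):
    """Send the record to a running Fluentd instance and return the HTTP status."""
    with urllib.request.urlopen(build_request(tag, record), timeout=5) as resp:
        return resp.status


# Building the request needs no network access; call send() only when
# a Fluentd instance is actually listening.
req = build_request("app.login", {"code": 500, "action": "login"})
print(req.full_url)
```

Calling `send("app.login", {...})` against the configuration above would produce an event tagged `app.login`, which the filter and match sections then route like any other event.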

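Ingesting PHP application logs

  • Install the fluent-plugin-elasticsearch plugin for Fluentd: gem install fluent-plugin-elasticsearch --no-document
  • Add a forward input to the Fluentd configuration (the most efficient way for Fluentd to receive events)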
<source>
  @type forward
  port 24224
  bind 0.0.0.0
  #in forward mode, the tag is set by the sending client (for example, the first argument to FluentLogger's post())
</source>

<match **.**>
  @type elasticsearch
  logstash_format true
  host <hostname> #(optional; default="localhost")
  port <port> #(optional; default=9200)
  index_name <index name> #(optional; default=fluentd)
  type_name <type name> #(optional; default=fluentd)
  buffer_type file
  buffer_path /var/log/fluentd/buffer
  flush_interval 10s # for testing
</match>
  • Install the client library in the Laravel application (the most extensible way to deliver logs): composer require fluent/logger=v1.0.0 --profile --prefer-dist --optimize-autoloader
  • Send logs from the Laravel application
use Fluent\Logger\FluentLogger;
$logger = new FluentLogger("localhost","24224");
$logger->post("debug.test",array("hello"=>"world"));

Ingesting PHP CLI error logs

    <source>
      @type tail
      tag php.error
      path /var/log/php-fpm/www-error.log
      pos_file /tmp/fluentd/pos/php-fpm/php_errors.log.pos
      format multiline
      multiline_flush_interval 2s
      format_firstline /^\[[^\]]*?\]/
      format1 /^\[(?<time>[^\]]*?)\] PHP (?<level>.*):  (?<message>.*)$/
      format2 /^Stack trace:$/
      format3 /^(?<trace>.*)/
      time_format %d-%b-%Y %H:%M:%S %Z
    </source>

    <match php.error>
      @type elasticsearch
      host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
      port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
      scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
      user "elastic"
      password "#{ENV['ELASTICSEARCH_PASSWORD']}"
      logstash_format true
      logstash_prefix php.error
      logstash_dateformat %Y.%m.%d
      type_name log
      utc_index false
      reconnect_on_error true
      num_threads 2

      <buffer>
        flush_interval 30s
      </buffer>
    </match>
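
The multiline parser in the source above starts a new entry whenever a line matches format_firstline and appends every other line to the entry in progress. A minimal Python sketch of that grouping step follows. It is a simplification and not Fluentd's code: the function name `group_multiline` and the sample lines are assumptions, and lines seen before the first match are simply kept with the first entry.

```python
import re

# Same pattern as format_firstline above: a line that begins with "[...]".
FIRSTLINE = re.compile(r"^\[[^\]]*?\]")


def group_multiline(lines, firstline=FIRSTLINE):
    """Yield one joined string per logical log entry (hypothetical helper)."""
    buffered = []
    for line in lines:
        # A first-line match closes the entry collected so far.
        if firstline.match(line) and buffered:
            yield "\n".join(buffered)
            buffered = []
        buffered.append(line.rstrip("\n"))
    if buffered:
        # Fluentd emits this trailing entry once multiline_flush_interval elapses.
        yield "\n".join(buffered)


sample = [
    "[02-Jan-2024 03:04:05 UTC] PHP Fatal error:  Uncaught Exception: boom",
    "Stack trace:",
    "#0 {main}",
    "[02-Jan-2024 03:04:06 UTC] PHP Warning:  something else",
]
entries = list(group_multiline(sample))
print(len(entries))
```

The joined text of each entry is what the format1, format2 and format3 patterns are then applied to as a single string.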

Ingesting Nginx access logs

    <source>
      #use default nginx main log_format
      @type tail
      tag nginx.access
      path /var/log/nginx/access.log
      pos_file /tmp/fluentd/pos/nginx/access.log.pos
      format /^(?<remote_ip>[^ ]*) - (?<remote_user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<uri>[^\"]*) +\S*)?" (?<status_code>[^ ]*) (?<response_size>[^ ]*) "(?<referer>[^\"]*)" "(?<user_agent>[^\"]*)" "(?<forwarded_for>[^\"]*)"$/
      time_format %d/%b/%Y:%H:%M:%S %z
      types status_code:integer,response_size:integer
    </source>

    #### filters ####
    <filter nginx.access>
      @type record_transformer
      enable_ruby
      <record>
        path ${URI(URI.encode(uri.strip)).path}
      </record>
    </filter>

    <match nginx.access>
      @type elasticsearch
      host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
      port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
      scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
      user "elastic"
      password "#{ENV['ELASTICSEARCH_PASSWORD']}"
      logstash_format true
      logstash_prefix nginx.access
      logstash_dateformat %Y.%m.%d
      type_name log
      utc_index false
      reconnect_on_error true
      num_threads 2

      <buffer>
        flush_interval 30s
      </buffer>
    </match>
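
Before deploying a pattern like the one in the source above, it can help to check it against a sample line outside Fluentd. The sketch below does that in Python under a few stated assumptions: the regex is rewritten in Python's `(?P<name>...)` syntax, the sample line and the helper `parse_access` are invented for illustration, non-numeric values fall back to 0 when cast, and `urlsplit` stands in for the Ruby expression used by the record_transformer filter to derive `path`.

```python
import re
from datetime import datetime
from urllib.parse import urlsplit

# The access-log pattern from the <source> above, in Python named-group syntax.
ACCESS_RE = re.compile(
    r'^(?P<remote_ip>[^ ]*) - (?P<remote_user>[^ ]*) \[(?P<time>[^\]]*)\] '
    r'"(?P<method>\S+)(?: +(?P<uri>[^\"]*) +\S*)?" '
    r'(?P<status_code>[^ ]*) (?P<response_size>[^ ]*) '
    r'"(?P<referer>[^\"]*)" "(?P<user_agent>[^\"]*)" "(?P<forwarded_for>[^\"]*)"$'
)


def parse_access(line):
    """Return (time, record) for one access-log line, or None (hypothetical helper)."""
    match = ACCESS_RE.match(line)
    if match is None:
        return None
    record = match.groupdict()
    # Mirrors time_format %d/%b/%Y:%H:%M:%S %z.
    time = datetime.strptime(record.pop("time"), "%d/%b/%Y:%H:%M:%S %z")
    # Mirrors types status_code:integer,response_size:integer.
    for key in ("status_code", "response_size"):
        record[key] = int(record[key]) if record[key].isdigit() else 0
    # Mirrors the filter that keeps only the path part of the URI.
    record["path"] = urlsplit((record["uri"] or "").strip()).path
    return time, record


sample = ('203.0.113.7 - - [02/Jan/2024:03:04:05 +0000] '
          '"GET /index.html?x=1 HTTP/1.1" 200 612 "-" "curl/8.0" "-"')
time, record = parse_access(sample)
print(record["status_code"], record["path"])
```

A line that does not end with the quoted forwarded-for field will not match, which is a quick way to notice that the server is not using the log_format this pattern expects.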

Email alert configuration

  • Install the plugins
gem install fluent-plugin-grepcounter --no-document
gem install fluent-plugin-mail --no-document
  • Fluentd configuration
#test log entries for 5xx errors and emit a new count event
<match apache.access>
  @type grepcounter
  count_interval 3  #counting interval
  input_key code    #field tested against the regexp
  regexp ^5\d\d$
  threshold 1       #trigger threshold
  add_tag_prefix error_5xx  #tag prefix added to the new event
</match>

#handle the newly emitted count event
<match error_5xx.apache.access>
  @type copy
  <store>
    @type stdout  #print to standard output (for debugging)
  </store>
  <store>
    @type mail
    host smtp.gmail.com
    port 587
    user USERNAME
    password PASSWORD
    enable_starttls_auto true
    from example@gmail.com
    to alert@example.com
    subject 'HTTP SERVER ERROR'
    message Total 5xx error count: %s\n\nPlease check your Nginx webserver
    message_out_keys count     #record keys substituted into the %s placeholders of message
  </store>
</match>
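
The alerting flow above boils down to: within each interval, count the records whose `code` matches the regexp, and emit a new event under a prefixed tag once the count reaches the threshold. The Python sketch below illustrates that logic only. It is not the plugin's code: the function `count_matches`, the sample window, and the assumption that the threshold comparison is "greater than or equal" are all illustrative.

```python
import re


def count_matches(records, input_key="code", pattern=r"^5\d\d$", threshold=1):
    """Return a count event for one interval, or None below the threshold (hypothetical helper)."""
    regexp = re.compile(pattern)
    hits = [r for r in records if regexp.search(str(r.get(input_key, "")))]
    # Assumption: an event is emitted when the count is >= threshold.
    if len(hits) < threshold:
        return None
    return {"count": len(hits), "message": [h[input_key] for h in hits]}


# Records seen during one count_interval (made-up sample data).
window = [{"code": "200"}, {"code": "502"}, {"code": "500"}, {"code": "404"}]
event = count_matches(window)

# add_tag_prefix error_5xx turns apache.access into error_5xx.apache.access.
new_tag = "error_5xx." + "apache.access"
# The mail output fills %s in the message with the listed record keys.
body = "Total 5xx error count: %s" % event["count"]
print(new_tag, body)
```

With a larger threshold the same window would produce no event, so no mail would be sent for that interval.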

Debugging

  • Send a log entry manually: logger -t test foobar
  • Add filter_stdout to write the event stream to /var/log/td-agent/td-agent.log
<filter **.**>
  @type stdout
</filter>