收集日志的目的是有效的利用日志,有效利用日志的前提是日志通过格式化符合咱们的要求,这样才能真正的高效利用收集到elasticsearch平台的日志。默认的日志到达elasticsearch 是原始格式,乱的让人抓狂,这个时候你会发现Logstash filter的可爱之处,它很像一块橡皮泥,若是咱们手巧的话就会塑造出来让本身舒舒服服的做品,but 若是你没搞好的话那就另说了,本文的宗旨就是带你一块儿飞,搞定这块橡皮泥。当你搞定以后你会以为kibana 的世界瞬间清爽了不少!
FIlebeat 的4大金刚
Filebeat 有4个很是重要的概念须要咱们知道,
Prospector(矿工);
Harvest (收割者);
libeat (汇聚层);
registry(注册记录者);
Prospector 负责探索日志所在地,就如矿工同样要找矿,而Harvest如矿主同样的收割者矿工们的劳动成果,哎,世界无处不剥削啊!每一个Prospector 都有一个对应的Harvest,而后他们有一个共同的老大叫作Libbeat,这个家伙会汇总全部的东西,而后把全部的日志传送给指定的客户,这其中还有个很是重要的角色”registry“,这个家伙至关于一个会计,它会记录Harvest 都收割了些啥,收割到哪里了,这样一但有问题了以后,harvest就会跑到会计哪里问:“上次老子的活干到那块了”?Registry 会告诉Harvest 你Y的上次干到哪里了,去哪里接着干就好了。这样就避免了数据重复收集的问题!mysql
filebeat.prospectors: - input_type: log enabled: True paths: - /var/log/mysql-slow-* #这个地方是关键,咱们给上边日志加上了tags,方便在logstash里边经过这个tags 过滤并格式化本身想要的内容; tags: ["mysql_slow_logs"] #有的时候日志不是一行输出的,若是不用multiline的话,会致使一条日志被分割成多条收集过来,造成不完整日志,这样的日志对于咱们来讲是没有用的!经过正则匹配语句开头,这样multiline 会在匹配开头 以后,一直到下一个这样开通的语句合并成一条语句。 #pattern:多行日志开始的那一行匹配的pattern #negate:是否须要对pattern条件转置使用,不翻转设为true,反转设置为false #match:匹配pattern后,与前面(before)仍是后面(after)的内容合并为一条日志 #max_lines:合并的最多行数(包含匹配pattern的那一行 默认值是500行 #timeout:到了timeout以后,即便没有匹配一个新的pattern(发生一个新的事件),也把已经匹配的日志事件发送出去 multiline.pattern: '^\d{4}/\d{2}/\d{2}' (2018\05\01 个人匹配是已这样的日期开头的) multiline.negate: true multiline.match: after multiline.Max_lines:20 multiline.timeout: 10s - input_type: log paths: - /var/log/mysql-sql-* tags: ["mysql_sql_logs"] multiline.pattern: '^\d{4}/\d{2}/\d{2}' multiline.negate: true multiline.match: after multiline.timeout: 10s encoding: utf-8 document_type: mysql-proxy scan_frequency: 20s harverster_buffer_size: 16384 max_bytes: 10485760 tail_files: true #tail_files:若是设置为true,Filebeat从文件尾开始监控文件新增内容,把新增的每一行文件做为一个事件依次发送,而不是从文件开始处从新发送全部内容。默认是false;
input { kafka { topics_pattern => "mysql.*" bootstrap_servers => "x.x.x.x:9092" #auto_offset_rest => "latest" codec => json group_id => "logstash-g1" } } #终于到了关键的地方了,logstash的filter ,使用filter 过滤出来咱们想要的日志, filter { #if 还可使用or 或者and 做为条件语句,举个栗子: if “a” or “b” or “c” in [tags],这样就能够过滤多个tags 的标签了,咱们这个主要用在同型号的交换设备的日志正规化,好比说你有5台交换机,把日志指定到了同一个syslog-ng 上,收集日志的时候只能经过同一个filebeat,多个prospector加不一样的tags。这个时候过滤就能够经过判断相应的tags来完成了。 if "mysql_slow_logs" in [tags]{ grok { #grok 里边有定义好的现场的模板你能够用,可是更多的是自定义模板,规则是这样的,小括号里边包含全部一个key和value,例子:(?<key>value),好比如下的信息,第一个我定义的key是data,表示方法为:?<key> 前边一个问号,而后用<>把key包含在里边去。value就是纯正则了,这个我就不举例子了。这个有个在线的调试库,能够供你们参考,http://grokdebug.herokuapp.com/ match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+)\.\d+\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"} #过滤完成以后把以前的message 移出掉,节约磁盘空间。 remove_field => ["message"] } } else if "mysql_sql_logs" in [tags]{ grok { match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+\.\d+)\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"} remove_field => ["message"]} } }
2018/05/01 16:16:01.892 - OK - 759.2ms - 172.29.1.7:35184[485388]->172.7.1.39:3306[1525162561129639717]:<DB>:select count(*) from test[]; 过滤后的结果以下: { "date": [ [ "2018/05/01 16:16:01.892" ] ], "datetime": [ [ "16:16:01.892" ] ], "TIME": [ [ "16:16:01.892" ] ], "HOUR": [ [ "16" ] ], "MINUTE": [ [ "16" ] ], "SECOND": [ [ "01.892" ] ], "status": [ [ "OK" ] ], "respond_time": [ [ "759" ] ], "client": [ [ "172.29.1.7" ] ], "IPV6": [ [ null, null ] ], "IPV4": [ [ "172.29.1.7", "172.7.1.39" ] ], "client-port": [ [ "35184" ] ], "server": [ [ "172.7.1.39" ] ], "server-port": [ [ "3306" ] ], "databases": [ [ "<DB>" ] ], "SQL": [ [ "select count(*) from test[];" ] ] }
#有图有真相:sql