在以前的文章【大数据实践】游戏事件处理系统(1)——事件收集-filebeat中,对本系统的背景、目标及技术方案进行了概述,并利用filebeat收集到日志,发送到logstash。所以,本文章将对logstash如何接收、处理、输出事件进行介绍。html
logstash根据配置文件****.conf
(如game-score.conf)对每一个事件执行输入、处理、输出过程,logstash为每个过程提供了丰富的插件。配置文件的大致结构以下:web
input {} filter {} output {}
假设日志文件存在目录地址为:/Users/admin/Documents/workspace/elk/logstash-6.2.3/gameScore.log
。json
其内容为:segmentfault
2015-11-02 14:26:53,355 DEBUG [IScoreService] service.score.IScoreService.recordScore Arguments:[SSZ game result. gameId : 2015-11-02_14:26:37_新手入门_1_002_512 tax : 0, [Lservice.score.GameResultBean;@1a4347b9] Returns: [snailiu,999979438,15]cost : 26
logstash的配置文件中,输入模块的配置内容以下:api
input { file { path => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/gameScore.log" type => "game_score" start_position => "beginning" codec => plain { charset => "GBK" } } ## 来自于filebeat的事件做为输入 beats { ## 端口与filebeat中filebeat.yml文件output中配置的端口一致。 port => 5044 } }
日志文件做为输入,须要使用的file
插件(input插件列表)。input中能够有多个file
,用于同时收集多个日志文件(参考官方文档)。其中:数组
path
表示文件路径。type
表示该输入事件的类型,可自由定义。start_position
表示开始位置,设为beginning表示需从文件头导入旧的事件。codec
表示在事件进入input前要使用的编码处理。若日志文件使用GBK编码,那么就须要在codec
指定,以便后续处理过程当中,再也不须要处理编码问题。接收filebeat
发送过来的数据时,须要使用到beats插件。ruby
每个事件都会通过filter中的处理过程(从上到下处理)。可利用多个filter 插件来完成处理过程。filter示例以下:ide
filter { ## 将非结构化数据转为结构化数据 grok { ## 自定义匹配模式文件地址。 patterns_dir => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/game_score_patterns" ## 根据自定义的匹配模式,匹配数据。数组表示匹配多个模式。 match => [ "message","%{DC_SCORE_NORMAL}", "message","%{DC_SCORE_MODE}" ] ## 删掉一些不要的字段。 remove_field => [host,path,message,svr_type,gm_gr,user1_balance,user2_balance,user3_balance,user4_balance,cost,svr_method,type,money] ## 添加字段:将非结构化数据`game_id`中的内容提取出来,转为结构化数据,并kv对加入到map结构中。 add_field => {"game_id" => "%{game_date}_%{game_time}_%{bet_name}_%{bet_count}_%{room_id}_%{desk_id}"} } ## 若是解析出错,则抛弃该事件。 if "_grokparsefailure" in [tags] { drop { } } mutate { ## 对map中数据进行类型转换。 convert => [ "tax" , "integer" , "user1_delta" , "integer" , "user2_delta" , "integer" , "user3_delta" , "integer" , "user4_delta" , "integer" ] ## 字符串替换 gsub => [ "user1_name", "[\\]", "", "user2_name", "[\\]", "", "user3_name", "[\\]", "", "user4_name", "[\\]", "" ] } ## ruby插件,支持ruby脚本 ruby { ## ruby脚本文件路径 path => "/Users/admin/Documents/workspace/elk/logstash-6.2.3/drop_percentage.rb" ## ruby脚本程序的输入参数 script_params => { "PDK" => ",20,30,1,"} } ## 若出错,则丢弃该事件 if "_jsonparsefailure" in [tags] { drop { } } ## 日期处理插件 date { match => [ "date_stamp", "YY-MM-dd HH:mm:ss,SSS" ] timezone => "Asia/Shanghai" remove_field => ["@version","@timestamp","server_tag","date_stamp","tmp_users","user1_name","user2_name","user3_name","user4_name","user1_delta","user2_delta","user3_delta","user4_delta"] } }
其中,用到的filter插件
有:post
grok:将非结构化日志数据解析为结构化数据的插件,很是经常使用。支持自定义一些解析模式(patterns),如game_score_patterns
文件:大数据
DC_SCORE_USER_1 %{DATA:user1_name},%{INT:user1_balance},%{INT:user1_delta} DC_SCORE_USER_2 %{DC_SCORE_USER_1}, %{DATA:user2_name},%{INT:user2_balance},%{INT:user2_delta} DC_SCORE_USER_3 %{DC_SCORE_USER_2}, %{DATA:user3_name},%{INT:user3_balance},%{INT:user3_delta} DC_SCORE_USER_4 %{DC_SCORE_USER_3}, %{DATA:user4_name},%{INT:user4_balance},%{INT:user4_delta} DC_SCORE_USERS %{DC_SCORE_USER_4}|%{DC_SCORE_USER_3}|%{DC_SCORE_USER_2}|%{DC_SCORE_USER_1} DC_SCORE_NORMAL %{DATA:date_stamp} DEBUG \[%{DATA:svr_type}\] com.basecity.hjd.service.score.IScoreService.%{DATA:svr_method} Arguments:\[%{DATA:server_tag} game result. gameId(( : pdk %{DATA:score_type})|( : sreqw %{DATA:score_type})|) : %{DATA:game_date}_%{DATA:game_time}_%{DATA:bet_name}_%{INT:bet_count}_%{DATA:room_id}_%{DATA:desk_id} tax : %{INT:tax}, %{DATA:gm_gr}\] Returns: \[%{DC_SCORE_USERS}\]cost : %{INT:cost} DC_SCORE_MODE %{DATA:date_stamp} DEBUG \[%{DATA:svr_type}\] com.basecity.hjd.service.score.IScoreService.%{DATA:svr_method} Arguments:\[%{DATA:score_type} game result. gameId : %{DATA:game_date}_%{DATA:game_time}_%{DATA:bet_name}_%{INT:bet_count}_%{DATA:room_id}_%{DATA:desk_id} user: %{DATA:username} get prize:%{INT:money}, \[%{DATA:gm_gr}\] Returns: \[%{DC_SCORE_USERS}\]cost : %{INT:cost}
经过match => ...
从非结构化的日志中,提取出想要的数据,并为其指定关键字命名,转为map结构。
mutate
处理以后的结果事件。ruby中对事件
的处理过程可参照event api。时间通过filter以后,转化为结构化的JSON格式。能够经过各类输出插件将其输出到指定位置(或服务)。如:
## 输出到标准输出。 stdout { codec => rubydebug } ## 输出到http服务 http { http_method => "post" url => "http://127.0.0.1:9090" } ## 输出到kafka kafka { codec => json topic_id => "mytopic" }
其中,示例的三个插件分别为:
至此,利用logstash,完成了日志事件的收集和处理的过程,因本系统是搭配kafka
消息中间件工做的,所以,output中使用kafka插件,后续需根据具体状况,完善output中kafka
模块的编写。