1、安装elastalert
环境
- CentOS:7.4
- Python:3.6.9
- pip:19.3
- elastalert:0.2.1
- elk:7.3.2
二、配置Python3.6.9环境
安装依赖包html
yum -y install wget openssl openssl-devel gcc gcc-c++
下载包node
wget https://www.python.org/ftp/python/3.6.9/Python-3.6.9.tgz
安装python
tar xf Python-3.6.9.tgz cd Python-3.6.9./configure --prefix=/usr/local/python --with-openssl make && make install
配置nginx
mv /usr/bin/python /usr/bin/python_old ln -s /usr/local/python/bin/python3 /usr/bin/python ln -s /usr/local/python/bin/pip3 /usr/bin/pip pip install --upgrade pip
注意,全部依赖python2的脚本或者命令,须要更改成python2.7,由于如今默认的python版本为3.6,例如
sed -i '1s/python/python2.7/g' /usr/bin/yum sed -i '1s/python/python2.7/g' /usr/libexec/urlgrabber-ext-down
验证c++
$ python -V Python 3.6.9 $ pip -V pip 19.3 from /usr/local/python/lib/python3.6/site-packages/pip (python 3.6)
三、安装elastalert
下载包git
git clone https://github.com/Yelp/elastalert.git cd elastalert
安装github
pip install "elasticsearch<7,>6" pip install -r requirements.txt python setup.py install
安装成功后能够看到四个命令web
ll /usr/local/python/bin/elastalert* /usr/local/python/bin/elastalert /usr/local/python/bin/elastalert-create-index /usr/local/python/bin/elastalert-rule-from-kibana /usr/local/python/bin/elastalert-test-rule
软链接到/usr/bin下,方便使用sql
ln -s /usr/local/python/bin/elastalert* /usr/bin
- elastalert 报警执行的命令,会根据报警规则执行相应操做。
- elastalert-create-index会建立一个索引,ElastAlert会把执行记录存放到这个索引中,默认状况下,索引名叫elastalert_status。其中有4个_type,都有本身的@timestamp字段,因此一样也能够用kibana来查看这个索引的日志记录状况。
- elastalert-rule-from-kibana从Kibana3已保存的仪表盘中读取Filtering设置,帮助生成config.yaml里的配置。不过注意,它只会读取filtering,不包括queries。
- elastalert-test-rule测试自定义配置中的rule设置。
2、使用
官方文档:https://elastalert.readthedocs.ioshell
规则文档:https://elastalert.readthedocs.io/en/latest/ruletypes.html
一、主配置文件
首先是主配置文件的模板为config.yaml.example,生成全局配置
vim config.yaml
# 用来加载rule的目录,默认是example_rules rules_folder: rules # 用来设置定时向elasticsearch发送请求,也就是告警执行的频率 run_every: seconds: 30 # 用来设置请求里时间字段的范围 buffer_time: seconds: 30 # elasticsearch的host地址,端口 es_host: node01 es_port: 9200 # elastalert产生的日志在elasticsearch中的建立的索引 writeback_index: elastalert_status writeback_alias: elastalert_alerts # 失败重试的时间限制 alert_time_limit: days: 2
二、建立告警索引
执行elastalert-create-index命令在ES建立索引,这不是必须的步骤,可是强烈建议建立。由于对于审计和测试颇有用,而且重启ES不影响计数和发送alert。
Elastic Version: 7.3.2 Reading Elastic 6 index mappings: Reading index mapping 'es_mappings/6/silence.json' Reading index mapping 'es_mappings/6/elastalert_status.json' Reading index mapping 'es_mappings/6/elastalert.json' Reading index mapping 'es_mappings/6/past_elastalert.json' Reading index mapping 'es_mappings/6/elastalert_error.json' New index elastalert_status created Done!
看到这个输出,就说明建立成功了,也能够请求一下看看:
curl 127.0.0.1:9200/_cat/indices?v
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size green open elastalert_status_status lh8LL4iCQeSn0afyzxBX7w 1 1 0 0 460b 230b green open elastalert_status i7B7IfCuSb2Sex8U5KoTZg 1 1 0 0 460b 230b green open elastalert_status_past et2aF44VR4WQnxB8T7zD4Q 1 1 0 0 460b 230b green open elastalert_status_silence lhXHEsuUQeGZaW3cRLp5pQ 1 1 0 0 460b 230b green open elastalert_status_error zykwk4KtSyyOY7ckxQTrkA 1 1 0 0 460b 230b
三、Rule配置
全部的告警规则,经过在rule目下建立配置文件进行定义,这里简单建立一个来做为演示。
首先我已经在elk集群中配置了一个NGINX日志采集的流水线,如今去kibana中利用检索规则,过滤出我想要的告警内容,好比我想让状态码是404的请求,触发告警通知,就用以下语句进行查询:
response: 404
其中group是kafka里边定义的组,后边是状态码,还能够写更多条件进行匹配。
而后来到服务器添加一条规则:
vim nginx_404.yaml
name: Nginx_err use_strftine_index: true index: nginx_info* type: any aggregation: seconds: 10 filter: - query: query_string: query: "response: 404" alert: - "email" email: - "test@qq.com" smtp_host: smtp.163.com smtp_port: 25 smtp_auth_file: /opt/elastalert/smtp_auth_file.yaml from_addr: test01@163.com email_reply_to: teast02@163.com
注意里边在配置邮件通知的时候,还须要引用外部的一个文件,这个文件里用于存放对应邮箱的用户名密码。
vim /opt/elastalert/smtp_auth_file.yaml
user: "test01@163.com" password: "xxxxxxx"
四、规则测试
刚刚已经添加了一条规则,如今能够用自身的命令测试一下刚刚添加的规则。
elastalert-test-rule --config config.yaml nginx_404.yaml
INFO:elastalert:Note: In debug mode, alerts will be logged to console but NOT actually sent. To send them but remain verbose, use --verbose instead. Didn't get any results. INFO:elastalert:Note: In debug mode, alerts will be logged to console but NOT actually sent. To send them but remain verbose, use --verbose instead. 1 rules loaded ......... elastalert_status - {'rule_name': 'Nginx_err', 'endtime': datetime.datetime(2020, 1, 11, 7, 30, 59, 793352, tzinfo=tzutc()), 'starttime': datetime.datetime(2020, 1, 10, 7, 30, 59, 793352, tzinfo=tzutc()), 'matches': 0, 'hits': 0, '@timestamp': datetime.datetime(2020, 1, 11, 7, 31, 0, 76042, tzinfo=tzutc()), 'time_taken': 0.24003815650939941}
若是没有报错,则说明可用。
五、启动
启动方式有两种
(1)指定规则文件路径
python -m elastalert.elastalert --verbose --config config.yaml --rule rules/nginx_404.yaml
(2)在全局路径config.yaml下,配置规则存放在加载规则rules目录下
python -m elastalert.elastalert --verbose
六、验证
服务启动以后,日志可以很清晰看到整个过程,此时能够在刚刚的索引原点请求几个不存在的接口,造一些404状态,过一下子应该能够看到日志中的说明,有告警发出,邮箱应该也能收到了。
3、优化
一、启动方式
上边的启动命令只是在前台启动,并不给力,可使用nohup启动,或者是经过supervisord管理,会更加方便。
supervisord如何安装就不说了.
建立配置文件:
$cat /etc/supervisord.d/elastalert1.ini [program:elastalert1] directory=/data/elastalert1/ command=python -m elastalert.elastalert --verbose --config /data/elastalert1/config.yaml process_name=elastalert1 autorestart=true startsecs=15 stopsignal=INT stopasgroup=true killasgroup=true redirect_stderr=true stdout_logfile=/data/log/elastalert1.log stdout_logfile_maxbytes=5MB
而后启动便可
supervisorctl update supervisorctl start elastalert1
二、报警方式
elastalert的报警方式有不少种,像邮件、微信、钉钉、post等等,咱们主要介绍如下几种经常使用的
(1)邮件报警
alert: - "email" email: - "test@qq.com" smtp_host: smtp.163.com smtp_port: 25 smtp_auth_file: /opt/elastalert/smtp_auth_file.yaml from_addr: test01@163.com email_reply_to: teast02@163.com
修改/opt/elastalert/smtp_auth_file.yaml信息
(2)微信机器人报警
git clone https://github.com/anjia0532/elastalert-wechat-plugin.git cp elastalert-wechat-plugin/elastalert_modules/* elastalert_modules/
添加报警方式
alert: - "elastalert_modules.wechat_qiye_alert.WeChatAlerter" #后台登录后【设置】->【权限管理】->【普通管理组】->【建立并设置通信录和应用权限】->【CorpID,Secret】 #设置微信企业号的appid corp_id: xxx #设置微信企业号的Secret secret: xxx #后台登录后【应用中心】->【选择应用】->【应用id】 #设置微信企业号应用id agent_id: xx #部门id party_id: xx #用户微信号 user_id: xx # 标签id,多个用 | 分隔
(3)钉钉报警方式
git clone https://github.com/xuyaoqiang/elastalert-dingtalk-plugin.git cp elastalert-dingtalk-plugin/elastalert_modules/dingtalk_alert.py elastalert_modules/
添加报警方式
alert: - "elastalert_modules.dingtalk_alert.DingTalkAlerter" dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=fb6500f4c85b8cfe66fa9586870f3ce16c848eab1e1cb23110388d6d443f1e" dingtalk_msgtype: text
三、报警频率
#限定时间内,发生事件次数 num_events: 3 #与上面参数结合使用,表示在2分钟内发生3次就报警 timeframe: minutes: 2
四、避免重复告警
避免必定时间段中重复告警,能够配置realert
和exponential_realert
这两个选项:
# 5分钟内相同的报警不会重复发送 realert: minutes: 5 # 指数级扩大 realert 时间,中间若是有报警, # 则按照5->10->20->40->60不断增大报警时间到制定的最大时间, # 若是以后报警减小,则会慢慢恢复原始realert时间 exponential_realert: hours: 1
五、聚合相同告警
# 根据报警的内容将相同的报警按照 name 来聚合 aggregation_key: name # 聚合报警的内容,只展现 name 与 message summary_table_fields: - name - message
六、告警内容格式化
能够自定义告警内容,内部是使用Python
的format
来实现的。
alert_subject: "Error {1} @{2}" alert_subject_args: - name - "@timestamp" alert_text_type: alert_text_only alert_text: | > Name: {1} > Message: {2} > Host: {3} ({4}) alert_text_args: - name - message - hostname - host
最后,整理了比较全的配置文件
name: test_err use_strftine_index: true index: filebeat-7.3.2-* type: any #将多个匹配项汇总到一个警报中。每次找到匹配项时,ElastAlert将等待该aggregation时间段,并将特定规则在该时间段内发生的全部匹配项一块儿发送。 aggregation: seconds: 10 #限定时间内,发生事件次数 num_events: 3 #与上面参数结合使用,在几分钟内 timeframe: minutes: 2 realert: # 5分钟内相同的报警不会重复发送 minutes: 5 # 指数级扩大 realert 时间,中间若是有报警, # 则按照5->10->20->40->60不断增大报警时间到制定的最大时间, # 若是以后报警减小,则会慢慢恢复原始realert时间 exponential_realert: hours: 1 filter: - query: query_string: query: "404" alert: - "email" #在邮件正文会显示你定义的alert_text alert_text: "You have a err message!" #用户认证文件,须要user和password两个属性 smtp_host: smtp.163.com smtp_port: 25 smtp_auth_file: /opt/elastalert/smtp_auth_file.yaml #从哪一个邮箱发送 from_addr: test@163.com #回复给那个邮箱 email_reply_to: test@163.com email: #接收报警邮件的邮箱 - "test04@163.com"
4、示例
一、监控日志Web攻击行为
1.1 修改nginx日志格式
log_format logstash_json '{"time": "$time_local", ' '"remote_addr": "$remote_addr", ' '"remote_user": "$remote_user", ' '"request": "$request", ' '"status": "$status", ' '"body_bytes_sent": "$body_bytes_sent", ' '"http_referer": "$http_referer", ' '"http_user_agent": "$http_user_agent", ' '"http_x_forwarded_for": "$http_x_forwarded_for", ' '"request_time": "$request_time", ' '"request_length": "$request_length", ' '"host": "$http_host"}';
1.2 编写监控规则
name: web attack realert: minutes: 5 index: logstash-* type: frequency num_events: 10 timeframe: minutes: 1 query_key: - name realert: minutes: 5 exponential_realert: hours: 1 filter: - query_string: # sql insert xss detect query: "request: select.+(from|limit) OR request: union(.*?)select OR request: into.+(dump|out)file OR request: (base64_decode|sleep|benchmark|and.+1=1|and.+1=2|or%20|exec|information_schema|where%20|union%20|%2ctable_name%20|cmdshell|table_schema) OR request: (iframe|script|body|img|layer|div|meta|style|base|object|input|onmouseover|onerror|onload) OR request: .+etc.+passwd OR http_user_agent:(HTTrack|harvest|audit|dirbuster|pangolin|nmap|sqln|-scan|hydra|Parser|libwww|BBBike|sqlmap|w3af|owasp|Nikto|fimap|havij|PycURL|zmeu|BabyKrokodil|netsparker|httperf|bench) OR status: (400|404|500|501) NOT (request:_health.html OR remote_addr:222.222.222.222 ) " smtp_host: smtp.qiye.163.com smtp_port: 25 smtp_auth_file: /opt/elastalert/smtp_auth_file.yaml email_reply_to: xxx@163.com from_addr: xxx@163.com alert: - "email" email: - "shystartree@163.com" alert_subject: "web attack may be by {0} at @{1}" alert_subject_args: - remote_addr - time alert_text_type: alert_text_only alert_text: | 你好,服务器({})可能正在受到web攻击,请采起手段阻止!!!! ### 截止发邮件前匹配到的请求数:{} > 发生时间: {} > timestamp:{} > attacker's ip: {} > request: {} > status:{} > UA头:{} >>> 参考来源:{} alert_text_args: - host - num_hits - time - "@timestamp" - remote_addr - request - status - http_user_agent - source
二、五分钟内流量总和超过200M就发邮件
run_every: minutes: 5 name: flow type: metric_aggregation index: nginx_info buffer_time: minutes: 5 metric_agg_key: body_bytes_sent metric_agg_type: sum max_threshold: 209715200 use_run_every_query_size: true alert_text_type: alert_text_only alert_subject: "Alter 最近五分钟流量超200M,请注意!!!" alert_text: | 最近五分钟总流量: {0} B kibana url: http://xxxxx alert_text_args: - metric_body_bytes_sent_sum smtp_host: smtp.qq.com smtp_port: 25 smtp_auth_file: /opt/elastalert/smtp_auth_file.yaml from_addr: "xxxx@qq.com" alert: - "email" email: - "xxxx@qq.com"
三、对后端请求超过3秒的发送邮件
es_host: 192.168.20.6 es_port: 9200 run_every: seconds: 30 name: xxx_reponse_time index: n-xxx-* type: whitelist compare_key: "request" ignore_null: true whitelist: - /index.html - /siteapp/ecsAuthentication/hasAuthentication type: frequency num_events: 1 timeframe: seconds: 30 filter: - query_string: query: "upstream_response_time: >3 " alert_text_type: alert_text_only alert_subject: "Alter {0} 接口后端处理超过3秒!!!" alert_subject_args: - _index html_table_title: "<h2>This is a heading</h2>" alert_text: | timestamp: {0} request_method: {1} request: {2} request_body: {3} request_time: {4} s upstream_response_time: {5} s body_bytes_sent: {6} B status: {7} remote_addr: {8} http_x_forwarded_for: {9} upstream_addr: {10} agent: {11} alert_text_args: - timestamp - request_method - request - request_body - request_time - upstream_response_time - body_bytes_sent - status - remote_addr - http_x_forwarded_for - upstream_addr - agent smtp_host: smtp.qq.com smtp_port: 25 smtp_auth_file: /opt/elastalert/rule_templates/smtp_auth_file.yaml from_addr: "xxx@qq.com" alert: - "email" email: - "xxxxx@qq.com"