Elastalert

elastalert 是一款基于elasticsearch的开源告警产品(官方说明文档)。相信许多人都会使用ELK作日志收集系统,可是产生一个基于日志的“优秀”的安全告警确是一个难题。告警规则难编写,告警规则难管理等。本文是做者探索的安全告警的一些思路,但愿能帮助到有须要的人。html

本人对ELK告警处理思路:java

elastalert 经过post的告警模式,post一个告警数据包到服务端,经过服务端匹配须要告警的对象,告警的方式,最终将安全告警发出。node

告警对象(企业人员) 怎么来? 来源调用钉钉API、CMDB、LDAP。python

告警方式 怎么选择?根据告警级别、告警来源(wazuh、驭龙HIDS、elastalert规则)采用不一样的告警方式。nginx

 

 

环境说明

Elastic Stack v6.2.2 (适用于6.0+)
Elastalert v0.1.29git

elastalert 源码部署

下载 elastalert 源码github

git clone https://github.com/Yelp/elastalert.git 

安装依赖sql

pip install -r requirements.txt pip install "elasticsearch>=6.0.0" 

建立elastalert索引(Index)&映射(Mapping)数据库

python elastalert/create_index.py --host localhost --port 9200 --index elastalert 

建立elastalert的配置文件 config.yaml :json

# 告警规则存放的文件夹 rules_folder: myrules # 每2分钟查询一次elasticsearch run_every:  minutes: 2 # 查询时间范围5分钟 buffer_time:  minutes: 5 # 链接elasticsearch配置 es_host: localhost es_port: 9200 # elasticsearch认证,若是未使用可注释 es_username: kibana es_password: kibana # elastalert状态索引 writeback_index: elastalert 

开启elastalert

python elastalert/elastalert.py --config config.yaml 

elastalert规则类型

官方规则类型描述并非太清晰,如下给出alert方式为post的json数据,便于后续你们速查速写。

如下的规则类型均使用如下文档样本做触发告警:

doc = {
        "@timestamp": get_now(), "codec": "nodejs", "tags": "31", "level": "high", "server": "nginx", "status": "anystatus", "message": ">>> [ xxx ]: valid id error ." } 

elastalert索引中,hits表示规则命中条数;matches表示规则命中条数,而且匹配规则触发告警数量。

any类型

说明:任何规则都会匹配, 查询返回的每一个命中将生成一个警报。

规则:当匹配status字段为anystatus,触发告警。

# rule名称 name: any_rule # 规则类型 type: any # 监控索引 index: testalert # 监控时间1分钟内 timeframe:  minutes: 1 # Elastic DSL语法 filter: - term:  status: "anystatus" # 告警方式 alert: post # 服务端接口 http_post_url: "http://localhost:8088/alertapi" http_post_static_payload: # 添加到post包中的数据,规则名称  rule_name: any_rule # 添加到post包中的数据,告警级别  rule_level: medium 

post结果:

{
    "status": "anystatus", "_type": "mydata", "level": "high", "num_hits": 5, "@timestamp": "2018-01-31T02:26:52.268477Z", "rule_level": "medium", "server": "nginx", "rule_name": "any_rule", "_index": "testalert", "num_matches": 5, "message": ">>> [ xxx ]: valid id error .", "_id": "AWFKCd4a5xzN_sFQhZgO", "codec": "nodejs", "tags": "31" } 

blacklist类型

说明:黑名单规则将检查黑名单中的某个字段,若是它在黑名单中则匹配。

规则:当字段status匹配到关键字hacker、huahua,触发告警

name: blacklist_rule type: blacklist index: testalert timeframe:  minutes: 1  compare_key: status  blacklist: - "hacker" - "huahua"  alert: post http_post_url: "http://localhost:8088/alertapi" http_post_static_payload:  rule_name: blacklist_rule  rule_level: medium 

若关键字在文件中,可用 - "!file /path/to/file",目测关键字不支持正则(未测过)。

post结果:

{
    "status": "huahua", "_type": "mydata", "level": "high", "num_hits": 2, "@timestamp": "2018-01-31T02:37:46.071850Z", "rule_level": "medium", "server": "nginx", "rule_name": "blacklist_rule", "_index": "testalert", "num_matches": 1, "message": ">>> [ xxx ]: valid id error .", "_id": "AWFKE9gM5xzN_sFQhZg2", "codec": "nodejs", "tags": "31" } 

whitelist类型

说明:与黑名单相似,此规则将某个字段与白名单进行比较,若是列表中不包含该字词,则匹配。

change类型

说明:此规则将监视某个字段,并在该字段更改时进行匹配,该领域必须相对于最后一个事件发生相同的变化。

规则:当server字段值相同,codec字段值不一样时,触发告警。

name: change_rule type: change index: testalert timeframe:  minutes: 1  compare_key: codec  ignore_null: true  query_key: server  alert: post http_post_url: "http://localhost:8088/alertapi" http_post_static_payload:  rule_name: change_rule  rule_level: medium 

字段解析:

compare_key:与上一条记录作对比的字段

query_key:与上一条记录相同的字段

ignore_null:忽略记录不存在compare_key字段的状况

post结果:

{
    "status": "up", "_type": "mydata", "_id": "AWFKIgZA5xzN_sFQhZh5", "tags": "31", "num_hits": 4, "@timestamp": "2018-01-31T02:53:15.413240Z", "rule_level": "medium", "old_value": [ "nodejs" ], "server": "nginx", "rule_name": "change_rule", "_index": "testalert", "new_value": [ "java" ], "num_matches": 1, "message": ">>> [ xxx ]: valid id error .", "level": "high", "codec": "java" } 

frequency类型

说明:当给定时间范围内至少有必定数量的事件时,此规则匹配。 这能够按照每一个query_key来计数。

规则:当字段status匹配到关键字frequency超过3次(包括3次),触发告警

name: frequency_rule type: frequency index: testalert num_events: 3 timeframe:  minutes: 1  filter: - term:  status: "frequency"  alert: post http_post_url: "http://localhost:8088/alertapi" http_post_static_payload:  rule_name: frequency_rule  rule_level: medium 

post结果:

{
    "status": "frequency", "_type": "mydata", "level": "high", "num_hits": 3, "@timestamp": "2018-01-31T03:28:00.793290Z", "rule_level": "medium", "server": "nginx", "rule_name": "frequency_rule", "_index": "testalert", "num_matches": 1, "message": ">>> [ xxx ]: valid id error .", "_id": "AWFKQdg_5xzN_sFQhZjW", "codec": "java", "tags": "31" } 

spike类型

说明:当某个时间段内的事件量比上一个时间段的spike_height时间大或小时,这个规则是匹配的。它使用两个滑动窗口来比较事件的当前和参考频率。 咱们将这两个窗口称为“参考”和“当前”。

规则:当前窗口数据量为3,当前窗口超过参考窗口数据量次数1次,触发告警。

name: spike_rule type: spike index: testalert timeframe:  minutes: 1  threshold_cur: 3  spike_height: 1  spike_type: "up"  filter: - term:  status: "spike"  alert: post http_post_url: "http://localhost:8088/alertapi" http_post_static_payload:  rule_name: spike_rule  rule_level: medium 

字段解析:

threshold_cur:当前窗口初始值

spike_height:当前窗口数据量连续比参考窗口数据量高(/低)的次数

spike_type:高或低

post结果:

{
    "status": "spike", "_type": "mydata", "_id": "AWFLMbye5xzN_sFQhZlk", "tags": "31", "num_hits": 13, "@timestamp": "2018-01-31T07:50:02.382708Z", "rule_level": "medium", "server": "nginx", "rule_name": "spike_rule", "_index": "testalert", "spike_count": 8, "reference_count": 0, "num_matches": 1, "message": ">>> [ xxx ]: valid id error .", "level": "high", "codec": "java" } 

flatline类型

说明:当一个时间段内的事件总数低于一个给定的阈值时,匹配规则。

规则:当信息量低于3条时,触发告警。

name: flatline_rule type: flatline index: testalert timeframe:  minutes: 1  threshold: 3  alert: post http_post_url: "http://localhost:8088/alertapi" http_post_static_payload:  rule_name: flatline_rule  rule_level: medium 

post结果:

{
    "count": 1, "num_hits": 1, "@timestamp": "2018-01-31T09:02:35.720517Z", "rule_level": "medium", "rule_name": "flatline_rule", "key": "all", "num_matches": 1 } 

cardinality类型

说明:当一个时间范围内的特定字段的惟一值的总数高于或低于阈值时,该规则匹配

规则:1分钟内,level的惟一数量超过2个(不包括2个),触发告警。

name: test_rule index: testalert type: cardinality timeframe:  minutes: 1  cardinality_field: level  max_cardinality: 2  alert: post http_post_url: "http://localhost:8088/api/alert" http_post_static_payload:  rule_name: test_rule  rule_level: medium 

post结果:

{
    "status": "cardinality", "_type": "mydata", "level": "info", "num_hits": 3, "@timestamp": "2018-01-31T09:17:02.276937Z", "rule_level": "medium", "server": "nginx", "rule_name": "cardinality_rule", "_index": "testalert", "num_matches": 1, "message": ">>> [ xxx ]: valid id error .", "_id": "AWFLgWKw5xzN_sFQhZvg", "codec": "java", "tags": "31" } 

percentage match类型

说明:当计算窗口内的匹配桶中的文档的百分比高于或低于阈值时,此规则匹配。计算窗口默认为buffer_time。

规则:当level字段未high,时间窗口内日志量高于前一个时间窗口95%,触发告警。(未完整测试)

name: percentage_match_rule type: percentage_match index: testalert # description: "test description"  buffer_time:  minutes: 1  max_percentage: 95  match_bucket_filter: - term:  level: high  doc_type: mydata  alert: post http_post_url: "http://localhost:8088/alertapi" http_post_static_payload:  rule_name: percentage_match_rule  rule_level: medium 

post结果:

{
    "num_hits": 10, "@timestamp": "2018-01-31T09:39:05.199394Z", "rule_level": "medium", "rule_name": "percentage_match_rule", "num_matches": 1, "percentage": 100.0 } 

告警方式

elastalert内置的告警方式并不太使用与国人的习惯,因此这块建议自行写服务端从新定义。

为何不在elastalert源码alerts.py中直接加类,而经过post出来本身作服务端接收告警? 主要考虑到elastalert项目更新。

目前比较经常使用的告警模式有:钉钉、微信、邮件、短信。

首先设计好的告警内容,因而咱们能够建立好4种告警类型,并逐步实现功能。

基于elastalert的安全告警剖析

钉钉告警

目前钉钉有两种告警方法,一种是得到管理员token,能够调用企业通知产生告警,这种方式的好处是能够通知到企业中对应的人,对应部门中全部人等。

这里分享一下实现的大体思路:

def send(self, post_alert_content): # 告警内容 msgcontent = { "title": post_alert_content["name"], "text": "## 规则:{0} \n ## 级别:{1} \n ## 时间:{2} \n ## 内容:{3}".format( post_alert_content["name"],post_alert_content["level"],post_alert_content["create_at"],post_alert_content["content"] ) } # 获取须要通知的用户列表 userid_list = users.getDingDingUserIdByName(post_alert_content["contact_users"]) msgtype = "markdown" agent_id = DD_AgentId dept_id_list = None try: msgcontent = json.dumps(msgcontent) except JSONDecodeError: pass args = locals().copy() payload = {} for k, v in args.items(): if k in ('msgtype', 'agent_id', 'msgcontent', 'userid_list', 'dept_id_list'): if v is not None: payload.update({k: v}) # 发送钉钉告警信息 resp = self.callDingDingWebApi(self.access_token, 'dingtalk.corp.message.corpconversation.asyncsend', **payload) if "error_response" in resp.json().keys(): self.getAccessToken() self.send(post_alert_content) 

效果:告警出如今企业通知中。

基于elastalert的安全告警剖析

另外一种则是经过钉钉建立群,添加钉钉机器人告警。

def sendByRobot(self, post_alert_content): DD_level = post_alert_content.get("level", "") DD_name = post_alert_content.get("name", "") DD_content = post_alert_content.get("content", "") DD_url = post_alert_content.get("url", "") headers = {"Content-Type": "application/json"} message = { "msgtype": "markdown", "markdown": { "title": "【" + DD_level + "】" + DD_name, "text": "### 时间:" + datetime.now().strftime("%Y-%m-%d %X") + "\n" \ "### 规则:" + "【" + DD_level + "】" + DD_name + "\n" \ "### 内容:" + DD_content + "\n" } } r = requests.post(url=DD_url, headers=headers, data=json.dumps(message)) return True 

短信告警

短息告警的具体实现与企业采用的短信通道有关,可是方式基本类似。

def send(self, post_alert_content): """ param: phone @string raw_content @string return: @bool """ self.params['phone'] = post_alert_content["users_phone"] self.params['report'] = True content = self.getContent(post_alert_content) self.params['msg'] = urllib.quote(content) response = requests.post(SMS_SEND_MSG_URL, json=self.params) rv = response.json() 

微信告警

微信告警,实现的大体思路:

def send(self, users, subject, content): """ params: users @string subject @string content @string return: @bool """ # 微信API post_url = WECHAT_MSG_URL + self.token for user in users.split(","): message = { # 企业号中的用户账号 "touser": user, # 消息类型 "msgtype": "text", # 企业号中的应用id "agentid": WECHAT_AGENTID, "text": { "content": subject + '\n' + content }, "safe": "0" } # 触发告警 r = requests.post(url=post_url, data=json.dumps(message), verify=False) print r.text return True 

邮件告警

邮箱告警要注意使用SSL,否则邮箱帐密被撸了就呵呵了。

def send(self, post_alert_content): to_addrs = "{}".format(post_alert_content["to_addrs"]) subject = "【规则】 {}".format(post_alert_content["name"]) message = "【时间】{} \n 【内容】{}".format(post_alert_content["create_at"], post_alert_content["content"]) # to_addr = to_addrs.split(",") for to_addr in to_addrs.split(","): msg = self.format_msg(self.from_addr, to_addr, subject, message) s = smtplib.SMTP_SSL(Mail_Host, Mail_Port) s.login(Mail_User, Mail_Pass) s.sendmail(self.from_addr, [to_addr], msg.as_string()) s.quit() return True 

规则管理

为了方便远程管理规则,咱们须要数据库存储规则信息,而后经过服务端接口查看当前规则信息,数量;操做YAML规则文件实现规则管理。

若是咱们须要添加规则,那么在规则目录下,建立对应的yaml规则文件便可。

def insertElastRule(params): # 查看数据库中是否存在同名规则 _es_rule = ElastRule.query.filter_by(rule_esalert_name=rule_esalert_name).first() if _es_rule: return False else: now = datetime.now() insertRule = ElastRule( rule_name=params["rule_name"], rule_type=params["rule_type"], rule_index=params["rule_index"], rule_num_events=params["rule_num_events"], rule_timeframe=params["rule_timeframe"], rule_filter=params["rule_filter"], rule_level=params["rule_level"], rule_content=params["rule_content"], create_at=now, end_at=now ) db.session.add(insertRule) db.session.commit() # 建立yaml规则文件 createRuleYAML(params["rule_name"]) return True 

建立YAML函数:

def createRuleYAML(rule_esalert_name): _rule = ElastRule.query.filter_by(rule_esalert_name=rule_esalert_name).first() ruleJson = { "name": _rule.rule_esalert_name, "type": _rule.rule_type, "index": _rule.rule_index, "num_events": int(_rule.rule_num_events), "timeframe": {'minutes': int(_rule.rule_timeframe)}, "filter": _rule.rule_filter, "alert": "post", "http_post_url": "http://localhost:8088/api/alert", "http_post_static_payload":{"rule_name": _rule.rule_esalert_name, "rule_level": _rule.rule_level} } with open('/easywatch/elastalert_rules/{}.yaml'.format(rule_esalert_name),'w') as fw: yaml.safe_dump(ruleJson, stream=fw, allow_unicode=True, default_flow_style=False) 

告警思考

渠道的使用,经过级别组合使用告警方式:

高级别告警使用3个或以上的方式告警 – 短信、钉钉(微信)、邮件

中级别告警使用2个或以上的方式告警 – 钉钉(微信)、邮件

低级别告警使用1个或以上的方式告警 – 邮件

ELK展现告警效果:

经过构建视图、面板,查看具体告警态势

相关文章
相关标签/搜索