Azure China recently launched the Network Watcher service, which helps users monitor and analyze virtual networks (VNets). One of its most useful features is the ability to record NSG security access logs. However, if you enable NSG flow logs and download them for analysis, you will find the raw output is not very readable. The flow logs are written in JSON, and each record contains the NSG rule name, timestamp, source address, destination address, source port, destination port, protocol, traffic direction, and whether the traffic was allowed or denied.
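For reference, each entry in the flowTuples array of a version 1 flow log is a single comma-separated string; the values below are only illustrative:
1497035362,10.0.0.4,168.62.32.14,44931,443,T,O,A
(Unix timestamp, source IP, destination IP, source port, destination port, protocol T=TCP/U=UDP, direction I=inbound/O=outbound, decision A=allowed/D=denied)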
All the records run together in one block, so finding a specific security event in the raw file is very difficult.
Fortunately, a number of open-source tools can visualize this data. A customer recently asked to analyze NSG records with the popular ELK stack, so after consulting the official Microsoft and Elastic.co documentation and a colleague's article, I got ELK ingesting and analyzing NSG flow logs from Azure.cn. This post records the process and the points to watch out for.
A quick introduction to ELK: it is the combination of three open-source projects, Elasticsearch, Logstash, and Kibana. The three are typically deployed together for real-time data search and analysis, and all of them now belong to Elastic.co, hence the shorthand ELK.
To deploy ELK, we create a CentOS 7.x virtual machine on Azure and use this VM's NSG as the log source.
Enable NSG flow logs with the following portal steps (a CLI alternative is sketched after the list):
In the Azure portal, find the "Network Watcher" service.
Inside Network Watcher, select "NSG flow logs".
On the right, filter by resource group to find the NSG you want to analyze.
Select the NSG, turn on the flow log, and specify the storage account where the logs will be stored and the retention period.
With the Azure NSG flow log configured, we can install and deploy ELK to pull in the data and analyze it.
1. Connect to the CentOS VM created earlier over SSH and install the Java runtime first:
yum install java-1.8.0-openjdk
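Before continuing, you can quickly confirm that the JRE is installed and on the PATH:
java -version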
2. Import the Elastic GPG signing key:
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
3. Create the yum repo file for Elasticsearch:
vi /etc/yum.repos.d/elasticsearch.repo
Enter the following content:
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
4. Install the Elasticsearch package:
yum install elasticsearch
5. Edit the configuration to allow external access:
vi /etc/elasticsearch/elasticsearch.yml
Find the network.host parameter and set it to 0.0.0.0:
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 0.0.0.0
#
# Set a custom port for HTTP:
#
http.port: 9200
#
6. Start Elasticsearch and enable it at boot:
systemctl enable elasticsearch.service
systemctl start elasticsearch.service
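Once the service reports active (it may take a minute to start), a quick sanity check from the VM itself should return the cluster information as a small JSON document:
curl http://localhost:9200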
7. Create the yum repo file for Logstash:
vi /etc/yum.repos.d/logstash.repo
Enter the following content (use the same 6.x repository so the Logstash version matches Elasticsearch):
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
8. Install the Logstash package:
yum install logstash
9. Install the Logstash plugin that reads from Azure Blob storage:
/usr/share/logstash/bin/logstash-plugin install logstash-input-azureblob
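You can verify that the plugin was picked up by listing the installed plugins:
/usr/share/logstash/bin/logstash-plugin list | grep azureblob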
10. Point the plugin at the Azure China storage endpoint; by default it reads from global Azure:
vi /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-azureblob-0.9.12-java/lib/logstash/inputs/azureblob.rb
Find the following line:
config :endpoint, :validate => :string, :default => 'core.windows.net'
and change the default endpoint to the Azure China endpoint, core.chinacloudapi.cn:
config :endpoint, :validate => :string, :default => 'core.chinacloudapi.cn'
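If you prefer not to edit the gem by hand, the same change can be made with a one-liner (this assumes the plugin version and path shown above; adjust them if your installed version differs):
sed -i "s/'core.windows.net'/'core.chinacloudapi.cn'/" /usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-azureblob-0.9.12-java/lib/logstash/inputs/azureblob.rb
Alternatively, because endpoint is an ordinary plugin option, it can also be set per input by adding endpoint => "core.chinacloudapi.cn" inside the azureblob block of the logstash.conf created in the next step, leaving the gem untouched.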
11. Create the Logstash configuration file.
In the Azure portal, look up the access key of the storage account that receives the NSG flow logs, then create the file:
vi /etc/logstash/conf.d/logstash.conf
Enter the following content:
input {
  azureblob
  {
    storage_account_name => "replace with the storage account used for the NSG flow logs"
    storage_access_key => "replace with the access key of that storage account"
    container => "insights-logs-networksecuritygroupflowevent"
    codec => "json"
    # Refer https://docs.microsoft.com/en-us/azure/network-watcher/network-watcher-read-nsg-flow-logs
    # Typical values are 21/9 or 12/2, depending on the NSG log file type
    file_head_bytes => 12
    file_tail_bytes => 2
    # Enable / tweak these settings when the event is too big for the codec to handle.
    # break_json_down_policy => "with_head_tail"
    # break_json_batch_count => 2
  }
}
filter {
  split { field => "[records]" }
  split { field => "[records][properties][flows]" }
  split { field => "[records][properties][flows][flows]" }
  split { field => "[records][properties][flows][flows][flowTuples]" }
  mutate {
    split => { "[records][resourceId]" => "/" }
    add_field => { "Subscription" => "%{[records][resourceId][2]}"
                   "ResourceGroup" => "%{[records][resourceId][4]}"
                   "NetworkSecurityGroup" => "%{[records][resourceId][8]}" }
    convert => { "Subscription" => "string" }
    convert => { "ResourceGroup" => "string" }
    convert => { "NetworkSecurityGroup" => "string" }
    split => { "[records][properties][flows][flows][flowTuples]" => "," }
    add_field => {
      "unixtimestamp" => "%{[records][properties][flows][flows][flowTuples][0]}"
      "srcIp" => "%{[records][properties][flows][flows][flowTuples][1]}"
      "destIp" => "%{[records][properties][flows][flows][flowTuples][2]}"
      "srcPort" => "%{[records][properties][flows][flows][flowTuples][3]}"
      "destPort" => "%{[records][properties][flows][flows][flowTuples][4]}"
      "protocol" => "%{[records][properties][flows][flows][flowTuples][5]}"
      "trafficflow" => "%{[records][properties][flows][flows][flowTuples][6]}"
      "traffic" => "%{[records][properties][flows][flows][flowTuples][7]}"
    }
    convert => { "unixtimestamp" => "integer" }
    convert => { "srcPort" => "integer" }
    convert => { "destPort" => "integer" }
  }
  date {
    match => ["unixtimestamp", "UNIX"]
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => "localhost"
    index => "nsg-flow-logs"
  }
}
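Before wiring Logstash into systemd, it is worth letting it parse the configuration once and exit, which catches syntax errors early:
/usr/share/logstash/bin/logstash --path.settings /etc/logstash -f /etc/logstash/conf.d/logstash.conf --config.test_and_exit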
12. Start Logstash and enable it at boot:
systemctl enable logstash.service
systemctl start logstash.service
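After Logstash has run for a few minutes and pulled the first blobs, the index defined in the output section should appear in Elasticsearch; if nsg-flow-logs is missing from the list, check /var/log/logstash/ for errors:
curl 'http://localhost:9200/_cat/indices?v'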
13. Create the yum repo file for Kibana:
vi /etc/yum.repos.d/kibana.repo
Enter the following content (again from the 6.x repository so the Kibana version matches Elasticsearch):
[kibana-6.x]
name=Kibana repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
14. Install the Kibana package:
yum install kibana
15. Configure Kibana to allow external access:
vi /etc/kibana/kibana.yml
Find the server.host parameter and set it to 0.0.0.0:
# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "0.0.0.0"
16. Start Kibana and enable it at boot:
systemctl enable kibana.service
systemctl start kibana.service
17. Check the status of the ELK services:
systemctl status elasticsearch
systemctl status logstash
systemctl status kibana
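If any of the three services reports errors, it also helps to confirm that Elasticsearch and Kibana are actually listening on their ports:
ss -tlnp | grep -E '9200|5601'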
18. Adjust the VM's NSG rules so that the Kibana portal (TCP port 5601) is reachable from outside; a CLI sketch follows below.
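This can be done in the portal, or with a rule along the following lines. Treat it as a sketch: the rule name, priority, and source filtering are placeholders to adapt, and flag names differ slightly between CLI versions.
az network nsg rule create --resource-group <resource-group> --nsg-name <vm-nsg-name> --name allow-kibana --priority 310 --direction Inbound --access Allow --protocol Tcp --destination-port-ranges 5601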
19. Log in to the Kibana portal from a browser:
http://<VM-public-IP>:5601
20. Set the state:storeInSessionStorage parameter.
Under the "Management" tab, open Advanced Settings, find the state:storeInSessionStorage parameter, and enable it.
21. Still under the "Management" tab, create an index pattern via "Index Patterns", using the index name defined in logstash.conf (nsg-flow-logs).
22. Microsoft provides a sample dashboard that can be imported directly and later adapted to your own needs.
In Kibana's "Management" tab, import the file via "Saved Objects"; the sample dashboard can then be opened and loaded from the "Dashboard" tab.
Opening the dashboard shows the preconfigured sample reports.
The time range selector in the upper-right corner of the dashboard lets you quickly narrow the analysis to the period you care about.
Finally, thanks to Mr. Kang for his support; his article was a great help.
Mr. Kang's blog post: https://www.azure.cn/blog/2017/12/27/AZURE-NSG-FLOW-LOG-Analysis/
Official Azure configuration guide: https://docs.microsoft.com/zh-cn/azure/network-watcher/network-watcher-visualize-nsg-flow-logs-open-source-tools
To build an ELK cluster rather than a single node, see: https://docs.azure.cn/zh-cn/articles/training/open-source-azure-virtual-machines-create-elk-cluster
Official ELK documentation: https://www.elastic.co/guide/index.html
Azure Blob storage input plugin for Logstash: https://github.com/Azure/azure-diagnostics-tools/tree/master/Logstash/logstash-input-azureblob
Alternatively, NSG flow logs can also be analyzed with Power BI, which requires no installation or deployment of your own: https://docs.microsoft.com/zh-cn/azure/network-watcher/network-watcher-visualize-nsg-flow-logs-power-bi