最近在作企业安全建设,企业安全建设中最多见的一项就是作监控,监控的种类多种多样,可是底层的技术栈却基本是一致的————大数据技术,下面我记录一下我最近学习到的一些大数据技术,下文只是描述个脉络而已。html
大数据的技术栈,以及对应的上下依赖图以下:
sql
看完这个图,是否是以为和以前学习过的网络协议、框架都很是相识,无非就是把里面的名词替换了一下而已。我感受软件产品的设计思路都是要分模块化、解耦合,你看TCP/IP协议层,每层都各司其职,每层里面的每一个功能也是按照这个整体思路继续向下设计。解耦合的好处不少,建议自行百度。apache
我我的以为,里面比较有难度的就是Flink那块,由于对数据的分析、计算处理都是在这一块中完成的,Flink也能够用storm替换,不过性能没有flink好。
当将计算结果存储到ES以后,就能够作不少事了,好比作自动告警功能了。安全
数据源能够是任何数据,不过如今采集最多的应该就是日志类数据服务器
采集器是最容易理解的,主要是用来汇总日志而后转发的,采集器的技术方案也有不少,这里举例filebeat。网络
Filebeat主要由两个组件构成:prospector(探测器)
和harvester(收集器)
,这两类组件一块儿协做完成Filebeat的工做。并发
Filebeat的工做流程以下:
当开启Filebeat程序的时候,它会启动一个或多个探测器去检测指定的日志目录或文件,对于探测器找出的每个日志文件,Filebeat会启动收集进程,每个收集进程读取一个日志文件的内容,而后将这些日志数据发送到后台处理程序,后台处理程序会集合这些事件,最后发送集合的数据到output指定的目的地。app
Filebeat在有数据源的机器安装好以后,要作的就是写一下配置,
主要配置读取文件的路径,以及输出流的位置以及相应的性能参数等,以Kafka消息中间件做为缓冲,全部的日志收集器都向Kafka输送日志流。框架
定义日志信息输出格式:分布式
<Properties> //存放日志的文件夹名称 <Property name="LOG_HOME">logs</Property> //日志文件名称 <property name="FILE_NAME">collector</property> //日志格式 //[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] 日志输入时间,东八区 //[%level{length=5}] 日志级别,debug、info、warn、error //[%thread-%tid] 当前线程信息 //[%logger] 当前日志信息所属类全路径 //[%X{hostName}] 当前节点主机名。须要经过MDC来自定义。 //[%X{ip}] 当前节点ip。须要经过MDC来自定义。 //[%X{applicationName}] 当前应用程序名。须要经过MDC来自定义。 //[%F,%L,%C,%M] %F:当前日志信息所属的文件(类)名,%L:日志信息在所属文件中的行号,%C:当前日志所属文件的全类名,%M:当前日志所属的方法名 //[%m] 日志详情 //%ex 异常信息 //%n 换行 <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n </property>
Filebeat配置参考信息:
paths: - /usr/local/logs/error-collector.log document_type: "error-log" multiline: # pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串) pattern: '^\[' # 指定匹配的表达式(匹配以 "{ 开头的字符串) negate: true # 是否匹配到 match: after # 合并到上一行的末尾 max_lines: 2000 # 最大的行数 timeout: 2s # 若是在规定时间没有新的日志事件就不等待后面的日志 fields: logbiz: collector logtopic: error-log-collector ## 按服务划分用做kafka topic evn: dev output.kafka: enabled: true hosts: ["192.168.204.139:9092"] topic: '%{[fields.logtopic]}' partition.hash: reachable_only: true compression: gzip max_message_bytes: 1000000 required_acks: 1 logging.to_files: true
Apache kafka是消息中间件的一种,功能是高吞吐量的分布式发布订阅消息系统
Kafka特色:
kafka中的消息不是kafka主动去拉去的,而必须有生产者往kafka写消息。
kafka是不会主动往消费者发布消息的,而必须有消费者主动从kafka拉取消息。
kafka名词解释:
kafka的几个名词须要知道一下,好比topic、producer、consumer、broker,下面用最俗的方式解释
kafka的单节点基本操做:
生产者
# 建立一个主题(标签),Hello-Kafka bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka # 生产者将等待来自stdin的输入并发布到Kafka集群。 默认状况下,每一个新行都做为新消息发布,而后在 config / producer.properties 文件中指定默认生产者属性。 # 在终端中键入几行消息 egg1 egg2
消费者
# 与生产者相似,在 config / consumer.proper-ties 文件中指定了缺省使用者属性。 打开一个新终端并键入如下消息消息语法。 bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka --from-beginning # 自动出现 egg1 egg2
Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通讯以及容错机制等功。
简单的说就是,Flink能够对数据流进行转换、计算、聚合等功能。若是你采集的数据须要作告警功能,那么就须要用Flink或者storm,若是只是将采集的数据进行存储,而后展现,那么就不须要用到Flink这种技术。
好比在企业安全建设中,作监控平台就须要有告警功能,采集到的监控数据会直接往 kafka 里塞,而后告警这边须要从 kafka topic 里面实时读取到监控数据,并将读取到的监控数据作一些 转换、计算、聚合等操做,而后将计算后的结果与告警规则的阈值进行比较,而后作出相应的告警措施(钉钉群、邮件、短信、电话等)。画了个简单的图以下:
flink处理静态sql的代理流程:
这个sql只能是写死在代码里面,若是是想要动态的修改sql,那么就要重启flink服务才能生效。
可是有个需求,就像下图这样,sql语句来以外部,由于须要让安全人员来描述规则,他们跟进安全态势来修改,而且须要经常更新规则来挖掘出最新安全事件,
那么就出现一个问题了,像上面的flink只能处理静态sql,想动态处理怎么办?
使用 flink-siddhi 来处理动态sql:
SIDDHI 是一款功能强大的open source CEP(Complex Event Processing)引擎引擎,具备本身的DSL,丰富的模式匹配功能和可扩展性,
使用Siddhi 引擎的好处就是,里面的sql语句能够任意修改,修改sql后,也不须要重启flink服务。
siddhi引擎我最近也是刚开始学习,这里就不过多笔墨了,后面会出siddhi的专项文章。
ES太常见了,之后有空在补充吧。
Kibana也很常见,之后有空在补充吧。但愿读者给个评论或者推荐,让我有动力更新完。
http://www.javashuo.com/article/p-ubrmippf-nd.html
https://www.jianshu.com/p/a8b66f586fd4
http://kafka.apachecn.org/
https://www.w3cschool.cn/apache_kafka/apache_kafka_introduction.html
http://www.javashuo.com/article/p-txmadmte-mc.html
https://ci.apache.org/projects/flink/flink-docs-release-1.4/
http://www.javashuo.com/article/p-hfnxkmtc-nd.html
https://baijiahao.baidu.com/s?id=1623279487849430246&wfr=spider&for=pc