360 政企安全集团基于 Flink 的 PB 级数据即席查询实践

简介:Threat Hunting 平台的架构与设计,及以下降 IO 为目标的优化与探索。为何以及如何使用块索引。前端

本文整理自 360 政企安全集团的大数据工程师苏军以及刘佳在 Flink Forward Asia 2020 分享的议题《基于 Flink 的 PB 级数据即席查询实践》,文章内容为:git

  1. Threat Hunting 平台的架构与设计(苏军)
  2. 以下降 IO 为目标的优化与探索(刘佳)
  3. 将来规划

GitHub 地址
https://github.com/apache/flink
欢迎你们给 Flink 点赞送 star~github

 title=

首先作一个简单的我的以及团队介绍。咱们来自 360 政企安全集团,目前主要从事 360 安全大脑的 “威胁狩猎“ 项目的开发工做。咱们团队接触 Flink 的时间比较早,在此期间,咱们基于 Flink 开发出了多款产品,并在 2017 年和 2019 年参加了于柏林举办的 Flink Forward 大会,分别介绍了咱们的 “UEBA” 以及 “AutoML” 两款产品。算法

 title=

本次分享主要分为两块内容:数据库

  • 第一部分 “Threat Hunting 平台的架构与设计” 将由苏军来为你们分享;
  • 第二部分 “以下降 IO 为目标的优化与探索” 将由刘佳来为你们分享。

1、Threat Hunting 平台的架构与设计 (苏军)

第一部份内容大体分为三个部分,分别是:apache

  • 平台的演进
  • 架构设计
  • 深刻探索索引结构

1. 平台的演进

 title=

咱们认为全部技术的演化和革新都须要具体的商业问题来驱动,如下是咱们团队近几年基于 Flink 开发的几款产品:segmentfault

  • 2017 年咱们基于 Flink DataStream 开发了用户行为分析系统 UEBA,它是经过接入企业 IT 拓扑的各种行为数据,好比身份认证数据、应用系统访问数据、终端安全数据、网络流量解析数据等等,以用户 / 资产为核心来进行威胁行为的实时检测,最后构建出用户威胁等级和画像的系统;
  • 2018 年基于 UEBA 的实施经验,咱们发现安全分析人员每每须要一种手段来获取安全事件对应的原始日志,去进一步确认安全威胁的源头和解决方式。因而咱们基于 Spark 开发了 HQL 来解决在离线模式下的数据检索问题,其中 HQL 能够认为是表达能力比 SQL 更加丰富的查询语言,大体能够看做是在 SQL 能力的基础上增长了算法类算;
  • 2019 年随着离线 HQL 在客户那边的使用,咱们发现其自己就可以快速定义安全规则,构建威胁模型,若是在离线模式下写完语句后直接发布成在线任务,会大大缩短开发周期,加上 Flink SQL 能力相对完善,因而咱们基于 Flink SQL + CEP 来升级了 HQL 的能力,产生了 HQL RealTime 版本;
  • 2020 年随着客户数据量的增大,不少已经达到了 PB 级,过往的解决方案致使离线的数据检索性能远远低于预期,安全分析人员习惯使用 like 和全文检索等模糊匹配操做,形成查询延时很是大。因而从今年开始,咱们着重优化 HQL 的离线检索能力,并推出了全新的 Threat Hunting 平台。

 title=

经过调查发现,拥有 PB 级数据规模的客户每每有如下几个商业需求:api

  • 第一是低成本的云原生架构。咱们知道目前大部分的大数据架构都是基于 hadoop 的,其特色是数据就在计算节点上,可以减小大量网络开销,加速计算性能。可是整个集群为了作到资源均衡,每每须要相同的资源配置,且为了可以存储尽可能多的数据,集群规模会很大, 因此这类架构在前期须要投入大量硬件成本。缓存

    而存算分离和弹性计算则可以解决这一问题,由于磁盘的价格是远低于内存和 CPU 的,因此用廉价的磁盘存储搭配低配 CPU 和内存来存储数据,用少许高配机器来作计算,能够在很大程度上下降成本。安全

  • 第二是低延时的查询响应。安全分析人员在作威胁检测时,大部分时间是即席查询,即经过过滤、join 来作数据的检索和关联。为了可以尽快的获取查询结果,对应的技术方案是:列存/索引/缓存。

    • 列存不用多说了,是大数据领域常见的存储方案;
    • 在列存的基础上,高效的索引方案可以大量下降 io,提升查询性能;
    • 而存算分析带来的网络延时能够由分布式缓存来弥补。
  • 第三是须要丰富的查询能力,其中包括单行的 fields/filter/udf 等,多行的聚合 /join,甚至算法类的分析能力,这部分咱们主要依赖于本身开发的分析语言 HQL 来提供。

2. 架构设计

 title=

首先,数据是来自于已经存储在 ES 中的历史数据和 kafka 里的实时数据,其中 ES 里的历史数据咱们经过本身开发的同步工具来同步,kafka 里的实时数据咱们则经过 Streaming File Sink 写 orc 文件到存储集群。在数据同步的同时,咱们会将这批数据的索引信息更新到数据库中。

安全分析人员会从前端页面经过写交互式分析语言 HQL 发起数据检索的请求,此时请求会进入调度系统,一旦开始执行做业,首先会将分析语句解析成算子列表,算子缓存算法会判断该次查询是否能够命中缓存系统中已有的缓存数据。

  • 若是分析语句的输入是已经算好而且 cache 好了的中间结果,那么直接读取缓存来继续计算;
  • 若是不能命中,证实咱们必须从 orc 文件开始从新计算。

咱们会先提取出查询语言的过滤条件或者是 Join 条件来作谓词下推,进入索引数据库中得到目前符合该查询的文件列表,随后将文件列表交给计算引擎来进行计算。计算引擎咱们采用双引擎模式,其中复杂度高的语句咱们经过 Flink 引擎来完成,其它较为简单的任务咱们交给平台内部的 “蜂鸟引擎”。“蜂鸟引擎” 基于 Apache arrow 作向量化执行,加上 LLVM 编译,查询延迟会很是小。

因为整个系统的存算分离,为了加速数据读取,咱们在计算集群节点上增长了 alluxio 来提供数据缓存服务,其中不只缓存 remote cluster 上的数据,同时会缓存部分历史做业结果,经过算子缓存的算法来加速下次计算任务。

这里还须要强调两点:

  • 第一点是索引数据库会返回一批符合该条件的文件列表,若是文件列表很是大的话,当前的 Flink 版本在构建 job graph 时,在获取 Filelist Statistics 逻辑这里在遍历大量文件的时候,会形成长时间没法构建出 job graph 的问题。目前咱们对其进行了修复,后期会贡献给社区。
  • 第二点是数据缓存那一块,咱们的 HQL 以前是经过 Spark 来实现的。用过 Spark 的人可能知道,Spark 会把一个 table 来作 cache 或 persist。咱们在迁移到 Flink 的时候,也沿用了这个算子。Flink 这边咱们本身实现了一套,就是用户在 cache table 时,咱们会把它注册成一个全新的 table source,后面在从新读取的时候只会用这个新的 table source 来打通整个流程。

3. 深刻探索索引结构

 title=

数据库为了加速数据检索,咱们每每会事先为数据建立索引,再在扫描数据以前经过索引定位到数据的起始位置,从而加速数据检索。而传统数据库常见的是行索引,经过一个或若干字段建立索引,索引结果以树形结构存储,此类索引可以精确到行级别,索引效率最高。

某些大数据项目也支持了行索引,而它所带来的弊端就是大量的索引数据会形成写入和检索的延时。而咱们平台处理的是机器数据,例如终端/网络这类数据,它的特色是重复度很是高,而安全分析的结果每每很是少,极少数的威胁行为会隐藏在海量数据里,占比每每会是 1/1000 甚至更少。

因此咱们选择性价比更高的块索引方案,已经可以支撑目前的应用场景。目前经过客户数据来看, 索引可以为 85% 的语句提供 90% 以上的裁剪率,基本知足延时要求。

 title=

某些大数据平台是将索引数据以文件的形式存储在磁盘上,外加一些 cache 机制来加速数据访问,而咱们是将索引数据直接存在了数据库中。主要有如下两个方面的考虑:

  • 第一是 transaction。咱们知道列存文件每每是没法 update 的,而咱们在按期优化文件分布时会作 Merge File 操做,为了保证查询一致性,须要数据库提供 transaction 能力。
  • 第二是性能。数据库拥有较强的读写和检索能力,甚至能够将谓词下推到数据库来完成,数据库的高压缩比也能进一步节省存储。

 title=

上图为块索引的设计。在咱们的索引数据库中,咱们把这些数据分为不一样类别数据源,好比终端数据为一类数据源,网络数据为一类数据源,咱们分类数据源的逻辑是他们是否拥有统一的 Schema。就单个数据源来讲,它以日期做为 Partition,Partition 内部是大量的 ORC 小文件,具体到索引结构,咱们会为每个字段建 min/max 索引,基数小于 0.001 的字段咱们建 Bloom 索引。

上文提到过,安全人员比较喜欢用 like 和全文检索。对于 like 这一块,咱们也作了一些优化。全文检索方面,咱们会为数据来作分词,来构建倒排索引,同时也会对于单个分词事后的单个 item 来作文件分布层面的位图索引。

 title=

上图是一个索引大小的大体的比例假设,JSON 格式的原始日志大有 50PB,转化成 ORC 大概是 1PB 左右。咱们的 Index 数据是 508GB, 其中 8GB 为 Min/Max 索引,500GB 为 Bloom。加上上文提到的位图以及倒排,这个索引数据的占比会进一步加大。基于此,咱们采用的是分布式的索引方案。

 title=

咱们知道日志是在不断的进行变化的,对于有的数据员来讲,他有时会增长字段或者减小字段,甚至有时字段类型也会发生变化。

那么咱们采起这种 Merge Schema 模式方案,在文件增量写入的过程当中,也就是在更新这批数据的索引信息的同时来作 Schema Merge 的操做。如图所示,在 block123 中,文件 3 是最后一个写入的。随着文件的不断写入,会组成一个全新的 Merge Schema。能够看到 B 字段和 C 字段实际上是历史字段,而 A\_V 字段是 A 字段的历史版本字段,咱们用这种方式来尽可能多的让客户看到比较全的数据。最后基于本身开发的 Input format 加 Merge Schema 来构建一个新的 table source ,从而打通整个流程。

2、以下降 IO 为目标的优化与探索 (刘佳)

上文介绍了为何要选择块索引,那么接下来将具体介绍如何使用块索引。块索引的核心能够落在两个字上:“裁剪”。裁剪就是在查询语句被真正执行前就将无关的文件给过滤掉,尽量减小进入计算引擎的数据量,从数据源端进行节流。

 title=

这张图展现了整个系统使用 IndexDB 来作裁剪流程:

  • 第一步是解析查询语句。获取到相关的 filter
,能够看到最左边的 SQL 语句中有两个过滤条件, 分别是 src\_address = 某个 ip,occur\_time > 某个时间戳。
  • 第二步将查询条件带入 Index DB 对应数据源的 meta 表中去进行文件筛选
。src\_address 是字符串类型字段,它会联合使用 min/max 和 bloom 索引进行裁剪。occur\_time 是数值类型字段而且是时间字段,咱们会优先查找 min/max 索引来进行文件裁剪。须要强调的是, 这里咱们是将用户写的 filter 封装成了 index db 的查询条件,直接将 filter pushdown 到数据库中完成。
  • 第三步在获取到文件列表后,这些文件加上前面提到的 merged schema 会共同构形成一个 TableSource 来交给 Flink 进行后续计算。

同时,构建 source 的时候,咱们在细节上作了一些优化。好比在将 filter 传给 ORC reader 的时候,清除掉已经 pushdown 了的 filter, 避免在引擎侧进行二次过滤。固然, 这里并非将全部 filter 都清除掉了,咱们保留了 like 表达式,关于 like 的 filter pushdown 会在后文介绍。

 title=

接下来着重介绍一下四大优化点:

  • 第一点,数据在未排序的状况下,裁剪率是有理论上限的,咱们经过在数据写入的时候使用 hilbert 曲线排序原始数据来提高裁剪率;
  • 第二点,由于安全领域的特殊性,作威胁检测严重依赖 like 语法,因此咱们对 orc api 进行了加强,使其支持了 like 语法的下推;
  • 第三点,一样是由于使用场景严重依赖 join,因此咱们对 join 操做也作了相应的优化;
  • 第四点,咱们的系统底层支持多种文件系统,因此咱们选取 Alluxio 这一成熟的云原生数据编排系统来作数据缓存,提升数据的访问局部性。

1. 裁剪率的理论上限及 Hilbert 空间填充曲线

 title=

裁剪能够抽象成 N 个球扔进 M 个桶的几率问题,在这里咱们直接说结论。假设行在块中随机均匀分布,全部块的总行数固定,查询条件命中的总行数也固定,则块命中率直接与 “命中的总行数 / 总块数” 正相关。

结论有两个:

  • 第一点,若是命中总行数 = 总块数,即 X 轴值为 1 的时候,命中率为 2/3, 也就是 2/3 的块,都包含命中的行,对应的块修剪率的上限是 1/ 3。1/3 是一个很低数值,可是因为它的前提是数据随机均匀分布,因此为了让数据分布更好,咱们须要在数据写入时对原始数据进行排序。
  • 第二点,假设命中总行数固定,那么大幅度减小每块中的行数来增长总块数,也能提高块修剪率。因此咱们缩小了块大小。根据测试结果,咱们设定每一个文件的大小为:16M。缩小文件大小是很简单的。针对排序,咱们引入了 hilbert 空间填充曲线。

 title=

为何使用 hilbert 曲线?主要是基于两点:

  • 首先是,以什么路径遍历 2 维空间,使路径的地址序列对其中任一维度都基本有序?为何要对每一列或者说子集都有序?由于系统在使用的过程当中,查询条件是不固定的。数据写入时排序用到了 5 个字段,查询的时候可能只用到了其中的一个或两个字段。Hilbert 排序能让多个字段作到既总体有序,又局部有序。
  • 另外,空间填充曲线有不少,还有 Z 形曲线、蛇形曲线等等,你们能够看看右边这两张对比图。直观的看,曲线路径的长跨度跳跃越少越好,点的位置在迭代过程当中越稳定越好。 而 hilbert 曲线在空间填充曲线里面综合表现最好。

hilbert 用法,就是实现一个 UDF,输入列值,输出坐标值,而后根据坐标值排序。

 title=

咱们抽样了客户环境所使用的 1500 条 SQL 语句,过滤掉了其中裁剪率为分之 100% 的相关语句,也就是没有命中文件的无效语句。而后还剩下 1148 条,咱们使用这些语句作了裁剪率排序后,对裁剪率进行了对比,裁剪率 95 百分位从以前的 68% 提高到了 87%,提高了 19%。可能你们会以为 19% 这个数值不是特别高,但若是咱们带上一个基数,好比说 10 万个文件,这样看的话就会很可观了。

2. 字典索引上 Like 的优化

 title=

以前也有讲到安全行业的特殊性,咱们作威胁检测的时候会严重依赖 like 查询。鉴于此,咱们也对它作了优化。

  • 首先咱们为 ORC api 添加了 like 条件表达式,保证 SQL 中的 like 能下推到 orc record reader 中。
  • 其次,重构了 orc record reader 的 row group filter 逻辑,若是发现是 like 表达式,首先读取该字段的 dict steam,判断 dict stream 是否包含 like 目标字符串,若是字典中不存在该值,直接跳过该 row group,不用读取 data stream 和 length steam,能大幅提升文件读取速度。后期咱们也考虑构建字典索引到索引数据库中,直接将字典过滤 pushdown 到数据库中完成。

例如图上所示,最左边的 SQL 中有三个表达式。前两个在上文中已经提到了,是将 filter 直接 pushdown 到 index db 中完成,咱们交给 orc reader 的 filter 只有最后一个 attachment\_name like '%投标%',真正须要读取的记录只是 dict 包含 ”投标“ 的 row group,也就是作到了 row group 级别的过滤,进一步减小了须要进入计算引擎的数据量。

3. 基于索引对 join 的优化

 title=

威胁情报的匹配中大量使用 join 操做,若是要加速 join 的性能,仅仅是 where 条件的 filter pushdown 是远远不够的。

Flink 中已经内置了许多 join 算法,好比 broadcast join, hash join 和 sort merge join。其中,sort merge join 对预先排好序的表 join 很是友好,而上文有提到咱们使用 Hilbert 曲线来对多字段进行联合排序,因此 sort merge join 暂时不在咱们的优化范围以内。

另外,咱们知道 join 的性能和左右表的大小正相关,而威胁情报 join 的稀疏度很是高,因此事先对左右表作裁剪,可以大幅减小进入 join 阶段的数据。

上文提到过咱们已经为常见字段创建了 bloom 索引。那么利用这些已经建立好的 bloom,来进行文件预过滤,就变得瓜熟蒂落,而且省掉了构建 bloom 的时间开销。

对于 broadcast join,咱们直接扫描小表,将小表记录依次进入大表所属文件的 bloom,判断该数据块是否须要, 对数据量大的表作预裁剪。

对于 hash join,正如咱们看到的,咱们能够预先对 join key 的文件级 bloom 作 “预 join” 操做,具体就是将左表所属的某个文件的 bloom 依次与右表所属文件的 bloom 作 “与” 操做,只保留左右表能 ”与后结果条数不为 0“ 的文件,再让各表剩余的文件进入引擎作后续计算。

 title=

好比说图上的这三张表,分别是 table一、 table2 和 table3 。咱们能够从 index DB 中获取到表的统计信息,也就是文件个数或者说是文件表的大小。图上就直接列的是文件个数:table 1 是 1000 个, 而后 table 2 是 5 万个文件, table 3 是 3 万个文件。

咱们就是参照上一张图片里面的逻辑进行预 join,而后预估 join 的成本。咱们会让成本低的预 join 先进行,这样的话就可以大幅度减小中间结果,提高 join 的效率。

4. Alluxio 做为对象存储的缓存

 title=

由于底层文件存储系统的多种多样,因此咱们选取了 Alluxio 数据编排系统,Alluxio 的优势是让数据更靠近计算框架,利用内存或者 SSD 多级缓存机制加速文件访问,若是在彻底命中 cache 的状况下,可以达到内存级 IO 的文件访问速度,减小直接从底层文件系统读文件的频次,很大程度上缓解了底层文件系统的压力。

对咱们系统来讲就是它带来了更高的并发,并且对低裁剪率的查询更友好,由于低裁剪率的话就意味着须要读取大量的文件。

若是这些文件在以前的查询中已经被 load 到 cache 里面,就可以大幅度的提高查询速度。

 title=

在作完这些优化之后,咱们作了性能对比测试。咱们选取了一个规模为 249TB 的 es 集群。它使用了 20 台服务器,Flink 使用了两台服务器,为了在图标上看到更直观的对比效果,咱们选取了 16 条测试结果。

图表上红橙色的是 es,蓝色的是 HQL 优化前,绿色的是 HQL 优化后。上面的数字标签是与 es 相比,HQL 的性能差值。好比第一个标签就意味着 HQL 的性能五倍于 es,其中 6 号和 7 号比 es 慢,主要是由于 HQL 是块索引,es 是行索引,全在内存里面,因此能够作到超快的检索速度。13 号是由于 HQL 在使用 not equal 的状况下,裁剪率相对较差。

整体说,优化效果是很明显的,大部分语句在与 es 查询速度相比是持平甚至略优的。彻底知足客户对长周期数据存储和查询的指望。

3、将来规划

 title=

上图是将来规划。由于客户现场常常会涉及到不少的 BI Dashboard 运算和长周期运算报告的需求,因此咱们下一步会考虑作 BI 预算,以及苏军提到的容器化和 JVM 预热,固然还有对标 es,以及提高多用户并发查询的能力。


第一时间获取最新技术文章和社区动态,请关注公众号~

本文内容由阿里云实名注册用户自发贡献,版权归原做者全部,阿里云开发者社区不拥有其著做权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。若是您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将马上删除涉嫌侵权内容。
相关文章
相关标签/搜索