单机每秒最多可处理10亿条数据!eBay开源数据处理框架Accelerator

策划编辑 | Natalie
编译 | 无明
编辑 | Emily
AI 前线导读:近日,eBay 宣布正式开源 Accelerator,一款久经考验的数据处理框架,提供快速的数据访问,并行执行以及自动组织源码、输入数据和结果。它能够用于平常数据分析,也能够用在包含数十万大型数据文件的实时推荐系统上。

Accelerator 可运行在笔记本电脑或机架式服务器上,轻松处理数十亿行数据,井井有理地处理成千上万的输入文件、计算和结果。

Accelerator 的数据吞吐量一般在每秒数百万行。若是运行在高速计算机上,每秒最多可处理几十亿行数据。

更多干货内容请关注微信公众号“AI 前线”,(ID:ai-front)

Accelerator 最初由瑞典人工智能公司 Expertmaker 开发,于 2012 年正式发布,从那之后,它一直是众多研究项目和实时推荐系统的核心工具。 2016 年,Expertmaker 被 eBay 收购,而 eBay 目前正在基于 Apache 许可协议第 2 版开源 Expertmaker Accelerator。git

设计目标

Accelerator 的主要设计目标以下:github

  • 简化在多个 CPU 上并行处理数据。数据库

  • 数据吞吐量应尽量快,即便一台小型笔记本电脑也能轻松处理数百万行数据。json

  • 若是可能,尽可能重用计算结果,而不是从新计算。一样,在多个用户之间共享结果应该是绝不费力的。小程序

  • 数据科学项目可能会有不少(数十万)输入文件和大量的源码和中间结果。vim

  • Accelerator 应该避免手动管理和记录数据文件、计算、结果以及它们之间的关系。缓存

主要功能

Accelerator 主要的原子操做是建立做业。 建立做业是用输入数据和参数执行一些程序并将结果(即输出)以及计算所需的全部信息存储到磁盘上的过程。 做业目录将包含计算结果和计算结果所需的全部信息。bash

做业能够是简单或复杂的计算,也能够是大型数据集的容器。 做业之间能够彼此连接,新做业能够依赖于一个或多个旧做业。服务器

关键特性

Accelerator 提供了两个关键功能,结果重用和数据流。微信

结果重用

在建立新做业以前,Accelerator 会检查以前是否已经跑过相同的做业。若是已经存在,Accelerator 不会建立这个做业,而是将现有做业的连接返回。这样不只节省了执行时间,并且有助于在用户之间共享结果。更重要的是,它提供了可见性和肯定性。

Accelerator 提供了一种机制,将会话中的做业信息保存到数据库中,这样有助于管理做业和它们相互之间的关系。

数据流

将连续的数据流从磁盘传输到 CPU 比在数据库中执行随机查询效率更高。流式传输是实现从磁盘到 CPU 高带宽的最佳途径。它不须要缓存,能够很好地利用操做系统的基于 RAM 的磁盘缓冲区。

总体架构

如今让咱们来看看 Accelerator 的总体架构。

是一个基于客户端 / 服务器的应用程序。在左侧有一个 runner 客户端,在右边有两台服务器,称为 daemon 和 urd,其中 urd 是可选的。runner 经过执行脚本(构建脚本)在 daemon 服务器上启动做业。此服务器将加载和存储使用基于 workdirs 文件系统的数据库执行的全部做业的信息和结果。同时,构建脚本中全部有关做业的信息将由 urd 服务器存储到做业日志文件系统的数据库中。 urd 负责管理做业,包括存储和检索以前执行过的相关做业的会话或清单。

做业

做业是经过执行称为 method 的小程序来建立的。method 用 Python 2 或 Python 3 编写,有时也用 C 语言。

最简单的做业:“Hello, World”

咱们经过一个简单的“Hello World”程序来讲明如何建立一个做业(method):

def synthesis():
  return "hello world"复制代码

这个程序不须要任何输入参数,只是返回一个字符串并退出。要执行它,咱们还须要建立一个构建脚本,以下所示:

def main(urd):
  jid = urd.build('hello_world')复制代码

当执行完这个方法以后,用户会获得一个叫做 jobid 的连接。jobid 指向存储执行结果的目录,以及运行做业所需的全部信息。

若是咱们尝试再次执行这个做业,它将不会被执行,而是返回指向上一次执行做业的 jobid,由于 Accelerator 记得以前已经执行过与此相似的做业。 要再次执行做业,咱们必须更改源代码或输入参数。

连接做业

咱们假设刚刚建立的 hello_world 做业很是耗费计算资源,并已经返回了咱们想要的结果。为了简单起见,咱们经过建立一个名为 print_result 的方法来演示其中的原理,该方法只读取前一个做业的结果并将结果打印到 stdout。

import blob

jobids = ('hello_world_job',)

def synthesis(): 
  x = blob.load(jobid=jobids.hello_world_job)
  print(x)复制代码

要建立这个做业,咱们须要扩展构建脚本:

def main(urd):
  jid = urd.build('hello_world') 
  urd.build('print_result', jobids=dict(hello_world_job=jid))复制代码

在执行构建脚本时,只会建立 print_result 做业,由于 hello_world 做业以前已经建立过了。

做业执行流程和结果传递

到目前为止,咱们已经知道如何建立、连接和执行简单的做业。如今咱们将重点转向 method。在执行 method 时,Accelerator 会调用三个函数,它们分别是 prepare()、analysis() 和 synthesis()。一个 method 能够同时调用这三个函数,或者至少调用一个。

三个函数的返回值均可以存储在做业的目录中,并被用在其余做业上。

数据集

数据集是 Accelerator 默认的存储类型,专为并行处理和高性能而设计。数据集创建在做业之上,所以数据集经过 method 来建立,并存储在做业目录中。单个做业能够包含任意数量的数据集。

在内部,数据集中的数据以行列格式存储。全部列均可以被独立访问,避免读取到没必要要的数据。数据也被分红固定数量的片断,提供并行访问能力。数据集可能会被散列,散列函数将具备相同散列值的数据行组合到同一个片断中。

导入数据

让咱们来看看导入文件(建立数据集)的常见操做。csvimport 方法可用于导入许多不一样的文件类型,它能够解析大量的 CSV 格式的文件,并将数据存储为数据集。建立的数据集存储在结果做业中,数据集的名称默认为 jobid 加上字符串 default,也可使用自定义字符串。

连接数据集

就像做业同样,数据集也能够相互连接。因为数据集是创建在做业之上的,因此连接数据集就很简单。例如,假设咱们刚刚将 file0.txt 导入 imp-0,而且 file1.txt 中存储了更多数据。咱们能够导入后一个文件并提供一个指向前一个数据集的连接。因为数据集已连接,如今可使用 imp-1(或 imp-1/default)数据集引用来访问从这两个数据集导入的全部数据文件。

在处理随时间增加的数据(如日志数据)时,使用连接十分方便。咱们能够经过连接扩展具备更多行的数据集,这是一个很是轻量级的操做。

将新列添加到数据集

添加列是很经常使用操做,Accelerator 经过连接来处理新列。

原理很简单,假设咱们有一个“源”数据集,咱们要添加一个新列,只须要建立一个只包含新列的新数据集,并在建立它时让 Accelerator 将全部源数据集的列连接到新数据集。

并行执行

Accelerator 专为并行处理而设计,主要经过分片数据集和并行 analysis() 调用组合来实现并行处理。

迭代器在 analysis() 函数内部运行,该函数为每一个数据集片断 fork 一次。analysis() 函数的返回值将做为 synthesis() 函数的输入。咱们能够显式地合并结果,不过 analysis_res 带有一个至关神奇的方法 merge_auto(),它根据数据类型将全部片断的结果合并为一个。

urd

咱们已经看到 Accelerator 如何跟踪已经建立好的做业,并在必要时重用做业。这样节省了时间并将相关的计算连接在一块儿,不过,在这之上还有另外一个层能够进一步提升可视性和做业重用性,它就是 urd 服务器。

urd 将做业清单及其依赖关系存储在基于日志文件的数据库中。在构建脚本中发生的全部事情均可以记录到 urd 中。为了作到这一点,咱们须要一个名单来存储信息,还须要一个密钥,而在大多数状况下还须要一个日期,方便往后查找。

性能测试

新做业的启动时间只有几分之一秒。如下是一些不一样做业类型的处理时间。

准备数据:导入、类型转换和散列

示例数据文件大小为 1.1TB(压缩后 280GB),包含 63 亿行和 14 列。Accelerator 运行在具备 72 核心和快速磁盘的大型机上。

上述数值是基于所有数据得出的。导入做业(A):导入 gz 压缩文件。有趣的是,导入比普通的 zcat file.gz> /dev/null 要快 30%。在 FreeBSD 上,zcat 速度更快。类型转换做业(B):5 个 json-list、5 个数字、2 个日期和 2 个 unicode 列,每行平均有 172 个字节。该做业的读取速度超过每秒半千兆字节,同时保存几乎相同数量的数据到磁盘上,所以磁盘带宽高于每秒 1 千兆字节。因为散列速度取决于被散列的列,所以显示的值(C)是四个散列做业的平均值。

处理数据

为了计算Σ(a×b×c),咱们经过一个 method 读取三个列、将它们的值相乘并将结果写入新列。第二个做业为新列添加值。

能够看到,将三个 float64 相乘并写回到磁盘其实是很快的——每秒 7 千 7 百万行。将这些值汇总在一块儿甚至更快——每秒超过十亿个值。而在 Python 中,执行一样操做须要 6 秒。

结论

Accelerator 是一款用于快速数据处理的工具。在单机上,每秒能够处理数百万行数据,若是任务简单,能够每秒处理 10 亿行。除了速度快以外,Accelerator 还能够减小手动管理源文件、数据文件、计算以及相关结果的工做。它已被成功用在多个项目中,eBay 如今正式将其开源。

相关连接:

ExpertMaker Accelerator 代码仓库 (https://github.com/eBay/accelerator)

Installer 仓库 (https://github.com/eBay/accelerator-project_skeleton)

Accelerator 用户参考手册 (https://berkeman.github.io/pdf/acc_manual.pdf)


更多干货内容请关注微信公众号“AI 前线”,(ID:ai-front)

相关文章
相关标签/搜索