支持流式处理ACID事务!Flink团队开源新做Streaming Ledger

流式处理的下一个演化步骤git

 

在 data Artisans,咱们目击了数据流式处理的疯狂式增加,从早期阶段到快速增加的市场,预计到 2025 年将达到近 500 亿美圆的规模。从 Apache Flink 建立之初,咱们就坚信流式处理是一项能够为企业的任务关键型应用程序提供强大支持的技术。github

随着愈来愈多的行业采用流式处理,并为愈来愈多的任务关键型应用程序提供支持,这项技术自己也在不断发展,以便为数据和计算的正确性提供更好的保证。数据库

 

 第 1 步:数据分析的分布式流式处理(“至少一次保证”)架构

 

第一个分布式流式处理器的主要目标是数据分析应用程序,提供了一种实时非精确和异步精确的分析方法。这种方式被称为“Lambda 架构”,流式处理器基于到达的数据提供不精确的分析结果,同时批处理器以小时或天为单位提供精确的分析结果。这种流式处理保证被称为“至少一次处理”。这是流式处理系统可以提供的最弱的正确性保证,是流式处理技术的第一步。并发

 

 第 2 步:单键应用程序的分布式流式处理(“刚好一次保证”)app

 

Apache Flink 率先采用了真正的有状态流式处理,提供了大规模的“刚好一次保证”。这样就能够经过能过提供强正确性保证的流式处理技术来构建分析类型和事务类型的应用程序,减小对 Lambda 架构和批处理器的需求。框架

现在,市场上有不少流式处理器都提供了强大的一致性保证,但仅适用于特定类型的应用程序,这些应用程序每次只更新单个键。也就是说,若是一个应用程序每次只更新单个银行帐户余额,那就能够经过流式处理来实现,但若是应用程序要将资金从一个银行帐户转移到另外一个银行帐户,就难以实现强一致性。机器学习

 

 第 3 步:通常应用程序的分布式流式处理(“ACID 保证”)异步

 

随着 Streaming Ledger 成为 data Artisans 平台的一部分,用户如今能够构建同时读取或更新多条记录和多张表的应用程序,并实现 ACID 事务支持。分布式

Streaming Ledger 在提供这些保证的同时,还能保持流式处理(刚好一次保证)的伸缩能力,并且不会影响应用程序的速度、性能、可伸缩性或可用性。

咱们能够将 Lambda 架构的至少一次保证视为最终一致性的一种形式(由于批处理系统最终会遇上来)。Flink 提供的“刚好一次保证”相似于分布式键值存储系统为单键操做提供的一致性保证,而 Streaming Ledger 提供的保证相似于关系型数据库提供的 ACID 保证。

咱们相信这是流式处理的下一个演化步骤,它为以正确、可伸缩和灵活的方式实现基于流式架构的应用程序打开了大门。

 

多键和多表事务简介

 

大部分关系型数据库管理系统会执行 ACID 事务,每一个事务经过 ACID 语义在串行化的隔离级别下修改数据表,以此来实现完整的数据一致性。根据 ACID 语义,全部事务都是 Atomic、Consistent、Isolated 和 Durable 的。ACID 语义在金融服务或电子商务等行业中扮演着重要的角色。

让咱们举一个经典的例子,假设有一个账户余额表,基于这个表将一个账户的钱转到另外一个账户上。为了确保事务的正确性,转帐操做必须同时修改两个账户或都不作修改(原子性),并且只有在源账户有足够资金时才能进行转帐(一致性),而且不存在其余可能致使错误结果的操做(隔离、无异常)。任何违反这些条件的操做都会致使资金丢失,最终致使帐户余额不正确。

上图显示了如何在有状态流式处理器中实现这个示例。其中的账户 ID 就是键,并且两个账户位于不一样的分片中。这两个分片对对方都没有访问权限或者对对方的状态持有一致的视图,这使得实现这一的框架变得至关复杂,由于须要经过下列方式在两个分片之间传递状态:

  1. 提供一致的状态视图

  2. 可以管理并发修改

  3. 确保修改的原子性

 

超越流式处理框架的刚好一次语义

 

data Artisans Streaming Ledger 基于 Apache Flink 构建,为跨多个表和单表多行的多个数据流提供执行串行化事务的能力。它能够被视为等效于键值存储系统(甚至是跨多个键值存储系统)的多行事务。Streaming Ledger 使用 Flink 的状态来存储表,因此就不须要额外的存储或系统配置。Streaming Ledger 应用程序的构建块由表、事务事件流、事务函数和可选的结果流组成。

有关更多信息,请下载白皮书(https://data-artisans.com/download-the-data-artisans-streaming-ledger-whitepaper )。

data Artisans Streaming Ledger 为使用流式处理构建新的应用程序类型打开了一个大门,这类应用程序在之前只能依赖关系型数据库。如今,数据密集型实时应用程序(如欺诈检测、机器学习和实时交易订价)能够绝不费力地迁移到流式处理平台上。在下一节中,咱们将经过具体的示例演示使用 data Artisans Streaming Ledger 进行开发的必要步骤。

 

Streaming Ledger 用例演示

 

data Artisans Streaming Ledger 很是适合用于处理涉及多个状态的用例,它支持对多个状态进行事务性的修改,这些修改彼此隔离,并遵循串行化一致性原则。

咱们假设有一个实时应用程序,它负责在账户和分类账条目之间识别汇款模式。

这个应用程序须要维护两张 Flink 状态表:第一张表叫做“Accounts”,第二张叫做“Asset Ledger”。应用程序消费事务事件流,例如账户之间、分类账条目之间或两者之间的转帐。当有事件进入时,不一样的事务类型会被应用在每一个事件类型上,而后访问相关行,检查前置条件,并决定是处理仍是拒绝转帐操做。对于帐户之间的转帐,它会更新表中的各个行。对于帐户和分类帐之间的转帐,它会生成结果事件,表示转帐是被接受仍是被拒绝。下图显示了架构细节:

Streaming Ledger 公开了一个易于使用的 API,对于有过流式处理使用经验的用户和熟悉关系型数据库的用户来讲,这个 API 能够轻松上手。在咱们的例子中,咱们使用了如下假设:

  • 两个表:Accounts 和 Asset Ledger

  • 三个事件流:存款、转帐和余额查询

  • 存款时将值写入 Accounts 和 Asset Ledger 表中

  • 转帐原子操做在 Accounts 和 Asset Ledger 之间转移值。

接下来,咱们将逐步使用 Streaming Ledger API 来完成这个示例。Ledger 的 API 是开源的,还包含了一个串行(单节点)的实现(https://github.com/dataArtisans/da-streamingledger )。

如下是经过 Flink DataStream 来建立事件源的方法:

 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<DepositEvent> deposits = env.addSource(…); 
DataStream<TransferEvent> transfers = env.addSource(…);

And this is how you can define the scope and tables of your programme:

// 定义事务范围
TransactionalStreams txStreams =
TransactionalStreams.create(“simple example”);

// 定义事务表
TransactionalStreams.State<String, Long> accounts = txStreams.declareState(“accounts”)
        .withKeyType(String.class)
        .withValueType(Long.class);

TransactionalStreams.State<String, Long> asset ledger = 
txStreams.declareState(“AssetLedger”)
       .withKeyType(String.class)
       .withValueType(Long.class);

接下来,你能够为每一个流和表指定事务函数,以及选择将要访问到的数据行的键。

.apply(…) 函数自己包含事务的业务逻辑。

对于被访问的每一个数据行,为它们添加一个额外的调用:.on(table,key,name,type):

  • 'table'表示要访问的表

  • 'key'是一个函数,能够经过这个函数从输入事件中获取这一行数据的键

  • 'name'是该行数据的逻辑名称(稍后可能会用到)

  • 'type’限定对该行的访问属性,如只读、只写或读写访问。这是一种优化,其中 READ_WRITE 是最通用的选项。

 

// 定义存款事务
txStreams.usingStream(deposits, “deposits”)
  .apply(new DepositsFunction())
  .on(accounts, DepositEvent::getAccountId, “account”, READ_WRITE)
  .on(assetledger,DepositEvent::getassetledgerEntryId, “asset”, READ_WRITE);

// 定义转帐

// 将句柄保存在结果流中,以备后用
OutputTag<TransferResult> result = txStreams.usingStream(transfers, “transfers”)
  .apply(new TransferFunction())
  .on(accounts, TransferEvent::getSourceAccountId, “source-account”, READ_WRITE)
  .on(accounts, TransferEvent::getTargetAccountId, “target-account”, READ_WRITE)
  .on(assetledger, TransferEvent::getSourceAssetledgerEntryId, “source-asset”, READ_WRITE)
  .on(Assetledger, TransferEvent::getTargetAssetledgerEntryId, “target-asset”, READ_WRITE)
  .output();

 

而后实现了包含业务逻辑的事务,决定是否以及如何更新数据行,以及生成哪些结果。

咱们传给这些事务函数一个状态访问对象,访问对象负责读取或跟新每一行数据。为了将状态访问与数据行和键关联起来,对这些函数进行了与前一步相同的注解。

为简单起见,咱们只给出'TransferFunction'的实现。

 

 

public class TransferFunction extends 
TransactionProcessFunction<TransferEvent, TransferResult> { 
@ProcessTransaction 
public void process(
   TransferEvent txn,
   Context<TransferResult> ctx,
   @State(“source-account”) StateAccess<Long> sourceAccount,
   @State(“target-account”) StateAccess<Long> targetAccount,
   @State(“source-asset”) StateAccess<Long> sourceAsset,
   @State(“target-asset”) StateAccess<Long> targetAsset) {

// 访问当前值的数据行
long sourceBalance = sourceAccount.read();
long sourceAssetValue = sourceAsset.read();
long targetBalance = targetAccount.read();
long targetAssetValue = targetAsset.read();

// 检查前置条件: 正余额和最小余额
if (sourceBalance > txn.getMinAccountBalance()
&& sourceBalance > txn.getAccountTransfer()
&& sourceAssetValue > txn.getAssetledgerEntryTransfer()) {

// 计算新的余额
long newSourceBalance = sourceBalance - 
txn.getAccountTransfer();
long newTargetBalance = targetBalance + 
txn.getAccountTransfer();
long newSourceAssets = sourceAssetValue - 
txn.getAssetledgerEntryTransfer();
long newTargetAssets = targetAssetValue + 
txn.getAssetledgerEntryTransfer();

// 写入更新过的值
sourceAccount.write(newSourceBalance);
targetAccount.write(newTargetBalance);
sourceAsset.write(newSourceAssets);
targetAsset.write(newTargetAssets);

// 触发包含新余额的正结果事件
ctx.emit(new TransferResult(txn, SUCCESS, 
newSourceBalance, newTargetBalance));
}
else {
// 触发包含未更新余额的负结果事件
ctx.emit(new TransferResult(txn, REJECT, 
sourceBalance, targetBalance));
   } 
 }
}

 

结    论

data Artisans Streaming Ledger 将以前依赖关系型数据库的应用程序带入到了流式处理时代,进一步扩展了流式处理技术的应用范围!借助 Streaming Ledger,咱们正在开启流式处理的新篇章,咱们很高兴如今愈来愈多的任务关键型应用程序能够充分利用流式处理的实时、异步、灵活等优点。

开源项目地址

https://github.com/dataArtisans/da-streamingledger

相关文章
相关标签/搜索