[译] 流式处理:使用 Apache Kafka 的 Streams API 实现 Rabobank 的实时财务告警

原文:Real-time Financial Alerts at Rabobank with Apache Kafka’s Streams API
原做者:Jeroen van Disseldorphtml

本文讨论使用 Apache Kafka 的 Streams API 向 Rabobank 的客户发送告警。Rabobank(荷兰合做银行)总部位于荷兰,在全球拥有 900 多个分支机构,48,000 名员工和 681 亿欧元的资产。Rabobank 是一家由客户和银行组成的合做银行,一家对社会负责的银行。其目标是成为荷兰金融市场的领导者。Rabobank 还致力于成为全球食品和农业领域的领先银行。Rabobank 向全球数百万客户提供金融产品和服务。java


在过去的几年中,Rabobank 一直在为成为一家实时的,事件驱动的银行而进行积极的投资。若是你熟悉银行的业务流程,应该会明白这并不能一蹴而就。许多银行业务流程都是在非商用硬件上以批处理做业的形式进行的,所以迁移工做很是艰巨。但如前所述,Rabobank 接受了这一挑战
,并定义了一个业务事件总线(BEB,Business Event Bus),用于应用程序之间共享整个组织架构的业务事件。
Rabobank 选择 Apache Kafka 做为底层的主要引擎,并编写了本身的的 BEB 客户端库,以方便应用程序开发人员使用简单的消息生产/消费以及灾难恢复等功能。git

Rabobank 采用 Active-Active 的 Kafka 设置,Kafka 集群在多个数据中心进行对称镜像。当数据中心出现故障或由操做人员干预后,BEB 客户端(包括本文讨论的基于 Kafka Streams 的应用程序)会切换到另外一个 Kafka 集群,而无需从新启动。实如今灾难情景和计划维护时段内的 24×7 不间断运行。BEB 客户端库为生产者、消费者以及流式应用提供了这种切换机制。github

Rabo Alerts 是一个由一系列生产、消费、流式消息等微服务组成的系统,基于 BEB 实现。下面讨论的全部数据类型和代码均可以在 GitHub 中找到。本文将在必定程度上简化源码清单(如删除未使用的字段),但这些清单仍反映了生产中实际运行的代码。apache

案例:Rabo Alerts

Rabo Alerts 服务可让 Rabobank 的客户接收其关注的财务事件告警。例如某笔款项从帐户中扣除或者记入帐户,以及其它更复杂的事件。客户能够根据本身的偏好配置告警,并经过第三方渠道发送:如电子邮件、短信和移动推送通知。值得一提的是,Rabo Alerts 并非一项新的或试用服务,它已经投产十多年,可供数百万帐户持有者使用。api

面临的问题

旧的 Rabo Alerts 实现主要是在大型机系统上。全部的处理步骤都是面向批处理的,大型机会根据告警类型派生告警,并每隔几分钟发送,但天天只发送几回。这种实现很是稳定可靠,但 Rabobank 但愿解决两个问题:(1)灵活性不足;(2)告警发送速度慢。服务器

因为对现有告警进行更改或添加新(更智能)的告警须要很大的工做量,所以旧的 Rabo Alerts 对适应新业务需求的灵活性很低。在过去几年中,Rabobank 在其在线环境中引入新功能的步伐大幅增长,旧有僵化的告警解决方案变得愈来愈成问题。markdown

告警的传递速度也是一个问题,旧的 Rabo Alerts 可能须要 5 分钟到 4-5 小时才能向用户发送告警(取决于告警类型和批处理窗口)。若是在十年前,这个速度可能足够快了,但现在客户的指望值要高得多。如今 Rabobank 向客户提供“相关信息”的时间窗口要比过去十年小得多。架构

所以,如何从新设计现有的机制,使其具备更强的扩展性及更快的速度,即是摆在眼前的问题。固然,从新设计的 Rabo Aerts 也须要稳定可靠,以便可以正确地为现有数百万的用户群提供服务。并发

从小处着手

在过去的一年里,咱们一直使用 Kafka 及其 Streams API 从新设计和实现告警机制。因为整个 Rabo Alerts 服务至关庞大,咱们决定从四个简单但使用率高的告警开始:

  • 余额高于阈值
  • 余额低于阈值
  • 超过阈值的贷记(Credit)
  • 超过阈值的借记(Debit)

这些告警的每个均可以从当前帐户系统的支付信息流中派生出来。例如:“当个人余额低于 100 欧元时向我发送短信”或“当有人给我超过 1000 欧元时向我推送消息”(一般用于存款通知)。

如下截图说明如何经过手机银行 app 配置 Rabo Alerts ——

raboalerts-mobile-banking.png-157.5kB

告警拓扑

咱们的第一步是从新设计告警过程,基本流程以下:

  1. 挖掘来自支付工厂的交易流,产生一连串的 AccountEntry(帐户会计条目)。注意,每个支付交易老是由两个 AccountEntry 组成,即借记(Debit)和贷记(Credit)。
  2. 对每一个 AccountEntry 执行如下步骤:

    • a. 将具备帐户读取权限的帐号转换为一个客户列表。
    • b. 对每一个客户执行如下步骤:

      • i. 查看客户是否为给定帐号配置了 Rabo Alert。
      • ii. 若是是,检查此 AccountEntry 是否符合客户设置的告警条件。
      • iii. 如何符合,经过客户配置的渠道(电子邮件、短信、消息推送)发送告警。

步骤 1 须要与执行交易的核心银行系统创建连接。
步骤 2a 须要创建一个查询表,其中包含全部帐户的全部客户权限。
步骤 2b 须要创建一个查询表,其中包含全部客户的 Rabo Alert 设置。

该流程的使用及其需求见下图:

raboalerts-alerting-topology-flow.jpg-77.8kB

图中全部白色框都是 Kafka 主题(Topic),其中列出了它们的 Avro 键/值数据类型。大部分数据类型都是不言自明的,但如下数据类型值得一提:

  • CustomerAlertSettings:特定客户的告警设置,这些设置包括:

    • CustomerAlertAddresses:客户用于接收告警消息的渠道及地址列表。移动推送地址此处以 CustormerId 表示,由于注册移动设备的实际列表是在消息发送过程当中肯定的。
    • CustomerAccountAlertSettings:客户为特定帐户设定的告警配置列表。这个列表指定了客户但愿接收特定帐户的哪些告警及其阈值。
  • ChannelType:可用的渠道类型枚举,当前为 EMAIL、PUSH 和 SMS。
  • AccountEntry:一条支付帐户的会计记帐。一个记帐条目是一个支付交易的一半,能够是一个借记条目(Debit),也能够是一个贷记条目(Credit)。
  • OutboundMessage:发送给客户的消息内容。包含消息类型和参数,但不包含其寻址。这些信息由 Outbound topic 的 Key 承载。

蓝色框表示独立的应用程序(或称微服务),是使用 Spring Boot 实现的可执行 jar,并部署在托管平台上。它们一块儿组成了实现 Rabo Alerts 的全部必要功能:

  • Alert Settings Manager:告警配置管理器。向一个 compacted topic (开启了 Log Compaction 的 Kafka 主题)发布每一个客户的全部自定义告警设置。
  • Account Authorization Manager:帐户受权管理器。帐户并非和客户一对一绑定,而是能够由不一样的用户查看。例如,配偶之间共享帐户;或企业帐户针对不一样员工的不一样受权。这些状况下可能会产生任意的帐户/用户间的受权关系。该应用程序向一个 compacted topic 发布帐号和受权客户ID的关系。它是实时的,以便受权的变化在发送告警时能当即生效。
  • Account Entry Bridge:经过 IBM MQ 从 Rabobank 基于大型机的支付工厂中检索全部支付流,并转发到 Kafka 的 topic。
  • Alerting:核心告警服务,参见下文。
  • Device Resolver:设备解析器,辅助应用。从外部系统查找全部客户的移动设备,并将相同的告警消息写入各个设备对应的 topic 中(PushId)。客户移动设备的查找能够经过一个 compacted topic 完成,但因为各类不一样的缘由,此处是经过远程服务调用的方式实现的。
  • Senders:每个 Sender 消费其绑定的渠道 topic 的消息,并发送给寻址客户。每种渠道都被分配了各自的 Kafka topic,以使各类渠道的故障能彼此分离。例如,当电子邮件服务器关闭时,告警消息依然能够经过消息推送的方式发送出去。

废话少说,放码过来

使用 Kafka Streams 编码实现告警只须要 2 个类。

第一个类是 BalanceAlertsTopology。这个类使用给定的 KStreamBuilder 定义主要的 Kafka Streams 拓扑(Topology)。它实现了 BEB 的 TopologyFactory,是一个 BEB 客户端库使用的自定义接口,用于在应用程序启动后或 Kafka 集群切换(如数据中心切换/故障转移)时生成新的 Kafka Streams Topology。

KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages =
    builder.<AccountId, AccountEntry>stream(accountEntryStream)
        .leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> {
          if (isNull(customerIds)) {
            return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList();
          } else {
            return customerIds.getCustomerIds().stream()
                .map(customerId -> KeyValue.pair(customerId, accountEntry))
                .collect(toList());
          }
        })
        .flatMap((accountId, accountentryByCustomer) -> accountentryByCustomer)
        .through(customerIdToAccountEntryStream)
        .leftJoin(alertSettings, Pair::with)
        .flatMapValues(
            (Pair<AccountEntry, CustomerAlertSettings> accountEntryAndSettings) ->
                BalanceAlertsGenerator.generateAlerts(
                    accountEntryAndSettings.getValue0(),
                    accountEntryAndSettings.getValue1())
        );

// Send all Email messages from addressedMessages
addressedMessages
    .filter((e, kv) -> kv.key instanceof EmailAddress)
    .map((k, v) -> v)
    .to(emailMessageStream);

// Send all Sms messages from addressedMessages
addressedMessages
    .filter((e, kv) -> kv.key instanceof PhoneNumber)
    .map((k, v) -> v)
    .to(smsMessageStream);

// Send all Push messages from addressedMessages
// (CustomerId is later resolved to a list of customer's mobile devices)
addressedMessages
    .filter((e, kv) -> kv.key instanceof CustomerId)
    .map((k, v) -> v)
    .to(customerPushMessageStream);

该 Topology 定义了如下几个步骤:

  • 1-13 行,从消费 AccountEntry 流开始,当检索到一个 AccountEntry 时,会查找哪些客户有权访问该帐户,并将结果存储在一个中间 topic 中,以 CustomerId 为 Key,AccountEntry 为 Value。该 topic 的意思是“这个客户(Key)的这个 AccountEntry(Value)须要处理”。
  • 14-20 行,针对每一个客户执行。检查客户的告警设置,若是 AccountEntry 符合客户的告警设置,会要求辅助类生成 OutboundMessage
  • 22-39 行,遍历全部的 OutboundMessage,并将它们分配到各自的渠道 topic。

告警消息是在 17 行调用辅助类 BalanceAlertsGenerator 生成的。其主要方法是 generateAlerts(),该方法获取一个 AccountEntry,并从具备该帐户查看权限的客户中获取告警配置。如下是它的代码:

public static List<KeyValue<SpecificRecord, OutboundMessage>> generateAlerts(AccountEntry accountEntry,
                                                                             CustomerAlertSettings settings) {
      /* 使用告警设置为一个 AccountEntry 生成完成寻址的告警,步骤以下:
      *  1) 使用特定帐户的告警设置,过滤掉不属于该帐户的 AccountEntry
      *  2) 匹配告警设置中的每一项设置,生成适当的消息
      *  3) 为生成的消息添加寻址信息
      */

  if (settings == null) {
    return new ArrayList<>();
  }

  return settings.getAccountAlertSettings().stream()
      .filter(accountAlertSettings -> matchAccount(accountEntry, accountAlertSettings))
      .flatMap(accountAlertSettings -> accountAlertSettings.getSettings().stream())
      .flatMap(accountAlertSetting -> Stream.of(
          generateBalanceAbove(accountEntry, accountAlertSetting),
          generateBalanceBelow(accountEntry, accountAlertSetting),
          generateCreditedAbove(accountEntry, accountAlertSetting),
          generateDebitedAbove(accountEntry, accountAlertSetting))
      )
      .filter(Optional::isPresent).map(Optional::get)
      .flatMap(messageWithChannels -> mapAddresses(messageWithChannels.getValue0(), settings.getAddresses())
          .map(address -> KeyValue.pair(address, messageWithChannels.getValue1())))
      .collect(toList());
}

该方法执行如下步骤:

  • 13 行,流化全部帐户相关的告警设置(一个帐户一个对象)。
  • 14 行,将告警设置中的帐号和 AccountEntry 中的帐号进行匹配。
  • 15 行,流化告警设置中的各项设置。
  • 16-21 行,构造要发送的一系列消息,以及用于发送消息的渠道列表(这里对每种告警类型都使用了单独的方法),结果是一个 Pair<List, OutboundMessage> 流。
  • 22 行,过滤空结果。
  • 23-24 行,为指定渠道查找客户地址,并返回一个 KeyValue<address, OutboundMessage> 流。
  • 25 行,收集全部结果,并做为 List 返回。

这个类的其它辅助方法:

  • matchAccount():经过比较帐号和币种,来匹配 AccountEntry 和帐户告警设置。
  • generateBalanceAbove/Below():生成 BalanceAbove/Below 告警消息(余额高于/低于阈值)。
  • generateDebited/CreditedAbove():生成 Debited/CreditedAbove 告警消息(超出阈值的借记/贷记)。
  • mapAddresses():查找指定渠道列表的客户告警接收地址。
  • buildMessage():构建一个 OutboundMessage

再加上一些其它额外的类来将这个功能包装在一个独立的应用程序中,这就是它的所有功能!

第一次测试运行

在第一次的初步实现后,咱们进行了测试运行。事实令咱们惊讶,而且指望值高涨。从支付订单确认到移动设备收到告警的整个过程只须要一到两秒钟,且一秒的状况居多。这个过程还包括了支付工厂所花费的时间(验证支付订单,交易处理),所以响应时间可能会依当时的支付工厂工做量而有所不一样。整个告警链——从 AccountEntry 在 Kafka 上发布,到将消息推送给客户——一般在 120 毫秒内完成。在发送阶段,推送(PUSH)告警是最快的,仅需 100-200 毫秒便可到达客户的移动设备。电子邮件(EMAIL)和短信(SMS)稍慢,一般在发出消息后的 2-4 秒到达。相比之下,旧有的体系一般须要几分钟的时间才能提供告警。

下面的视频演示了使用个人我的测试帐户进行告警传输的速度。请注意,虽然是测试用的,但这也是一个正常运行的 Rabobank 支付帐户!

【只是一段演示视频,markdown 插入视频比较麻烦,原文看吧,或看下面的文字解说】

首先我在个人设备上启用了告警,并配置了阈值为 0 的 DebitedAboveThreshold 告警(“More withdrawn than”)。这意味着超过 0 欧元的任何支付都会向我发送告警。我设置了 PUSH 和 SMS 两种渠道告警(视频中未演示),所以告警会经过两个渠道发给我。保存设置并返回主屏幕后,我开始向个人同事 Joris Meijer 转帐 1 欧元,并经过指纹验证。以后付款订单被发送到支付工厂进行处理。在订单确认关闭以前,推送(PUSH)告警已经在屏幕顶部弹出,如通知窗口所示。几秒钟后,相同的告警消息也以 SMS 的方式到达。

回顾

新机制简洁而优雅,只须要少数 Java 类组成。这个逻辑大约四个星期写完,但要使整个拓扑工做须要大约六个月的时间。这主要是由于 Alert Settings ManagerAccount Authorizations Manager 以及 Account Entry Bridge 须要和银行的其它业务模块达成一致。

在团队内部的告警测试以后,须要更完全更大规模的测试。毕竟咱们但愿确保客户不会错过告警或接收到不应接收告警。咱们选用了 25,000 名 Rabobank 的员工做为试点小组,对这个新机制进行了为期两个月的试用。这样能够更好的观察系统在生产数据及高负载下的运行表现。另外,Rabobank 的员工比付费客户更能容忍告警失败(有时确实会失败)的状况。在试用期间,咱们优化了告警生成并消除了一些外围应用的边界错误。

经批准,新体系于 6 月 8 日上线为数百万 Rabobank 客户提供服务。这对咱们来讲是很是激动人心的时刻——不只由于它有效,并且由于咱们永远不可能回头。咱们经过延迟几秒而不是几分钟或几小时的告警,有效提高了客户的指望值。若是因为某种缘由致使某个组件服务失败,客户会当即注意到,由于告警会延迟。所以咱们密切关注这套体系,但到目前为止,它一直运行良好且可预测。

下一步

新体系提供了实时告警,且易于扩展,知足了 Rabobank 对于速度和灵活性的要求。但这里提到的四种告警类型并非所有。客户还能够配置其它约 10 种告警,例如“当我收到来自指定帐户的付款时提醒我”和“当付款单没法执行时提醒我”。下一步是将这些告警从大型机迁移到新体系,但这须要链接更多的支付系统,例如支付订单执行引擎。咱们将在将来的几个月为此努力,且不会止于此,新的实现也激发了大量新的想法,咱们将很快公开讨论(甚至展现)。

关于做者

Jeroen van Disseldorp 是位于荷兰的 Confluent 合做伙伴公司 Axual 的创始人。Axual 为企业应用提供基于 Apache Kafka 的实时数据解决方案。
Email:jeroen@axual.io
Twitter:@axual
LinkedIn:http://linkedin.com/in/dizzl

关于 Apache Kafka Streams API

若是你喜欢本文,可能会但愿继续使用如下资源了解有关 Apache Kafka Streams API 的更多信息:

译者按

本文讨论了一种架构思路的实现案例:数据的流式处理。这种思路比如将数据放到一条流水线中,通过一道道环节的加工处理,每一道环节均可能从数据中提取须要的信息,或向数据中写入一些特定的内容。而这条流水线中的每一个环节均可能造成一条子流水线,将该环节关心的数据从原始数据中提取出来,放到子流水线中加工处理。当数据从流水线的终点出来后,将会是一个包含更丰富内容的数据,或者其中的有价值内容已经被各个环节提取完毕。

关于译文

本译文经原做者受权后,首发于 K栈。转载请注明原做者以及原文译文出处。

clipboard.png

相关文章
相关标签/搜索