Akka 是 Scala 语言实现的一套基于 Actor 模型的异步通讯框架,可用于构建高并发、分布式、可容错、事件驱动的基于 JVM 的应用,在 Spark 中曾被用于实现进程、节点间通讯,在实际项目中协助咱们成功搭建了知足业务需求的模型部署平台。算法
项目背景json
某国内大型连锁餐饮企业旗下拥有大量门店。餐厅门店的每日生产、定货、排班都依赖于每日客单量预估的合理性,其内部数据团队实现了一套预估模型,须要 TalkingData 帮助构建一个工程化平台以支撑模型的训练和部署,从而将模型真正地应用到实际生产环节中。bash
通过交流,咱们发如今实际生产环境中,在各方面存在一些问题:网络
异步:全部门店的前日销售、业务等数据均由各自门店的店长负责整合上传。上传的开始时间、结束时间、数据的完整性等均不肯定。而模型训练和预测均依赖这部分数据,这就意味这没法为模型训练和预测设置统一的开始入口。
并发
高并发:除了一些特殊类型的门店,绝大多数门店的营业时间相对固定,从店长决定整理上传销售数据,到准备物料、排班准备第二天营业,留给模型训练和模型预测回吐预测结果的时间大概为 3 小时。若是每一个门店的预测指标有 2 至 3 项,那么须要有足够的调度能力在规定时间内完成大概 2 万次模型训练加预测流程。app
容错:因为门店数量众多且状况各不相同,仍然有不少潜在的因素可能致使流程出错或失败。原则上,某次流程的失败不该该对其余流程形成任何影响,每一个流程在平台层面应该成为互相独立的任务。
框架
所以,咱们须要一套轻量化的分布式服务框架,来实现知足上述需求的模型训练预测平台,并在必定程度上保证平台的可拓展性。结合此前团队内的技术积累,最终选择了 Akka 框架用于实现平台的内部通讯。
异步
选型过程tcp
消息驱动方式——流程异步化分布式
一次完整的预测任务包括:训练数据准备→模型训练→模型结果导出→预测数据准备→预测结果导出,其中数据准备步骤在时间上不肯定,模型相关步骤在执行结果上不肯定,若是采用同步模型,将会产生大量的等待线程,占用浪费大量资源。在 Actor 模型中,每一个 Actor 做为一个基本计算单元,回应接收到的消息,同时并行的:
发送有限数量的消息给其余 Actor
建立有限数量的新 Actor
指定接受到下一个消息时的行为
上述操做没有顺序执行的假设,所以能够并行进行。发送者与已经发送的消息解耦,能够进行无需等待的异步通讯。
Actor 模型通讯方式
Akka 中的 Actor 本质上就是接收消息并采起行动处理消息的对象,是封装状态和行为的对象,它们惟一的通讯方式是交换消息——把消息存放在接收方的邮箱里。Actor 天然造成树形结构,这种结构的精髓在于任务被拆开、委托,直到任务小到能够被完整地处理。所以,咱们将预测任务的各个步骤拆分抽象,并建立类型消息与步骤对应,将每一个步骤交给线程级别的 Actor 执行处理,经过发送不一样类型的消息来触发建立不一样操做的 Actor,让整个预测流程无需等待。
结构——应对高并发
因为绝大多数门店的营业时间大体相同,平台在流量上会有明显的峰值和低谷,在低谷期间平台须要尽量减小资源占有量,而在流量峰值来临时平台要可以及时响应,保证足够的可用性。
通过讨论,咱们肯定了采用 Master-Worker 模式的平台结构,Master 负责接收与分配任务,Worker 负责处理执行具体的模型任务。
Master 和 Worker 均为独立的 ActorSystem,管理内部不不同操做逻辑的 Actor,在空闲状态下占有资源很小。Actor 为线程级别,一样仅占用极少许资源,生命周期由 ActorSystem 统一管理。少许请求时,Actor 线程具备很高的复用率,请求并发高时,ActorSystem 会建立大量的 Actor 线程用来承接请求,保证可用性。
Akka 中 Actor 的生命周期
子 Actor——模块化提升容错
每一个预测任务的模型相关步骤均存在失败的可能性,此外,数据准备过程当中的网络波动、内容校验出错等状况,都会致使当前预测任务的失败。对于失败的任务,咱们但愿可以尽量记录错误信息,为重跑提供先决条件。
在 Akka 中,构建了父子 Actor 的树形监督结构,提供 Actor 的监督机制以保证容错性,把处理响应错误的责任交给出错对象之外的实体。父 Actor 建立子 Actor 来委托处理子任务,同时便会自动地监管它们。子 Actor 列表维护在父 Actor 的上下文中,父 Actor 能够访问它。
Akka 中的 Actor 结构
经过更进一步的拆分细化,咱们将 Worker 端的 Actor 分为 Prepare 和 Executor 两种,Prepare 为主要负责数据准备步骤,Executor 负责模型相关步骤,统一由 Worker 端的父 Actor 管理,错误和异常均向上层抛出,由 Worker 端的父 Actor 记录并发送给的错误收集模块统一处理。
实践应用
ActorSystem
建立 ActorSystem 时,默认将在 classpath 中寻找 application.conf、 application.json 和 application.properties,并自动加载:
val system=ActorSystem("RsModelActorSystem")
val system=ActorSystem("RsModelActorSystem", ConfigFactory.load()) // 同上复制代码
若是想要使用本身的配置文件,能够经过 ConfigFactory 来配置加载:
val system = ActorSystem("UniversityMessageSystem",
ConfigFactory.load("own-application.conf"))
val config = ConfigFactory.parseString(
s""" |akka.remote.netty.tcp.hostname = $host |akka.actor.provider = akka.remote.RemoteActorRefProvider |akka.remote.enabled-transport = akka.remote.netty.tcp |akka.remote.netty.tcp.port = 2445 """.stripMargin)
val system = ActorSystem("RsModelActorSystem",
config.withFallback(ConfigFactory.load())) // 同上
复制代码
ActorSystem 的配置参数中有大量参数能够自定义,须要根据实际须要修改,例如在该项目中,后期单个算法任务对象大小超过了 Akka remote 默认包大小 128000 bytes,须要修改参数 akka.remote.netty.tcp.maximum-frame-size
Actor
一个 Actor 包含了状态、行为、一个邮箱、子 Actor 和一个监管策略,全部这些封装在一个 Actor 引用里。Actor 对象一般包含一些变量来反映其所处的可能状态,Akka-actor 自身的轻量线程与系统的其余部分彻底隔离,所以无须担忧并发问题。每当一个消息被处理,它会与 Actor 的当前行为进行匹配。行为是一个函数,它定义了在某个时间点处理当前消息所要采起的动做,须要结合实际需求编写具体逻辑。Actor 的邮箱是链接发送者与接收者的纽带,每一个 Actor 有且仅有一个邮箱,全部的发来的消息都在邮箱里排队。能够有不一样策略的邮箱实现供选择,缺省时为 FIFO。
编写逻辑
在 Actor 类中,主要逻辑均在 receive 方法中实现,经过偏函数方法,执行并返回对应的逻辑:
//ActorLogging 提供 Actor 内部的日志输出
class RsActor extends Actor with ActorLogging {
override def receive: Receive = {
case MapMessage(parameters) =>
println(parameters.get("code"))
case MapKeyMessage(parameters, key) =>
println(parameters.get(key))
case StringMessage(msg) =>
println(msg.getBytes().length)
case o: Object =>
println(o.getClass)
case _: AnyRef =>
println("233")
}
}复制代码
生成引用
生成一个能够接收消息的 Actor 实例主要有两个方法:
// 生成一个基于本地类的 Actor 实例
val rsActor = system.actorOf(Props[RsActor], "rsActor")
// 生成一个基于远程地址的 Actor 实例
val rmActor =
system.actorSelection("akka.tcp://RsModelAkkaSystem@192.168.1.9:2445/user/rsActor")
// 使用! 向对应的 Actor 实例发送消息
rsActor ! StringMessage("test")
rmActor ! MapMessage(Map("code"->"233"))复制代码
Message
Akka 中对传递的消息内容并无太严格要求,能够是基本数据类型,也能够是支持序列化的对象:
//scala 的 case class 便于简洁地建立消息类
case class StringMessage(msg: String) extends Serializable
case class MapMessage(parameters: Map[String, String]) extends Serializable
case class MapKeyMessage(parameters: Map[String, String], key: String) extends Serializable复制代码
其余
Akka 做为一款被普遍使用的开源工具,在实际项目中体现出了不少的优点,异步的消息驱动方式也给咱们提供了一套新的思路和实现方法。
做者介绍:李天烨,TalkingData 数据科学家。毕业于东北大学,任职于 TalkingData 数据科学团队,从事数据科学自动化相关工做。
本文转自:InfoQ