akka版本2.5.8
版权声明:本文为博主原创文章,未经博主容许不得转载。程序员
在以前的话题中,咱们解释了如何在高层次来看待actor系统,即要如何去表示组件,如何安排actor的层次结构。在本节中,咱们会看到如何实现其中的设备actor。数据库
若是咱们使用对象,咱们会将API设计为接口,并拥有一组会被实现类实现的抽象的方法。可是在actor的世界里,协议(protocols)取代了接口。虽然咱们不能在编程语言内形式化通用协议,可是咱们能够编写它们最基本的元素——消息。所以,咱们会从定义咱们但愿发给设备的消息开始咱们的程序。编程
设备actor的工做很简单:安全
一、收集温度测量信息
二、当被查询时,报告最后一次的测量值网络
然而,在设备启动时不会马上就得到温度测量信息,所以,咱们须要考虑温度测量信息不存在的状况。这也容许咱们的actor在没有写模块的时候来测试读模块,由于设备能够简单地报告一个空结果。架构
从设备获取但前温度的协议很简单,actor须要:框架
一、等待取当前温度的请求
二、回应这个请求:编程语言①拥有当前的温度数据
②标识当前温度数据还不可用分布式
咱们须要两个消息,一个用来请求,一个用来回复。咱们的第一次尝试可能以下所示:ide
final case object ReadTemperature
final case class RespondTemperature(value: Option[Double])
这两条消息貌似涵盖了全部咱们所须要的功能,然而,咱们选择方法的时候必需要考虑应用程序的分布式特性。虽然actor在JVM本地通讯与远程通讯的基本机制相同,可是咱们须要牢记如下几点:
一、本地信息与远程信息的传输延迟有很大的不一样,有些因素,如网络带宽、信息大小都会产生做用。
二、可靠性必须被重视,由于在远程信息传递中会涉及到不少的步骤,这也会增大失败的概率。
三、本地消息仅仅是在JVM内部传递引用,所以不会对消息有不少的限制,可是远程传输可能会限制消息的大小。
另外,在JVM内部传递消息显然是可靠性很高的,可是当actor由于程序员的错误而在处理信息时失败了,那么系统的表现就会和远程网络请求中远程处理消息崩溃一致。尽管这是两个场景,服务一会就会被恢复(actor会被监管者重启,主机会被操做员或监控系统重启),可是个别的请求可能会在故障中丢失。所以,咱们要悲观一些,在丢失任何信息的状况下都要保证系统安全
进一步理解协议中的灵活性需求,将有助于咱们去考虑Akka消息顺序和消息传递保证。Akka为消息发送提供了如下行为:
一、最多只有一次传递,即不保证送达
二、信息是被每一个发送者接收者对来维护的
如下章节将讨论行为中的更多细节:
一、信息传递
二、信息排序
消息传递子系统提供的消息传递语义一般分为如下几类:
一、最多传递一次(At-most-once delivery),每一个消息被发送零或一次,这意味着信息可能会丢失,但永远不会被重复接收到
二、至少传递一次(At-least-once delivery),每一个消息均可能被潜在地发送不少次,直到有一次成功。这意味着信息可能会被重复接收,但永远不会丢失
三、准确地发送一次(Exactly-once delivery),每一个消息都被精准地发送给接收者一次,消息不会丢失也不会重复接收
Akka使用第一种行为,它是最节省资源的,而且性能最好。它拥有最小的实现开销,由于可使用发送即忘(fire-and-forget)策略,而不用在发送者内保存发送状态。第二点,也不须要对传输丢失进行计数。这些增长了发送结束后保持状态、发送完毕确认的开销。准确地发送一次信息的方式开销是最大的,因为其不好的性能表现,除了在发送端增长上述所说的开销外,还须要在接收端增长过滤重复消息的机制。
在actor系统中,咱们须要肯定一个消息被保证的含义,在哪一种状况下认为传输已经完成:
一、当消息被送出到网络上时?
二、当消息被接收者主机接收到时?
三、当消息被放到接收者actor的邮箱里时?
四、当消息接收者actor开始处理这个信息时?
五、当消息接受者actor处理完这个消息时
大多数框架和协议声称保证传输,实际上它们提供了相似于4和5的东西。虽然这听起来是合理的,可是实际上真的有用吗?要理解其中的含义,请考虑一个简单的问题:用户尝试下一个订单,而且咱们认为一旦它进入了订单数据库,就表明它已经被成功处理了。
若是咱们依赖于第五点,即消息被成功处理,那么actor须要尽快在处理完后报告成功状态,这个actor就有义务在订单被提交到它的API后进行校验、处理,而后放入订单数据库。不幸的是,当API被调用后,这些状况可能会发生:
一、主机崩溃
二、反序列化失败
三、校验失败
四、数据库不可访问
五、发生程序错误
这说明传输保证不能被认为是领域级别的保证。咱们只想让它在彻底处理完订单并将其持久化后报告成功状态。惟一能报告成功状态的实体是应用程序自己,由于只有它了解领域内保证传输须要有哪些需求。没有一个通用的系统能够搞清楚某个特定领域中什么状况才会被认为是成功。
在这个特定的例子中,咱们只想在成功写入数据库以后发出成功信号,数据库确认已经安全地将订单存储起来。因为这些缘由,Akka将保证程序的责任提高给了应用程序自己,即你必须本身去实现这些。这给了你彻底的控制权,让你能够保护你须要保护的内容。如今,让咱们考虑下Akka为咱们提供的消息排序,以便轻松推理应用程序逻辑。
在Akka里对于一个给定的发送接收actor对。直接从A到B的消息不会被无序接收。直接这个词强调这只适用于直接向接收者发动消息,而不包括中间有协调员的状况。
若是:
一、actor
A1
向A2
发送了信息M1
,M2
,M3
二、actorA3
向A2
发送了信息M4
,M5
,M6
这意味着对于Akka消息:
一、
M1
必须在M2
和M3
前被发送
二、M2
必须在M3
前被发送
三、M4
必须在M5
和M6
前被发送
四、M5
必须在M6
前被发送
五、A2
看到的A1
和A3
的信息多是交错出现的
六、当前咱们没有保证传输,全部消息都有可能会被丢弃,好比没有到达A2
这些保证达到了一个很好的平衡:从一个actor接收到有序的消息使咱们能够方便地构建易于推理的系统。另外一方面,容许不一样actor的消息交错接受给了咱们足够的自由度,让咱们能够实现高性能的actor系统。
有关传输保证的完整细节,弃权那个参考参考页面。
咱们的第一个查询协议是正确的,可是没有考虑分布式应用程序的执行。若是咱们想在actor中实现重传(由于请求超时),以便查询设备actor,或者咱们想在查询多个actor时关联请求和回复。所以,咱们在消息里添加了一个字段,以便请求者能够提供一个ID(咱们会在接下来的步骤里把代码添加到应用程序里):
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
正如咱们在Hello World实例里学习到的,每一个actor定义了其能接受到的消息种类。咱们的设备actor有义务使用相同的ID参数来回应请求,这将看起来以下所示:
import akka.actor.{ Actor, ActorLogging, Props }
object Device {
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
}
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
import Device._
var lastTemperatureReading: Option[Double] = None
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
override def receive: Receive = {
case ReadTemperature(id) ⇒
sender() ! RespondTemperature(id, lastTemperatureReading)
}
}
注意代码中的:
一、伴生对象定义了如何建立
Device
actor,期中props
方法的参数包含设备的ID和所属的组ID,这在以后将会用到。
二、伴生对象包含了咱们以前所述的消息的定义。
三、在Device
类里,lastTemperatureReading
的值初始化为None
,而且actor能够简单地将它返回。
基于上面的简单actor,咱们能够写一个简单的测试用例。在测试代码路径下的com.lightbend.akka.sample
包里添加DeviceSpec.scala
文件。(咱们使用ScalaTest,你也可使用其余测试框架)
你能够经过在sbt提示符下运行test
来运行测试。
"reply with empty reading if no temperature is known" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref)
val response = probe.expectMsgType[Device.RespondTemperature]
response.requestId should ===(42)
response.value should ===(None)
}
如今当actor接收到传感器的信息时,须要一种方式来改变其温度状态。
写入协议的目的是在接受到包含温度的信息时更新currentTemperature
字段。一样,咱们使用一个简单的消息来定义写入协议,就像这样:
final case class RecordTemperature(value: Double)
然而,这种方式没有考虑让发送者知道温度记录是否被处理,咱们已经看到Akka并不保证消息传输,而且把提供消息成功提示留给了应用程序来作。在咱们的场景下,咱们但愿在更新温度以后给发送者一个确认消息。例如:final case class TemperatureRecorded(requestId: Long)
。就像以前场景中温度的请求和回应同样,添加一个ID字段提供了极大的灵活性。
将读写协议放在一块儿,设备actor看起来就会像这样:
import akka.actor.{ Actor, ActorLogging, Props }
object Device {
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
final case class RecordTemperature(requestId: Long, value: Double)
final case class TemperatureRecorded(requestId: Long)
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
}
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
import Device._
var lastTemperatureReading: Option[Double] = None
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
override def receive: Receive = {
case RecordTemperature(id, value) ⇒
log.info("Recorded temperature reading {} with {}", value, id)
lastTemperatureReading = Some(value)
sender() ! TemperatureRecorded(id)
case ReadTemperature(id) ⇒
sender() ! RespondTemperature(id, lastTemperatureReading)
}
}
咱们如今还须要写一个新的测试用例,同时执行读/请求和写/记录:
"reply with latest temperature reading" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref)
val response1 = probe.expectMsgType[Device.RespondTemperature]
response1.requestId should ===(2)
response1.value should ===(Some(24.0))
deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 3))
deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref)
val response2 = probe.expectMsgType[Device.RespondTemperature]
response2.requestId should ===(4)
response2.value should ===(Some(55.0))
}
到目前为止,咱们已经开始设计咱们的总体架构,而且咱们编写了与领域直接对应的第一个actor。咱们以后须要建立一个用来维护设备组和设备actor的组件。