akka是一系列框架,包括akka-actor, akka-remote, akka-cluster, akka-stream等,分别具备高并发处理模型——actor模型,远程通讯,集群管理,流处理等功能。html
akka支持scala和java等JVM编程语言。java
akka actor是一个actor模型框架。actor模型是一种将行为定义到actor,actor间经过消息通讯,消息发送异步进行,消息处理(在actor内)同步有序进行的一种高并发、非阻塞式编程模型。node
Actor模型优:git
actor的层级结构;actor名字与路径、地址;actor的消息收件箱;发送消息的异步性;actor消息处理的有序性;actor按序挨个处理消息(而非并发);github
case class MsgA(data:Type) case class MsgB(x:X) class SomeActor extends Actor { def receive()={ case MsgA(d)=> case MsgB(x)=> } } val system=ActorSystem("sysname") val act:ActorRef=system.actorOf(p:Props[], "act-name") act ! MsgA("data") act ! MsgB(xxx)
.tell() :Fire-Forgetspring
.ask() : Send-And-Receive-Futureapache
阻塞程序等待消息返回结果:编程
import akka.pattern._ import scala.concurrent._ implicit val akkaAskTimeout:Timeout = Timeout(5 seconds) val awaitTimeout= 10 seconds val res=Await.result(actor ? MessageXxx, awaitTimeout) println(res)
akka programming general rules: (通常编码规则,该建议来自官网)数组
.props()
: Props[?] method in actor companion object to construct the actor.actorOf()
建立Actor,返回ActorRef。 .actorSelect()
查找actor,返ActorRef。网络
配置键akka.remote.netty.tcp.hostname
定义remoting模块通讯的网口,akka.remote.netty.tcp.port
定义通讯端口。通讯网口在未配置或配置为空串时(不能配置为null)默认监听局域网网口(不是回环网口127.0.0.1)。通讯端口在未配置默认监听2552端口,在配置为0时会监听一个随机端口。网口名与ActorSystem地址中的主机名严格对应,不能试图以回环地址链接本机上监听局域网网口的actor。好比,本机上运行的单节点集群,集群即只有自身,其集群种子节点配置为akka.cluster.seed-nodes=["akka.tcp://xx@127.0.0.1:2551"]
,则其remoting通讯网口配置akka.remote.netty.tcp.hostname
只能是127.0.0.1
,不能是局域网网口。
……
actor的建立和部署不单只是在本地,还有可能涉及远程部署(如集群)。涉及远程部署时牵涉到序列化问题。
若是能确信actor的建立只会涉及本地,则可经过配置关闭actor建立器的序列化行为akka.actor.serialize-creators=off
(默认关闭)。
akka cluster是……集群,做用……,特色……TODO
akka cluster由多个ActorSystem(节点)构成,actorsystem根据配置指定的种子节点组建集群。
论及集群时,所谓节点不必定指一台物理机,通常指ActorSystem实例,不一样的ActorSystem实例对应不一样的<主机+端口>。一台物理机可运行多个ActorSystem实例。论及集群成员时,这里的成员指的是节点,不是集群内的actor。
集群种子节点配置akka.cluster.seed-nodes
中的节点没必要都启动,但节点列表中第一个必须启动,不然其余节点(不管是否在seed-nodes中)不能加入集群。也就说,若是不启动列表第一个节点,启动多个其余节点,不能组成集群(也就不能产生/接收到集群事件),直到启动列表第一个节点才将已启动的多个节点组成集群。种子列表第一个节点可组建一个只有本身的集群。 任意节点都可加入集群,并不是必定得是种子节点。节点加入的只能是集群(不能是另外一个节点,即联系的其余节点必须处于集群中),也就说有个节点得先组建一个仅包含本身的集群,以使得其余节点有集群可加入。 集群内节点ActorSystem的名字要求一致,由组建过程可知,即种子列表第一个节点的名字。
若是集群含超过2个节点,那么列表第一个节点能够不存活。若是种子节点全都同时不存活,那么以相同配置再次启动节点将组建不一样于之前的新集群,即不能进入之前的集群。
经过接口Cluster.joinSeedNodes(List[Address])
可动态添加种子节点,节点可经过接口cluster.join(Address)
加入集群。
配置项akka.cluster.seed-node-timeout
指启动中的节点试图联系集群(种子节点)的超时时间,若是超时,将在akka.cluster.retry-unsuccessful-join-after
指定的时间后再次重试联系,默认无限次重试联系直到联系上,经过配置akka.cluster.shutdown-after-unsuccessful-join-seed-nodes
指定一个超时时间使得本节点在联系不上集群种子节点超过该时间后再也不继续联系,终止联系后执行扩展程序CoordinatedShutdown
以停掉本节点actor相关行为(即关停ActorSystem),若是设置akka.coordinated-shutdown.terminate-actor-system = on
(默认开启)将致使扩展程序关停ActorSystem后退出JVM。
配置项akka.cluster.min-nr-of-members
指定集群要求的最小成员个数。
集群成员状态:
集群成员在加入集群、存在于集群、到退出集群整个生命周期中的变化有对应状态,状态包括joining, up, leaving, exiting, remoed, down, unreachable。。TODO
集群成员状态转移图:
图片来源:akka官网
图中的框表示成员状态,边表示驱动状态转换的相关操做。"(leader action)"表示该操做由集群首领驱使完成。“(fd*)”表示该操做由系统失败检测器(failure detector)驱使完成,失败检测器是一个监测集群成员通讯状态的后台程序。
网络中的节点随时可能没法通讯(通讯不可达),针对这个问题,集群系统设有失败检测器(Failure Detector),发现成员异常不可达时将广播UnreachableMember事件。须要在程序中显式调用接口Cluster.down(Address)
来改变成员状态为Down,集群以后会广播MemberDowned事件。节点正常退出时不会转入unreachable状态,而是进入leaving(事件MemberLeft)。
自动down:经过配置可以使成员自动从unreachable转入down,经过配置akka.cluster.auto-down-unreachable-after
开启并指定成员自动从状态unreachable转入down的时长,官方提供此功能仅为测试,并告知不要在生产环境使用。
集群事件:
集群可订阅的事件(cluster.subscribe(,Class[_]*)方法)要求事件类型实参是ClusterDomainEvent
类型的(ClusterDomainEvent或其子类),ClusterDomainEvent是一个标记型接口。事件相关的类型定义在akka.cluster.ClusterEvent中,ClusterEvent只是object,没有伴生对象。
伴随集群成员生命周期的事件:
MemberJoined
- 新节点加入,其状态被改成了 Joining。MemberUp
- 新节点成功加入,成为了集群成员,其状态被改成了 Up。MemberExited
- 集群成员正要离开集群,其状态被改成了 Exiting。注意当其余节点收到此事时,事件主体节点可能已经关停。MemberDowned
- 集群成员状态为down。MemberRemoved
- 节点已从集群移除。UnreachableMember
- 成员被认为不可达(失败检测器询问完其余全部成员均认为该节点不可达)。ReachableMember
- 以前的不可达成员从新变得可达。事件的发生要求全部以前认为该节点不可达的成员如今均可达该节点。WeaklyUp
事件:当部分节点不可达时gossip不收敛,没有集群首领(leader),首领行为没法实施,但此时咱们仍但愿新节点可加入集群,这时须要WeaklyUp状态特性。特性使得,若是集群不能达到gossip收敛,Joining成员将被提高为WeaklyUp,成为集群的一部分,当gossip收敛,WeaklyUp成员转为Up,可经过配置akka.cluster.allow-weakly-up-members = off
关闭这种特性。
CurrentClusterState
:新节点加入集群后,在其收到任何集群成员事件前,集群会向其推送一个CurrentClusterState消息(此class并不是ClusterDomainEvent子类,即不能成为cluster.subscribe()中事件类型的参数),表示新成员加入时(订阅集群事件时)集群中的成员状态快照信息,特别地,MemberUp事件伴随的集群状态快照可能没有任何集群成员,避免此状况下CurrentClusterState事件被发送的方法是在cluster.registerOnMemberUp(...)参数中提交集群事件订阅行为。接收CurrentClusterState消息是基于订阅时参数cluster.subscribe(,initialStateMode=InitialStateAsSnapshot,)的条件下,CurrentClusterState将成为actor接收到的第一条集群消息,可不接收快照,而是将现有成员的存在视为相关事件,这时应使用订阅参数(initialStateMode=InitialStateAsEvents)。
val cluster = Cluster(context.system) cluster.registerOnMemberUp{ cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember]) }
Terminated
消息是被监控(.watch()
)节点在被下线(down)或移除(remove)后发送的消息,类继承结构上不是集群事件(ClusterDomainEvent
)。
actor需订阅事件才能接收到(cluster.subscribe(self,,Class[EvenetType]*)
),接收到的消息包括订阅时提供的事件类型的子类。
<font color='silver'> actor接收到的事件有时是基于本身看到的集群状态的,不是全部actor收到的事件都相同,尤为注意,当节点本身停机时,会收到集群内全部节点(包括本身以及其余没停机节点)的MemberRemoved事件,也就至关于在actor本身看来这个集群停机了。而其余存活的节点只会收到停机节点的MemberRemoved事件。有时本身退群又只收到本身的MemeberLeft事件,并且没有MemberRemoved事件,没有其余成员的离群事件。(意外出现过,需重现以及分析缘由) <p/> </font>
节点获取自身地址的方式:
val cluster=Cluster(actorSystem) cluster.selfAddress // 节点自身地址,信息包括协议+system名+主机+端口 cluster.selfUniqueAdress //节点自身惟一性地址,信息包括自身地址+自身UID
节点角色 node roles:
集群节点可标记若干自定义角色类型,经过配置项akka.cluster.roles
指定。
每种角色群有一个首领节点,以执行角色相关操做,通常无需感知。
配置项akka.cluster.role.<role-name>.min-nr-of-members
定义集群中角色为<role-name>的节点的最小个数。
集群内单例 Cluster Singleton:保证某种Actor类在集群内或集群内某种角色群中只有一个实例。活跃的单例是集群的成员,是成员便可被移出集群,被移出的单例其类型在被管理器建立补充以前活跃单例前存在一小段实例缺失时间,这期间单例的集群成员不可达状态会被失败检测器检测到。
单例模式须要注意的问题:
单例工具依赖库“com.typesafe.akka:akka-cluster-tools”。 建立单例依赖类ClusterSingletonManager
,是一个actor,需在集群内全部节点上(尽早)启动,访问单例依赖类ClusterSingletonProxy
。
给单例actor发送消息时,查找的ActorRef应是单例代理,而不是单例管理器。
集群节点分身 Cluster Sharding:分发actor到多个节点,逻辑标识符是同一个,不用关心其实际位置。
分布式订阅发布 Distributed Publish Subscribe:仅经过逻辑路径实现集群内actor间的订阅发布通讯、点对点通讯,没必要关心物理位置。
集群客户端 Cluster Client:与集群通讯的外部系统称为集群的客户端。
借助akka工具包com.typesafe.akka:akka-cluster-tools,可实现集群客户端与集群间的通讯。在集群中的节点上调用ClusterClientReceptionist(actorSystem).registerService(actorRef)
将actorRef实例 注册为负责链接外部系统通讯请求的接待员,在外部系统中利用 val c = system.actorOf(ClusterClient.props(ClusterClientSettings(system).withInitialContacts(<集群接待员地址列表,可配到配置文件>)))
得到能与集群接待员通讯的ActorRef,再经过c ! ClusterClient.Send("/user/some-actor-path", msg)
发送消息到集群。
集群客户端配置集群接待员经过指定akka.cluster.client.initial-contacts
数组完成,一个接待员地址包含集群成员地址(集群中任意一个可通讯成员)和接待员在集群成员ActorSystem中的路径(系统生成的接待员的服务地址在/system/receptionist),如["akka.tcp://my-cluster@127.0.0.1:2552/system/receptionist"]
。
集群节点协议不能是akka,需是akka.tcp,配置项akka.remote下需配置属性netty.tcp,而不是artery(对应akka协议),不然集群客户端链接时会因Connection reset by peer而失败(固然,集群节点akka.actor.provider显然仍是cluster)。
集群节点路由 Cluster Aware Routers:容许路由器对集群内的节点进行路由,自动管理路由对象表(routees),涉及相关成员的进群和退群行为。
两种路对象(routee)管理策略的路由类型,Group和Pool:
Goup 群组 共用集群成员节点做为路由对象(routee)。
router转发策略有多种,对于消息一致性转发的router来讲转的消息必须可一致性散列(ConsistentHashable),或者用ConsitentHashEnvolope包装消息使其变得可一致散列,不然routee收不到消息,而定义routee的接收方法receive
时,获得的数据对象是拆包了的,也就说若是路由器转发的消息通过一致性散列信封包装,routee获得的消息已被自动去掉信封。
Pool 池 每一个router各自自动建立、管理本身的的routees。 池中的routee actor消亡不会引起路由器自动建立一个来替补,路由器将在全部routee actor消亡会随即消亡,动态路由器,如使用了数量调整策略的路由器,会改变这种行为,动态调整routee。
akka.actor.deployment.<router-path>.cluster.max-nr-of-instances-per-node
定义每一个节点上routee的个数上限(默认1个),因为所有routee在节点启动时即被启动(而不是按需延迟启动),所以该上限同时定义了节点上的routee个数。 router在集群中能够有多个,同一逻辑路径下也容许有多个router(即非单例router,就像actor)。单节点routee个数配置max-nr-of-instances-per-node
是对每一个router而言,也就说,若是集群router个数是m
,其类型、配置相同,单节点routee个数为n
,集群节点数为c
,集群中该类型router管理的routee类型实例数为m*n*c
。
集群管理 Management:经过HTTP、JMX查看管理集群。
<br><br><br>
集群相关例子(包含集群、集群事件、集群客户端、集群节点路由、集群内单例、角色):
/*集群有一个单例Master,有路由功能,管理多个WorkerActor,负责接收集群外部消息以及转发给worker。集群外部系统(集群客户端)给集群内master发送消息。 */ //对应.conf配置文件内容在代码后面 //↓↓↓↓↓↓↓↓↓↓集群节点程序代码↓↓↓↓↓↓↓↓↓(对应配置文件node.conf) //程序入口、建立actor system、建立&启动节点worker、建立 object ClusterNodeMain { def main(args: Array[String]): Unit = { val conf=ConfigFactory.load("node") val system=ActorSystem("mycluster", conf) val log = Logging.getLogger(system, this) val manager = system.actorOf( ClusterSingletonManager.props(Props[Master], PoisonPill, ClusterSingletonManagerSettings(system).withRole("compute")), "masterManager") //这里的名字要和配置中部署路径中的保持一致 log.info(s"created singleton manager, at path: {}", manager.path) val proxy = system.actorOf(ClusterSingletonProxy.props(singletonManagerPath = "/user/masterManager", settings = ClusterSingletonProxySettings(system).withRole("compute")), name = "masterProxy") //单例代理类,准备让单例接收的消息都应发到代理 ClusterClientReceptionist(system).registerService(proxy) log.info("created singleton proxy, at path: {}", proxy.path) } } class Master extends Actor with ActorLogging{ //下述FromConf.xxx从配置文件中读取部署配置,本Master actor咱们以单例模式建立,其路径为/user/masterManager/singleton,所以部署的配置键为"/masterManager/singleton/workerRouter"(配置键需省掉/user) private val router = context.actorOf(FromConfig.props(Props[WorkerActor]), name = "workerRouter") override def receive: Receive = { case x => log.info(s"got message on master, msg: $x, actor me: ${self.path}") router forward ConsistentHashableEnvelope(x, x) //转发给路由器,路由器将选择一个worker让其接收消息 } } class WorkerActor extends Actor with ActorLogging { private val cluster = Cluster(context.system) override def preStart(): Unit = { // actor成员需向集群订阅事件才能接收到集群事件消息 cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[ReachabilityEvent]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive:Receive = { case MemberWeaklyUp(member)=> log.info(s"Member is WeaklyUp: $member, actor me: $self") case MemberUp(member) => //集群成员状态刚设为了Up log.info(s"Member is Up: $member, actor me: $self") case MemberJoined(member)=> //集群成员状态刚设为了Joining log.info(s"Member is Joined: $member, actor me: $self") case MemberLeft(member)=> // 状态刚设为了leaving log.info(s"Member is Left: $member, actor me: $self") case MemberExited(member)=> // 成员本身正常退出 log.info(s"Member is Exited: $member, actor me: $self") // 状态成了Down,down状态通常由unreachable状态以后转移过来, // 由编程者本身显式设置cluster.down(member) // (auto-down特性可自动转移unreachable成员到down,官方不建议在生成环境中启用) case MemberDowned(member)=> log.info(s"Member is Downed: $member, actor me: $self") case MemberRemoved(member, previousStatus) => //成员被移出集群 log.info(s"Member is Removed: $member after $previousStatus, actor me: $self") case UnreachableMember(member) => //failure-detector检测器发现了一个通讯不可达的成员 log.info(s"Member detected as unreachable: $member, actor me: $self") cluster.down(member.address) // context.actorSelection(RootActorPath(member.address) / "user" / "otherActor") ! SomeMessage //共7种MemberEvent case ReachableMember(member) => //不可达成员从新变得可达 log.info(s"Member is reachable again: $member, actor me: $self") case n:Int => log.info(s"got an int: $n, actor me: ${self.path}") case s:String => log.info(s"sender said: $s, actor me: ${self.path}") case any=> log.info(s"what is it? :$any, actor me: ${self.path}") } } //↑↑↑↑↑↑↑↑↑↑↑集群节点程序代码↑↑↑↑↑↑↑↑↑↑在将部署的节点机器上运行(注意修改对应hostname&port配置) //↓↓↓↓↓↓↓↓↓↓集群客户端点程序代码(另外一个项目)↓↓↓↓↓↓↓↓↓↓(对应配置文件client.conf) object ClusterClientMain { def main(args: Array[String]): Unit = { val conf = ConfigFactory.load("client") val system = ActorSystem("myclient", conf) //actor system名字随意,和集群名无关 val clusterClient = system.actorOf(ClusterClient.props(ClusterClientSettings(system)), "clusterClient") // 向集群某个路径下的actor发送消息,对应路径的actor需在集群端提早向集群接待员注册好服务 clusterClient ! ClusterClient.Send("/user/masterProxy", 100, localAffinity = true) // 注意,不是简单的: clusterClient ! 100 clusterClient ! ClusterClient.Send("/user/masterProxy", 1, localAffinity = true) clusterClient ! ClusterClient.Send("/user/masterProxy", "Hi", localAffinity = true) println("main done") } }
节点程序和客户端程序使用的配置文件分别以下:
//↓↓↓↓↓↓↓↓↓↓ node.conf akka { actor { provider = "cluster" //有3种provider:local, remote, cluster,集群成员actor用cluster } remote { log-remote-lifecycle-events = off netty.tcp { //使用artery便可组建集群系统(对应协议akka://),但只有使用netty.tcp(对应协议akka.tcp://)才能和集群外部通讯 hostname = "127.0.0.1" //节点主机 port = 2551 //节点actor system的端口。主机和端口根据部部署的机器及想要对外开放的端口而变 } } cluster { seed-nodes = [ //种子节点必须启动第一个节点,其余种子节点不要求必定启动,加入集群的节点不限于必须在种子列表中(但必须设置正确的种子节点以便能链接进集群) //节点部署彻底没必要在同个主机 "akka.tcp://mycluster@127.0.0.1:2551", // "akka.tcp://mycluster@127.0.0.1:2552" ] roles=["compute"] //为集群中这一类actor打上一种自定义标签 // 官方不建议生产环境中启用auto-down特性。 //auto downing is NOT safe for production deployments. // you may want to use it during development, read more about it in the docs. // auto-down-unreachable-after = 10s } } akka.extensions=[ "akka.cluster.client.ClusterClientReceptionist" //集群客户端接待员扩展 ] akka.actor.deployment { //想要部署的actor的路径做为配置键 "/masterManager/singleton/workerRouter" { //路由器actor路径做为配置键,配置路由器相关属性 router = consistent-hashing-pool cluster { enabled = on max-nr-of-instances-per-node= 2 allow-local-routees = on use-role = "compute" } } } //↓↓↓↓↓↓↓↓↓↓ client.conf akka { actor { provider = remote // 咱们的客户端是独自成普通actor,没有建新集群,设为remote以提供对外远程通讯 } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { // hostname = "" // 集群客户端部署的主机地址(默认本机局域网地址) // port = 0 //集群客户端actor system的端口,能够不配置(akka系统自动分配端口) } } } akka.cluster.client { initial-contacts = [ //接待员地址。 集群成员中建立了集群客户端接待员的任意节点,地址中actor system名字是集群名,后面的/system/receptionist是固定的(系统自动建立的接待员) "akka.tcp://mycluster@127.0.0.1:2551/system/receptionist", #不要求非得是第一个成员或接待员注册的ActorRef服务所在的节点 // "akka.tcp://mycluster@127.0.0.1:2552/system/receptionist" ] }
An extension is a singleton instance created per actor system.
被spring容器管理的Actor类上必须标注@Scope("prototype")
。(@Scope("prototype")表示spring容器中每次须要时生成一个实例,是否影响集群内单例? <== 不影响)
cluster中涉及actor远程部署,可能会因SpringApplicationContext
不能序列化而失败,可将其设为静态变量,在spring启动后手动初始化,以提供spring管理器。
示例代码:TODO
运行时程序异常终止,提示akka.version
属性没有配置问题
异发生在初始化ActorSystem
过程当中,初始化时会经过加载reference.conf配置文件读取该属性,在运行时没能读取到该属性致使异常。akka-actor jar中有reference.conf、version.conf文件,reference.conf在文件首经过“incluce version"语法导入version.conf,version.conf中定义了akka.version属性,ActorSystem本可读到,但因为akka系列其余jar包中也有reference.conf,内容不一样akka-actor中的,也没有导入version.conf,而打包时akka-actor的reference.conf(有可能)被其余jar包中的reference.conf覆盖,从而致使reference.conf中没有akka.version属性,进而致使程序异终止。
解决方案:定义打包时资源文件处理行为,不覆盖reference.conf,合并全部jar包中reference.conf文件内容到一个文件。以下定义pom.xml中shade插件的行为:
<plugin> <artifactId>maven-shade-plugin</artifactId> <version>3.2.0</version> <executions> <execution> <phase>package</phase> <goals><goal>shade</goal></goals> <configuration> <transformers> <!--合并资源文件,而不是默认的覆盖--> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer>
ERROR akka.remote.EndpointWriter : Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer]. Transient association error (association remains live) akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.remote.DaemonMsgCreate] using serializer [class akka.remote.serialization.DaemonMsgCreateSerializer]. at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:62) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19] at akka.remote.EndpointWriter.$anonfun$serializeMessage$1(Endpoint.scala:906) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19] at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) ~[scala-library-2.12.8.jar!/:na] at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:906) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19] at akka.remote.EndpointWriter.writeSend(Endpoint.scala:793) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19] at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:768) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19] at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:458) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.actor.ActorCell.invoke(ActorCell.scala:557) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] Caused by: java.io.NotSerializableException: No configured serialization-bindings for class [org.springframework.context.annotation.AnnotationConfigApplicationContext] at akka.serialization.Serialization.serializerFor(Serialization.scala:320) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.serialization.Serialization.findSerializerFor(Serialization.scala:295) ~[akka-actor_2.12-2.5.19.jar!/:2.5.19] at akka.remote.serialization.DaemonMsgCreateSerializer.serialize(DaemonMsgCreateSerializer.scala:184) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19] at akka.remote.serialization.DaemonMsgCreateSerializer.$anonfun$toBinary$1(DaemonMsgCreateSerializer.scala:76) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19] at scala.collection.immutable.List.foreach(List.scala:392) ~[scala-library-2.12.8.jar!/:na] at akka.remote.serialization.DaemonMsgCreateSerializer.propsProto$1(DaemonMsgCreateSerializer.scala:75) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19] at akka.remote.serialization.DaemonMsgCreateSerializer.toBinary(DaemonMsgCreateSerializer.scala:86) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19] at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:52) ~[akka-remote_2.12-2.5.19.jar!/:2.5.19]
集成spring方式参考https://www.baeldung.com/akka-with-spring,序列化ERROR日志问题参考https://github.com/akka/akka/issues/15938,其中“patriknw”提出设置akka.actor.serialize-creators=off并设置相关.props(...).withDeploy(Deploy.local),本例在类SpringExt中定义方法def props(beanClass: Class[_ <: Actor]): Props = Props.create(classOf[SpringActorProducer], beanClass)//.withDeploy(Deploy.local),也失败了,仍然报一样ERROR。
用静态变量保存ApplicationContext的方式可暂时解决该问题(由于压根儿不涉及到ApplicationContext的序列化了)。
<!--TODO为何涉及springapplicationcontext的序列化-->
<font color="silver"> (以上知识基于版本akka 2.5.19) </font>