Actor 模型中的通讯模式

在 Actor 模型中全部的 Actor 之间有且只有一种通讯模式,那就是 tell 的方式,也就是 fire and forget 的方式。可是在实际的开发过程当中工程师们逐渐总结出了一些经常使用的通讯模式。本文以 akka-typed(2.6.0-M8) 框架为例,介绍存在于 actor 模型中最基本的一些通讯模式,讨论消息是如何在 actor 之间流动的。前端

Fire and Forget

咱们首先看到最基础的模式 fire and forget 即发送后遗忘。这种模式是直观地尝试向一个 actor 发送一条消息后就再也不关心此次通讯。此处的发送消息仅保证到 fire 的发送,即消息以发出,可是可能因为网络缘由被丢失。这种通讯模式使用的是所谓的 at most once 送达语义。java

在给出一个可运行的代码例子以前,咱们简单介绍一下 akka-typed 框架这个 actor 模型的特定实现。在 akka-typed 中,消息的传输是经过 ActorRef<T> 的引用来进行的,ActorRef 是为位置透明的 actor 的引用,T 表示了 actor 能接收的消息类型。这点跟典型的 actor 模型有所不一样。这是由于典型的 actor 模型是非肯定性的,实现为 akka untyped 那样的框架,即任意 actor 均可以接受任意类型的信息,而且所以可以经过 become/unbecome 来改变本身的行为(behavior)。在 akka-typed 中经过带有类型标签的引用,保证调用 actorRef.tell(message) 的时候不会意外的传递错误的信息,同时类型信息有助于推理代码,而不是在一堆任意引用任意类型的 ActorRef 和 Object 类型的消息中费力的分析。一个对应的缺点就是没法实现任意灵活度的 become/unbecome,若是须要实现相似状态机的功能,须要加入额外的域而且在 Behaviors 中针对状态分别讨论,但全部能接受的消息都一定是 T 的类型。数据库

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import java.util.concurrent.CountDownLatch;

public class FireAndForget {
    public static final CountDownLatch LATCH = new CountDownLatch(2);

    public static class PrintMe {
        public final String message;
        public PrintMe(String message) {
            this.message = message;
        }
    }

    public static final Behavior<PrintMe> printerBehavior =
        Behaviors.receive((ctx, printMe) -> {
           ctx.getLog().info(printMe.message);
           LATCH.countDown();
           return Behaviors.same();
        });

    public static void main(String[] args) throws Exception {
        ActorSystem<Void> system = ActorSystem.create(Behaviors.setup(ctx -> {
            ActorRef<PrintMe> printer = ctx.spawn(printerBehavior, "printer");
            printer.tell(new PrintMe("message 1"));
            printer.tell(new PrintMe("message 2"));
            return Behaviors.ignore();
        }), "fire-and-forget");

        LATCH.await();

        system.terminate();
    }
}

这里须要用 CountDownLatch 来同步保证消息送达后再关闭 ActorSystem,不然因为执行顺序没有保证,颇有可能在看到 log 打印以前因为 ActorSystem 的关闭 printer actor 被中止,从然后续被调度的消息传递失败。由此也能够看出 fire and forget 的方式仅保证在 tell 的调用点发出消息,至于消息能不能送到,会被怎么处理,发送方就无论了。编程

Request-Response

请求-响应模式是另外一种很是常见的通讯模式,即 actor-1 向 actor-2 发送消息,actor-2 收到消息后向 actor-1 返回相应的消息。注意这里咱们只提到了消息的发送,两个方向均是 fire and forget 的,actor-1 收到的 actor-2 的消息跟它收到的任何一条其余信息没有任何区别。除非 actor-2 发回的消息中携带有跟回应相关的信息,不然 actor-1 并不知道这条消息就是回应刚才发送的某一条信息的。此外,特化到 akka-typed 中,甚至因为消息中再也不自然包括 sender 信息,actor-1 并不知道这条消息是 actor-2 发过来的,必须在消息中显式的包含全部须要的信息。后端

因为 request 和 response 是松耦合的,这和咱们下面要讲到的使用 ask pattern 的模式不一样,不能做为面向对象编程在 actor 模型中的投影。设计模式

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.Terminated;
import akka.actor.typed.javadsl.Behaviors;
import scala.runtime.Nothing$;

@SuppressWarnings("WeakerAccess")
public class RequestResponseTypedActors {

    public static final class Request {
        public final String message;
        public final ActorRef<Response> replyTo;

        public Request(String message, ActorRef<Response> replyTo) {
            this.message = message;
            this.replyTo = replyTo;
        }
    }

    public static final class Response {
        public final String message;

        public Response(String message) {
            this.message = message;
        }
    }

    private static final Behavior<Request> responder = Behaviors
            .receiveMessage(request -> {
                System.out.println("got request: " + request.message);
                request.replyTo.tell(new Response("got it!"));
                return Behaviors.same();
            });

    private static Behavior<Response> requester(ActorRef<Request> responder) {
        return Behaviors
                .setup(ctx -> {
                    responder.tell(new Request("hello", ctx.getSelf()));
                    return Behaviors.receiveMessage(response -> {
                        System.out.println("got message " + response.message);
                        return Behaviors.stopped();
                    });
                });
    }

    public static void main(String[] args) {
        ActorSystem.create(Behaviors.setup(ctx -> {
            ActorRef<Request> res = ctx.spawn(responder, "responder");
            ActorRef<Response> req = ctx.spawn(requester(res), "requester");
            ctx.watch(req);

            return Behaviors.receiveSignal((context, sig) -> {
                if (sig instanceof Terminated) {
                    return Behaviors.stopped();
                } else {
                    return Behaviors.unhandled();
                }
            });
        }), "ReqResTyped");
    }
}

能够看到,responder 收到 requester 的消息中包含了 requester 的地址,所以能够发送回传信息。requester 之因此知道 responder 的地址是由开发者在建立 requester 的时候直接告诉它的。这样一个显然的问题就是 requester 是怎么天生地知道 responder 的地址的呢?在分布式系统中,这能够经过预配置固定地址或名称服务来达到这个目的。在 akka-typed 中,经过 ActorSelection 使用 actor 地址寻址的方式已经很难作到的(除非先用 untyped 强行获取再转回去,很是复杂并且 hacky),主要是经过 Receptionist 的名称服务来实现的。这个服务在 cluster 模式下要求必须使用 akka-cluster-typed,实在很有些臃肿。缓存

Query

这里提到的 Query 模式便是所谓的 ask 模式,也即发送一个消息后等待一个绑定到该消息上的回应。因为网络的滞后性和不肯定性,阻塞地等待这个回应不只会形成性能问题,更有可能因为回应消息的丢失或目标机器宕机而使得当前系统不可用。为此,咱们须要解耦请求和响应的过程,将本地返回的类型从直接的结果值变为包裹在 Future 中的值。而后,请求方的业务流程提高到 Future 域中,使用变换组合字来组合后续步骤。因为 Future 盒子是当即返回的,当前线程能够继续其余工做,在 Future 值被填充后根据此前组合的步骤执行相应的逻辑。这里顺带提一句,Future 也是函数式编程中著名的概念 Monad 的一个实例,上面这种域的提高和组合的方式是函数式编程中应对状态变化的一种经常使用手法。网络

因为返回的是一个 Future,咱们须要仔细的讨论这个 Future 完成的条件。首先,抽象地说,这个 Future 会在收到回复的时候完成。可是这个请求回复的双方若是是两个业务 actor 之间完成的话,业务 actor 就须要有一张表来缓存全部的 Future,而且在请求和回复中都带有 Future 的惟一标识符,以在接受到回复时自动的完成 Future。因为 Query 模式不是天生就伴随在 actor 模型里的,而是一个后期总结的设计模式,这么作会致使全部 actor 都承担这一没必要要的开销。并发

所以在 akka 当中采用的方法是,对于一个请求回复周期,使用一个专门的 PromiseActor 来负责和远端的 actor 交互,即请求消息由 PromiseActor 发出,收到回复时也由 PromiseActor 完成其中惟一的专用的 Future。在 akka-typed 中,还额外加入了 createRequestapplyToResponse 参数,前者用于将 PromiseActor 的引用传入发送的消息中,后者用于接收回复后适配为上层 actor 能接受的消息类型。app

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AskPattern;
import akka.actor.typed.javadsl.Behaviors;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

public class Ask {
    public static class Pong { }
    public static class Ping {
        public final ActorRef<Pong> replyTo;
        public Ping(ActorRef<Pong> replyTo) {
            this.replyTo = replyTo;
        }
    }

    public static final Behavior<Ping> pongBehavior =
        Behaviors.receiveMessage(ping -> {
           ping.replyTo.tell(new Pong());
           return Behaviors.stopped();
        });

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create(Behaviors.setup(ctx -> {
            ActorRef<Ping> ref = ctx.spawn(pongBehavior, "pong");
            CompletableFuture<Pong> future = AskPattern.ask(
                ref,
                Ping::new,
                Duration.ofMillis(2000L),
                ctx.getSystem().scheduler()).toCompletableFuture();
            future.whenComplete((pong, throwable) -> {
                ctx.getLog().info("pong={}, throwable={}", pong, throwable);
                ctx.getSystem().terminate();
            });
            return Behaviors.ignore();
        }), "ask");
    }
}

关于 ask 模式还有一个特别须要注意的点,即若是在 actor 中使用 AskSupport 的 ask,实际上这个 ask 中会有一个 spawn 出来的 PromiseActor。这一点在上面已经提过了,可是绝对值得再强调一遍。由于当你在 actor-1 中使用 AskSupport 应用 ask 模式和 actor-2 通讯时,实际通讯的是 PromiseActor。所以,akka 提供的关于两个 actor 之间消息的有序性对于这第三个 actor PromiseActor 是不成立的。这有可能致使很是微妙的并发竞争。

Pipe 和 Aggregate

最后简单说起两种消息流模式 Pipe 和 Aggregate,这两种模式常常配合 ask 模式使用。

Pipe 模式将一个 Future 在完成时发送到另外一个 actor 上,其基本形式为 pipe(future, replyTo),常常用于 actor-1 向 actor-2 请求,actor-2 进而向 actor-3 请求,随后在 actor-2 中将向 actor-3 请求的 future pipe 到 actor-1 的回复中。举个例子,actor-1 是 client,actor-2 是数据库前端,actor-3 是数据库后端。

Aggregate 模式能够视为 Ask 模式的一个天然扩展。Ask 模式一次只能处理一个请求及其回应,而 Aggregate 模式简单的扩展到一个显式建立的子 actor 来处理多个请求响应,很是直观。

相关文章
相关标签/搜索