Akka系列(九):Akka分布式之Akka Remote

Akka做为一个天生用于构建分布式应用的工具,固然提供了用于分布式组件即Akka Remote,那么咱们就来看看如何用Akka Remote以及Akka Serialization来构建分布式应用。java

背景

不少同窗在程序的开发中都会遇到一个问题,当业务需求变得愈来愈复杂,单机服务器已经不足以承载相应的请求的时候,咱们都会考虑将服务部署到不一样的服务器上,但服务器之间可能须要相互调用,那么系统必须拥有相互通讯的接口,用于相应的数据交互,这时候一个好的远程调用方案是一个绝对的利器,主流的远程通讯有如下几种选择:git

  • RPC(Remote Procedure Call Protocol)github

  • Web Service设计模式

  • RMI (Remote Method Invocation)服务器

  • JMS(Java Messaging Service)网络

这几种方式都是被采用比较普遍的通讯方案,有兴趣的同窗能够本身去了解一下,这里我会讲一下RMI和JMS。app

JAVA远程调用

RMI和JMS相信不少写过Java程序的同窗都知道,是Java程序用来远程通讯的主要方式,那么RMI和JMS又有什么区别呢?异步

1.RMI

i.特征:
  • 同步通讯:在使用RMI调用远程方法时,线程会持续等待直到结果返回,因此它是一个同步阻塞操做;tcp

  • 强耦合:请求的系统中须要使用的RMI服务进行接口声明,返回的数据类型有必定的约束;分布式

ii.优势:
  • 实现相对简单,方法调用形式通俗易理解,接口声明服务功能清晰。

iii.缺点:
  • 只局限支持JVM平台;

  • 对没法兼容Java语言的其余语言也不适用;

2.JMS

i.特征:
  • 异步通讯:JMS发送消息进行通讯,在通讯过程当中,线程不会被阻塞,没必要等待请求回应,因此是一个异步操做;

  • 松耦合:不须要接口声明,返回的数据类型能够是各类各样,好比JSON,XML等;

ii.通讯方式:

(1)点对点消息传送模型

顾名思义,点对点能够理解为两个服务器的定点通讯,发送者和接收者都能明确知道对方是谁,大体模型以下:

jms-point-to-point

(2)发布/订阅消息传递模型

点对点模型有些场景并非很适用,好比有一台主服务器,它产生一条消息须要让全部的从服务器都能收到,若采用点对点模型的话,那主服务器须要循环发送消息,后续如有新的从服务器增长,还要改主服务器的配置,这样就会致使没必要要的麻烦,那么发布/订阅模型是怎么样的呢?其实这种模式跟设计模式中的观察者模式很类似,相信不少同窗都很熟悉,它最大的特色就是较松耦合,易扩展等特色,因此发布/订阅模型的大体结构以下:

jms-topic

iii.优势:
  • 因为使用异步通讯,不须要线程暂停等待,性能相对较高。

iiii.缺点:
  • 技术实现相对复杂,并须要维护相关的消息队列;

更通俗的说:

RMI能够当作是用打电话的方式进行信息交流,而JMS更像是发短信。

总的来讲两种方式没有孰优孰劣,咱们也不用比较到底哪一种方式比较好,存在即合理,更重要的是哪一种选择可能更适合你的系统。

Akka Remote

上面讲到JAVA中远程通讯的方式,但咱们以前说过Akka也是基于JVM平台的,那么它的通讯方式又有什么不一样呢?

在我看来,Akka的远程通讯方式更像是RMI和JMS的结合,但更偏向于JMS的方式,为何这么说呢,咱们先来看一个示例:

咱们先来建立一个远程的Actor:

class RemoteActor extends Actor {
  def receive = {
    case msg: String =>
      println(s"RemoteActor received message '$msg'")
      sender ! "Hello from the RemoteActor"
  }
}

如今咱们在远程服务器上启动这个Actor:

val system = ActorSystem("RemoteDemoSystem")
val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")

那么如今咱们假若有一个系统须要向这个Actor发送消息应该怎么作呢?

首先咱们须要相似RMI发布本身的服务同样,咱们须要为其余系统调用远程Actor提供消息通讯的接口,在Akka中,设置很是简单,不须要代码侵入,只需简单的在配置文件里配置便可:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = $localIp  //好比127.0.0.1
      port = $port //好比2552
    }
    log-sent-messages = on
    log-received-messages = on
  }
}

咱们只需配置相应的驱动,传输方式,ip,端口等属性就可简单完成Akka Remote的配置。

固然本地服务器也须要配置这些信息,由于Akka之间是须要相互通讯的,固然配置除了hostname有必定的区别外,其余配置信息可一致,本例子是在同一台机器上,因此这里hostname是相同的。

这时候咱们就能够在本地的服务器向这个Actor发送消息了,首先咱们能够建立一个本地的Actor:

case object Init
case object SendNoReturn

class LocalActor extends Actor{

  val path = ConfigFactory.defaultApplication().getString("remote.actor.name.test")
  implicit val timeout = Timeout(4.seconds)
  val remoteActor = context.actorSelection(path)

  def receive: Receive = {
    case Init => "init local actor"
    case SendNoReturn => remoteActor ! "hello remote actor"
  }
}

其中的remote.actor.name.test的值为:“akka.tcp://RemoteDemoSystem@127.0.0.1:4444/user/RemoteActor”,另外咱们能够看到咱们使用了context.actorSelection(path)来获取的是一个ActorSelection对象,如果须要得到ActorRef,咱们能够调用它的resolveOne(),它返回的是是一个Future[ActorRef],这里是否是很熟悉,由于它跟本地获取Actor方式是同样的,由于Akka中Actor是位置透明的,获取本地Actor和远程Actor是同样的。

最后咱们首先启动远程Actor的系统:

object RemoteDemo extends App  {
  val system = ActorSystem("RemoteDemoSystem")
  val remoteActor = system.actorOf(Props[RemoteActor], name = "RemoteActor")
  remoteActor ! "The RemoteActor is alive"
}

而后咱们在本地系统中启动这个LocalActor,并向它发送消息:

object LocalDemo extends App {

  implicit val system = ActorSystem("LocalDemoSystem")
  val localActor = system.actorOf(Props[LocalActor], name = "LocalActor")

  localActor ! Init
  localActor ! SendNoReturn
}

咱们能够看到RemoteActor收到了一条消息:

send-no-return

从以上的步骤和结果看出能够看出,Akka的远程通讯跟JMS的点对点模式彷佛更类似一点,可是它有不须要咱们维护消息队列,而是使用Actor自身的邮箱,另外咱们利用context.actorSelection获取的ActorRef,能够当作远程Actor的副本,这个又和RMI相关概念相似,因此说Akka远程通讯的形式上像是RMI和JMS的结合,固然底层仍是经过TCP、UDP等相关网络协议进行数据传输的,从配置文件的相应内容即可以看出。

上述例子演示的是sendNoReturn的模式,那么假如咱们须要远程Actor给咱们一个回复应该怎么作呢?

首先咱们建立一个消息:

case object SendHasReturn

 def receive: Receive = {
    case SendHasReturn =>
      for {
        r <- remoteActor.ask("hello remote actor")
      } yield r
  }

咱们从新运行LocalActor并像RemoteActor发送一条消息:

send-has-return

能够看到LocalActor在发送消息后并收到了RemoteActor返回来的消息,另外咱们这里设置了超时时间,若在规定的时间内没有获得反馈,程序就会报错。

Akka Serialization

其实这一部分本能够单独拿出来写,可是相信序列化这块你们都应该有所了解了,因此就不许备讲太多序列化的知识了,怕班门弄斧,主要讲讲Akka中的序列化。

继续上面的例子,假如咱们这时向RemoteActor发送一个自定义的对象,好比一个case class对象,可是咱们这是是在网络中传输这个消息,那么怎么保证这个对象类型和值呢,在同一个JVM系统中咱们不须要担忧这个,由于对象就在堆中,咱们只要传递相应的地址便可就行,可是在不一样的环境中,咱们并不能这么作,咱们在网络中只能传输字节数据,因此咱们必须将对象作特殊的处理,在传输的时候转化成特定的由一连串字节组成的数据,并且咱们又能够根据这些数据恢复成一个相应的对象,这即是序列化。

咱们先定义一个参与的case class, 并修改一下上面发送消息的语句:

case object SendSerialization
case class JoinEvt(
    id: Long,
    name: String
)
def receive: Receive = {
    case SendSerialization =>
      for {
        r <- remoteActor.ask(JoinEvt(1L,"godpan"))
      } yield println(r)
  }

这时咱们从新启动RemoteActor和LocalActor所在的系统,发送这条消息:

send-serialization

有同窗可能会以为奇怪,咱们明明没有对JoinEvt进行过任何序列化的标识和处理,为何程序还能运行成功呢?

其实否则,只不过是有人替咱们默认作了,不用说,确定是贴心的Akka,它为咱们提供了一个默认的序列化策略,那就是咱们熟悉又纠结的java.io.Serializable,沉浸在它的易使用性上,又对它的性能深恶痛绝,尤为是当有大量对象须要传输的分布式系统,若是是小系统,当我没说,毕竟存在即合理。

又有同窗说,既然Akka是一个天生分布式组件,为何还用低效的java.io.Serializable,你问我我也不知道,可能当时的做者偷了偷懒,固然Akka如今可能觉醒了,首先它支持第三方的序列化工具,固然若是你有特殊需求,你也能够本身实现一个,并且在最新的文档中说明,在Akka 2.5x以后Akka内核消息全面废弃java.io.Serializable,用户自定义的消息暂时仍是支持使用java.io.Serializable的,可是不推荐用,由于它是低效的,容易被攻击,因此在这里我也推荐你们再Akka中尽可能不要在使用了java.io.Serializable。

那么在Akka中咱们如何使用第三方的序列化工具呢?

这里我推荐一个在Java社区已经久负盛名的序列化工具:kryo,有兴趣的同窗能够去了解一下:kryo,并且它也提供Akka使用的相关包,这里咱们就使用它做为示例:

这里我贴上整个项目的build.sbt, kryo的相关依赖也在里面:

import sbt._
import sbt.Keys._

lazy val AllLibraryDependencies =
  Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.5.3",
    "com.typesafe.akka" %% "akka-remote" % "2.5.3",
    "com.twitter" %% "chill-akka" % "0.8.4"
  )

lazy val commonSettings = Seq(
  name := "AkkaRemoting",
  version := "1.0",
  scalaVersion := "2.11.11",
  libraryDependencies := AllLibraryDependencies
)

lazy val remote = (project in file("remote"))
  .settings(commonSettings: _*)
  .settings(
    // other settings
  )

lazy val local = (project in file("local"))
  .settings(commonSettings: _*)
  .settings(
    // other settings
  )

而后咱们只需将application.conf中的actor配置替换成如下的内容:

actor {
    provider = "akka.remote.RemoteActorRefProvider"
    serializers {
      kryo = "com.twitter.chill.akka.AkkaSerializer"
    }
    serialization-bindings {
      "java.io.Serializable" = none
      "scala.Product" = kryo
    }
  }

其实其中的"java.io.Serializable" = none能够省略,由于如果有其余序列化的策略则会替换掉默认的java.io.Serializable的策略,这里只是为了更加仔细的说明。

至此咱们就可使用kryo了,整个过程是否是很easy,火烧眉毛开始写demo了,那就快快开始吧。

整个例子的相关的源码已经上传到akka-demo中:源码连接

相关文章
相关标签/搜索