由于我了解Akka-http的主要目的不是为了有关Web-Server的编程,而是想实现一套系统集成的api,因此也须要考虑由服务端主动向客户端发送指令的应用场景。好比一个零售店管理平台的服务端在完成了某些数据更新后须要通知各零售门市客户端下载最新数据。虽然Akka-http也提供对websocket协议的支持,但websocket的网络链接是双向恒久的,适合频繁的问答交互式服务端与客户端的交流,消息结构也比较零碎。而咱们面临的多是批次型的大量数据库数据交换,只须要简单的服务端单向消息就好了,因此websocket不太合适,而Akka-http的SSE应该比较适合咱们的要求。SSE模式的基本原理是服务端统一集中发布消息,各客户端持久订阅服务端发布的消息并从消息的内容中筛选出属于本身应该执行的指令,而后进行相应的处理。客户端接收SSE是在一个独立的线程里不断进行的,不会影响客户端当前的运算流程。当收到有用的消息后就会调用一个业务功能函数做为后台异步运算任务。web
服务端的SSE发布是以Source[ServerSentEvent,NotUsed]来实现的。ServerSentEvent类型定义以下:数据库
/** * Representation of a server-sent event. According to the specification, an empty data field designates an event * which is to be ignored which is useful for heartbeats. * * @param data data, may span multiple lines * @param eventType optional type, must not contain \n or \r * @param id optional id, must not contain \n or \r * @param retry optional reconnection delay in milliseconds */ final case class ServerSentEvent( data: String, eventType: Option[String] = None, id: Option[String] = None, retry: Option[Int] = None) {...}
这个类型的参数表明事件消息的数据结构。用户能够根据实际须要充分利用这个数据结构来传递消息。服务端是经过complete以SeverSentEvent类为元素的Source来进行SSE的,以下:编程
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._ complete { Source .tick(2.seconds, 2.seconds, NotUsed) .map( _ => processToServerSentEvent) .keepAlive(1.second, () => ServerSentEvent.heartbeat) }
以上代码表明服务端定时运算processToServerSentEvent返回ServerSentEvent类型结果后发布给全部订阅的客户端。咱们用一个函数processToServerSentEvent模拟重复运算的业务功能:api
private def processToServerSentEvent: ServerSentEvent = { Thread.sleep(3000) //processing delay
ServerSentEvent(SyncFiles.fileToSync) }
这个函数模拟发布事件数据是某种业务运算结果,在这里表明客户端须要下载文件名称。咱们用客户端request来模拟设定这个文件名称:websocket
object SyncFiles { var fileToSync: String = "" } private def route = { import Directives._ import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._ def syncRequests = pathPrefix("sync") { pathSingleSlash { post { parameter("file") { filename => complete { SyncFiles.fileToSync = filename s"set download file to : $filename" } } } } }
客户端订阅SSE的方式以下:网络
import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._ import system.dispatcher Http() .singleRequest(Get("http://localhost:8011/events")) .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]) .foreach(_.runForeach(se => downloadFiles(se.data)))
每当客户端收到SSE后即运行downloadFiles(filename)函数。downloadFiles函数定义:数据结构
def downloadFiles(file: String) = { Thread.sleep(3000) //process delay
if (file != "") println(s"Try to download $file") }
下面是客户端程序的测试运算步骤:异步
scala.io.StdIn.readLine() println("do some thing ...") Http().singleRequest( HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders") ).onSuccess { case msg => println(msg) } scala.io.StdIn.readLine() println("do some other things ...") Http().singleRequest( HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items") ).onSuccess { case msg => println(msg) }
运算结果:socket
do some thing ... HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1)) Try to download Orders Try to download Orders do some other things ... HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1)) Try to download Orders Try to download Orders Try to download Items Try to download Items Try to download Items Process finished with exit code 0
下面是本次讨论的示范源代码:函数
服务端:
import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import scala.concurrent.duration.DurationInt import akka.http.scaladsl.model.sse.ServerSentEvent object SSEServer { def main(args: Array[String]): Unit = { implicit val system = ActorSystem() implicit val mat = ActorMaterializer() Http().bindAndHandle(route, "localhost", 8011) scala.io.StdIn.readLine() system.terminate() } object SyncFiles { var fileToSync: String = "" } private def route = { import Directives._ import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._ def syncRequests = pathPrefix("sync") { pathSingleSlash { post { parameter("file") { filename => complete { SyncFiles.fileToSync = filename s"set download file to : $filename" } } } } } def events = path("events") { get { complete { Source .tick(2.seconds, 2.seconds, NotUsed) .map( _ => processToServerSentEvent) .keepAlive(1.second, () => ServerSentEvent.heartbeat) } } } syncRequests ~ events } private def processToServerSentEvent: ServerSentEvent = { Thread.sleep(3000) //processing delay
ServerSentEvent(SyncFiles.fileToSync) } }
客户端:
import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.client.RequestBuilding.Get import akka.http.scaladsl.model.HttpMethods import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import akka.http.scaladsl.model.sse.ServerSentEvent import akka.http.scaladsl.model._ object SSEClient { def downloadFiles(file: String) = { Thread.sleep(3000) //process delay
if (file != "") println(s"Try to download $file") } def main(args: Array[String]): Unit = { implicit val system = ActorSystem() implicit val mat = ActorMaterializer() import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._ import system.dispatcher Http() .singleRequest(Get("http://localhost:8011/events")) .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]]) .foreach(_.runForeach(se => downloadFiles(se.data))) scala.io.StdIn.readLine() println("do some thing ...") Http().singleRequest( HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders") ).onSuccess { case msg => println(msg) } scala.io.StdIn.readLine() println("do some other things ...") Http().singleRequest( HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items") ).onSuccess { case msg => println(msg) } scala.io.StdIn.readLine() system.terminate() } }
个人博客即将同步至腾讯云+社区。邀你们一同入驻http://cloud.tencent.com/developer/support-plan