响应式编程目前在编程领域都算式流行词汇了,做为Scala的创造公司Lightbend 公司(前身是 Typesafe)发起了响应式宣言(Reactive Manifesto)。Akka、Rx系列甚至Spring最新版本的WebFlux都加入到这场流行文化中。react
响应式编程用一句话总结可能也没法立刻理解,其实它自己也没有新的东西,主要式关注基于事件驱动的快速异步响应,固然为达到目的也得考虑如何容错(响应式四准则:灵敏性、伸缩性、容错性、事件驱动)。简单能够类比下咱们经常使用MVC模式,Model的变化会通知View层的快速改变(事件通知),而无需View不停的去查询Model层的变化(轮询)。这就是一种经过事件机制快速做出响应,固然响应式模式不仅简单的关注事件驱动和快速响应,也关注应用的对不一样负载的伸缩扩展,还有在异步模式下的高容错性。 编程
响应式编程中有个核心的问题要处理,就是响应流。Netflix、Pivotal、Typesafe等公司的工程师们在2013年共同发起了关于制定“响应式流规范(Reactive Stream Specification)”的倡议。其中描述了响应流的特色:缓存
Akka Stream就彻底实现了“响应式流规范(Reactive Stream Specification)”。bash
任何东西均可以成为一个响应流,例如变量、用户输入、属性、缓存、数据结构等等。你能够建立、合并、过滤响应流,一个响应流能够做为另外一个响应流的输入,甚至多个响应流也能够做为另外一个响应流的输入。举个例子咱们要从一个点击的响应流中处理双击或着屡次点击事件: 网络
背压(backpressure)是为了解决响应流源和接收者处理速度不一致而采起的一种由处理者告诉上游进行速度调整的一种方式。在没有背压的状况下,响应流可能出现以下状况: 数据结构
咱们开始写一些Akka Stream的相关代码来了解它。先创建一个sbt工程,在build中加入:异步
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12"
复制代码
为了可以运行全部的Stream,咱们先加入两个ActorSystem
和ActorMaterializer
的隐式变量:oop
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object MyFirstStream {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("MyActorSystem")
implicit val materializer = ActorMaterializer()
}
}
复制代码
Akka Stream包含了三大基础构件Source、Sink、Flow。ui
val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1, 2, 3))
val sourceFromFuture = Source.fromFuture(Future.successful("hello"))
val sourceWithSingleElement = Source.single("just one")
val sourceEmittingTheSameElement = Source.repeat("again and again")
复制代码
val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
val sinkReturningTheFirstElement = Sink.head
val sinkNoop = Sink.ignore
复制代码
val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // 当buffer满了后进行背压
复制代码
流能够经过基础组件构成的图和网络来表示,咱们先从最简单的方式来定义,将一个Source和Sink连起来就能够造成一个流: spa
val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] = sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)
复制代码
这里面的Keep.right说明咱们只关心Sink最后获得的值。
咱们能够在者中间加上Flow,造成一个稍微复制的流:
val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] = sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)
复制代码
咱们如今可使用run方法执行前面建立的流,结果会放到Future中。
val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run()
sumOfElements.foreach(println) // 打印出6
val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run()
sumOfDoubledElements.foreach(println) // 打印出12
复制代码
咱们可使用更简单的方式来定义并执行流,不须要中间量:
// 使用指定的sink来执行流
sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println)
// 使用Fold全部元素的sink来执行流
Source(List(1,2,3)).map(_ * 2).runFold(0)(_ + _).foreach(println)
复制代码