初识Akka Stream

背景

响应式编程

响应式编程目前在编程领域都算式流行词汇了,做为Scala的创造公司Lightbend 公司(前身是 Typesafe)发起了响应式宣言(Reactive Manifesto)。Akka、Rx系列甚至Spring最新版本的WebFlux都加入到这场流行文化中。react

响应式编程用一句话总结可能也没法立刻理解,其实它自己也没有新的东西,主要式关注基于事件驱动的快速异步响应,固然为达到目的也得考虑如何容错(响应式四准则:灵敏性、伸缩性、容错性、事件驱动)。简单能够类比下咱们经常使用MVC模式,Model的变化会通知View层的快速改变(事件通知),而无需View不停的去查询Model层的变化(轮询)。这就是一种经过事件机制快速做出响应,固然响应式模式不仅简单的关注事件驱动和快速响应,也关注应用的对不一样负载的伸缩扩展,还有在异步模式下的高容错性。 编程

响应式编程准则

响应流

响应式编程中有个核心的问题要处理,就是响应流。Netflix、Pivotal、Typesafe等公司的工程师们在2013年共同发起了关于制定“响应式流规范(Reactive Stream Specification)”的倡议。其中描述了响应流的特色:缓存

  • 具备处理无限数量的元素的能力
  • 按序处理
  • 异步地传递元素
  • 必须实现非阻塞的背压

Akka Stream就彻底实现了“响应式流规范(Reactive Stream Specification)”。bash

任何东西均可以成为一个响应流,例如变量、用户输入、属性、缓存、数据结构等等。你能够建立、合并、过滤响应流,一个响应流能够做为另外一个响应流的输入,甚至多个响应流也能够做为另外一个响应流的输入。举个例子咱们要从一个点击的响应流中处理双击或着屡次点击事件: 网络

点击响应流

背压

背压(backpressure)是为了解决响应流源和接收者处理速度不一致而采起的一种由处理者告诉上游进行速度调整的一种方式。在没有背压的状况下,响应流可能出现以下状况: 数据结构

没有背压
那么使用背压有两种,一种是源发送数据过快,另外一种是发送过忙。接收一方都会往上汇报以采起合适的发送速度。固然具体的处理有多种策略,这里不细讲,以下有个简单的例子:
背压
背压
实际上背压很早就出现了,之前背压一般经过阻塞生产者来实现,而等待消费者以本身的速度处理消息。这种依赖于系统间消息同步处理的方法是很是低效的,而且否认了异步处理的优点(更大的可扩展性和更好的资源利用率),所以须要一种用于实现背压的非阻塞解决方案。在响应流的背景下,背压是异步处理模型的一个组成部分,并经过异步消息传递来实现。

开始

咱们开始写一些Akka Stream的相关代码来了解它。先创建一个sbt工程,在build中加入:异步

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.12"
复制代码

为了可以运行全部的Stream,咱们先加入两个ActorSystemActorMaterializer的隐式变量:oop

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object MyFirstStream {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyActorSystem")
    implicit val materializer = ActorMaterializer()
    
  }
}
复制代码

构建Stream基础构件

Akka Stream包含了三大基础构件Source、Sink、Flow。ui

Source

Source
Source即响应流的源头,源头具备一个数据出口,如上图比较形象的描述了Source。咱们能够经过各类数据来建立一个Source:

val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1, 2, 3))
val sourceFromFuture = Source.fromFuture(Future.successful("hello"))
val sourceWithSingleElement = Source.single("just one")
val sourceEmittingTheSameElement = Source.repeat("again and again")
复制代码

Sink

Sink
Sink就是流的最终目的地,包含一个数据入口,咱们能够以下来建立Sink:

val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
val sinkReturningTheFirstElement = Sink.head
val sinkNoop = Sink.ignore
复制代码

Flow

Flow
Flow就是流的中间组件,包含一个数据入口和数据出口。咱们能够这样来建立Flow:

val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // 当buffer满了后进行背压
复制代码

定义流

流能够经过基础组件构成的图和网络来表示,咱们先从最简单的方式来定义,将一个Source和Sink连起来就能够造成一个流: spa

Source Sink

val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] = sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)
复制代码

这里面的Keep.right说明咱们只关心Sink最后获得的值。

咱们能够在者中间加上Flow,造成一个稍微复制的流:

Source Flow Sink

val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] = sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)
复制代码

执行流

咱们如今可使用run方法执行前面建立的流,结果会放到Future中。

val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run()
sumOfElements.foreach(println) // 打印出6
val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run()
sumOfDoubledElements.foreach(println) // 打印出12
复制代码

咱们可使用更简单的方式来定义并执行流,不须要中间量:

// 使用指定的sink来执行流
sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println)
// 使用Fold全部元素的sink来执行流
Source(List(1,2,3)).map(_ * 2).runFold(0)(_ + _).foreach(println)
复制代码
相关文章
相关标签/搜索