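fs2's multithreaded programming model not only provides non-blocking I/O (Java NIO) capability, it also offers good tools for parallel computation. Before getting into parallel computation, let's first demonstrate some of the Stream-merging functions in fs2's pipe2 object. We start with two helper functions, one to trace computations and one to simulate a runtime environment: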
import fs2._

def log[A](prompt: String): Pipe[Task,A,A] = _.evalMap { a =>
  Task.delay { println(prompt + a); a } }
//> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]

Stream(1,2,3).through(log(">")).run.unsafeRun
//> >1
//| >2
//| >3
log is a function for tracing a computation.
import scala.concurrent.duration._

implicit val strategy = Strategy.fromFixedDaemonPool(4)
//> strategy : fs2.Strategy = Strategy
implicit val scheduler = Scheduler.fromFixedDaemonPool(2)
//> scheduler : fs2.Scheduler = Scheduler(java.util.concurrent.ScheduledThreadPoolExecutor@16022d9d[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0])

// Delay each element by a random duration below max, without blocking a thread
def randomDelay[A](max: FiniteDuration): Pipe[Task, A, A] = _.evalMap { a => {
  val delay: Task[Int] = Task.delay {
    scala.util.Random.nextInt(max.toMillis.toInt)
  }
  delay.flatMap { d => Task.now(a).schedule(d.millis) }
 }
}
//> randomDelay: [A](max: scala.concurrent.duration.FiniteDuration)fs2.Pipe[fs2.Task,A,A]

Stream(1,2,3).through(randomDelay(1.second)).through(log("delayed>")).run.unsafeRun
//> delayed>1
//| delayed>2
//| delayed>3
randomDelay simulates a computation that takes an arbitrary amount of time. We can also trace the stream both before and after it passes through randomDelay:
Stream(1,2,3).through(log("before delay>"))
  .through(randomDelay(1.second))
  .through(log("after delay>")).run.unsafeRun
//> before delay>1
//| after delay>1
//| before delay>2
//| after delay>2
//| before delay>3
//| after delay>3
Note that randomDelay does not block the current computation.
Now let's look at interleave, one of the merge functions in the pipe2 object:
val sa = Stream(1,2,3).through(randomDelay(1.second)).through(log("A>"))
//> sa : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3))).flatMap(<function1>).flatMap(<function1>)
val sb = Stream(1,2,3).through(randomDelay(1.second)).through(log("B>"))
//> sb : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(1, 2, 3))).flatMap(<function1>).flatMap(<function1>)
(sa interleave sb).through(log("AB>")).run.unsafeRun
//> A>1
//| B>1
//| AB>1
//| AB>1
//| A>2
//| B>2
//| AB>2
//| AB>2
//| A>3
//| B>3
//| AB>3
//| AB>3
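We can see that the merged stream emits only after sa and sb have both emitted an element. This is a fixed-order merge. merge, by contrast, combines the two streams in no fixed order. Here is how it is used: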
(sa merge sb).through(log("AB>")).run.unsafeRun
//> B>1
//| AB>1
//| B>2
//| AB>2
//| B>3
//| AB>3
//| A>1
//| AB>1
//| A>2
//| AB>2
//| A>3
//| AB>3
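We can see that merge does not wait for both sa and sb before emitting: as soon as either one has emitted an element, that element goes out. In other words, merge essentially follows whichever side runs faster, so the order of the results is irregular and nondeterministic. In terms of running time, interleave deterministically takes sa + sb at each step, while merge proceeds at the pace of whichever of sa and sb is faster. The overall running time is of course comparable, but with merge we can process the emitted elements in parallel, which can shorten the running time considerably. One problem with merge is that we cannot tell which source the current element came from. We can use either to solve this: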
(sa either sb).through(log("AB>")).run.unsafeRun
//> A>1
//| AB>Left(1)
//| B>1
//| AB>Right(1)
//| A>2
//| AB>Left(2)
//| B>2
//| AB>Right(2)
//| B>3
//| AB>Right(3)
//| A>3
//| AB>Left(3)
We tell the sources apart by Left and Right. If we add one more source Stream, we can still use merge to combine the three Streams:
val sc = Stream.range(1,10).through(randomDelay(1.second)).through(log("C>"))
//> sc : fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>)
((sa merge sb) merge sc).through(log("ABC>")).run.unsafeRun
//> B>1
//| ABC>1
//| C>1
//| ABC>1
//| A>1
//| ABC>1
//| B>2
//| ABC>2
//| A>2
//| ABC>2
//| B>3
//| ABC>3
//| C>2
//| ABC>2
//| A>3
//| ABC>3
//| C>3
//| ABC>3
//| C>4
//| ABC>4
//| C>5
//| ABC>5
//| C>6
//| ABC>6
//| C>7
//| ABC>7
//| C>8
//| ABC>8
//| C>9
//| ABC>9
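To make the merge and either behaviour above more concrete, here is a minimal sketch in plain Scala, using only the standard library and not fs2. It is only a model under stated assumptions: two producer threads push elements into a shared queue after random pauses, and the consumer takes them in arrival order. The names TaggedMergeSketch, producer and maxDelayMs are made up for this illustration. The overall order varies from run to run, but each source keeps its own order, and the Left/Right tag records where each element came from.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.util.Random

object TaggedMergeSketch {
  // Hypothetical helper: a daemon thread that pushes each element into the
  // shared queue after a random pause of less than maxDelayMs milliseconds.
  private def producer[A](items: List[A], maxDelayMs: Int, out: LinkedBlockingQueue[A]): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit = items.foreach { a =>
        Thread.sleep(Random.nextInt(maxDelayMs).toLong)
        out.put(a)
      }
    })
    t.setDaemon(true)
    t.start()
    t
  }

  // Merge two sources in arrival order, tagging every element with its origin.
  def either[A, B](left: List[A], right: List[B], maxDelayMs: Int = 50): List[Either[A, B]] = {
    val q = new LinkedBlockingQueue[Either[A, B]]()
    producer[Either[A, B]](left.map(a => Left(a)), maxDelayMs, q)
    producer[Either[A, B]](right.map(b => Right(b)), maxDelayMs, q)
    // Block until every element from both sources has arrived.
    List.fill(left.size + right.size)(q.take())
  }
}
```

Calling TaggedMergeSketch.either(List(1, 2, 3), List(1, 2, 3)) returns six tagged elements whose interleaving depends on the random delays, much like the worksheet output above.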
If we cannot know the number of sources in advance, we can represent them with the following type:
Stream[Task,Stream[Task,A]]
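This type represents a Stream of Streams: the outer Stream contains an unknown number of inner Streams. A concrete example makes this clearer: the outer Stream represents client connections, and each inner Stream represents the data read from one client. The three Streams above can be expressed with this type as follows: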
val streams: Stream[Task,Stream[Task,Int]] = Stream(sa,sb,sc)
//> streams : fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,Int]] = Segment(Emit(Chunk(Segment(Emit(Chunk(1, 2, 3))).flatMap(<function1>).flatMap(<function1>),Segment(Emit(Chunk(1, 2, 3))).flatMap(<function1>).flatMap(<function1>), Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>))))
Now we need not only to run the inner Streams but also to flatten their results into a Stream[Task,A]. The fs2.concurrent package has exactly such a combinator:
def join[F[_],O](maxOpen: Int)(outer: Stream[F,Stream[F,O]])(implicit F: Async[F]): Stream[F,O] = {...}
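The input parameter outer and the result type both match what we need. maxOpen is the maximum number of inner streams that run concurrently. We can use join to redo the example above that merged sa, sb and sc: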
val ms = concurrent.join(3)(streams)
//> ms : fs2.Stream[fs2.Task,Int] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>)
ms.through(log("ABC>")).run.unsafeRun
//> C>1
//| ABC>1
//| A>1
//| ABC>1
//| C>2
//| ABC>2
//| B>1
//| ABC>1
//| C>3
//| ABC>3
//| A>2
//| ABC>2
//| B>2
//| ABC>2
//| C>4
//| ABC>4
//| A>3
//| ABC>3
//| B>3
//| ABC>3
//| C>5
//| ABC>5
//| C>6
//| ABC>6
//| C>7
//| ABC>7
//| C>8
//| ABC>8
//| C>9
//| ABC>9
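The result is what we expected. As mentioned above, maxOpen is the maximum number of concurrent computations. Let's use another example to observe it: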
val rangedStreams = Stream.range(0,5).map { id =>
  Stream.range(1,5).through(randomDelay(1.second)).through(log((('A'+id).toChar).toString +">")) }
//> rangedStreams : fs2.Stream[Nothing,fs2.Stream[fs2.Task,Int]] = Segment(Emit(Chunk(()))).flatMap(<function1>).mapChunks(<function1>)
concurrent.join(3)(rangedStreams).run.unsafeRun
//> B>1
//| A>1
//| C>1
//| B>2
//| C>2
//| A>2
//| B>3
//| C>3
//| C>4
//| D>1
//| A>3
//| A>4
//| B>4
//| E>1
//| E>2
//| E>3
//| D>2
//| D>3
//| E>4
//| D>4
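We can see that at most three computations are active at any one time: first A, B and C, and then D and E start only as earlier streams finish.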
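The effect of maxOpen can be modelled in plain Scala, without fs2, by running each inner source as one job on a fixed-size thread pool. This is only a sketch of the idea and not how concurrent.join is implemented; BoundedJoinSketch, maxDelayMs and the peak counter are names invented for this illustration. The function returns the merged elements together with the highest number of sources that were observed running at the same time.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.IntBinaryOperator
import scala.util.Random

object BoundedJoinSketch {
  // Run every inner source to completion with at most maxOpen of them active.
  // Returns (merged elements in arrival order, peak number of active sources).
  def join[A](maxOpen: Int)(sources: List[List[A]], maxDelayMs: Int = 20): (List[A], Int) = {
    val pool    = Executors.newFixedThreadPool(maxOpen) // the pool size is the concurrency bound
    val out     = new ConcurrentLinkedQueue[A]()
    val running = new AtomicInteger(0)
    val peak    = new AtomicInteger(0)
    val max     = new IntBinaryOperator {
      def applyAsInt(a: Int, b: Int): Int = math.max(a, b)
    }

    sources.foreach { src =>
      pool.execute(new Runnable {
        def run(): Unit = {
          // Record how many sources are active right now.
          peak.accumulateAndGet(running.incrementAndGet(), max)
          try src.foreach { a =>
            Thread.sleep(Random.nextInt(maxDelayMs).toLong) // simulated random delay
            out.add(a)
          } finally { running.decrementAndGet(); () }
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)

    val merged = List.newBuilder[A]
    val it     = out.iterator()
    while (it.hasNext) merged += it.next()
    (merged.result(), peak.get())
  }
}
```

With five sources and maxOpen = 3, the fourth and fifth sources start only when a pool thread becomes free, which mirrors the A, B, C then D, E pattern in the output above.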
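When our program needs to interact with external programs, the interaction may take one of the following forms:
1. The side-effecting computation runs synchronously. This is the easiest case to handle, because the result is available directly.
2. The side-effecting computation is asynchronous: it delivers its result by invoking a callback function once.
3. The side-effecting computation is asynchronous, but its result is delivered in batches through multiple invocations of the callback.
Let's analyze these cases one by one: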
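1. Synchronous computations are the easiest to handle: we only need to wrap the computation in Stream.eval (the example uses eval_, which runs the effect and discards its result):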
def destroyUniverse: Unit = println("BOOOOM!!!")
//> destroyUniverse: => Unit
val s = Stream.eval_(Task.delay(destroyUniverse)) ++ Stream("...move on")
//> s : fs2.Stream[fs2.Task,String] = append(attemptEval(Task).flatMap(<function1>).flatMap(<function1>), Segment(Emit(Chunk(()))).flatMap(<function1>))
s.runLog.unsafeRun
//> BOOOOM!!!
//| res8: Vector[String] = Vector(...move on)
2. The second case: the Async trait in fs2 has an async function for registering a callback:
trait Async[F[_]] extends Effect[F] { self =>
  /**
   Create an `F[A]` from an asynchronous computation, which takes the form
   of a function with which we can register a callback. This can be used
   to translate from a callback-based API to a straightforward monadic
   version.
   */
  def async[A](register: (Either[Throwable,A] => Unit) => F[Unit]): F[A] =
    bind(ref[A]) { ref =>
    bind(register { e => runSet(ref)(e) }) { _ => get(ref) }}
...
Let's demonstrate with a practical example. Suppose we have a callback-based function readBytes:
trait Connection {
  def readBytes(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit
}
This Connection is an interface. Suppose it is instantiated like this:
val conn = new Connection {
  def readBytes(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit = {
    Thread.sleep(1000)
    onSuccess(Array(1,2,3,4,5))
  }
}
//> conn : demo.ws.fs2Concurrent.connection = demo.ws.fs2Concurrent$$anonfun$main$1$$anon$1@4c40b76e
We can use async to register this callback, turning it into a pure, composable (monadic) component of type Task[Array[Byte]]:
// T is not defined in this excerpt; it is assumed to be the Async[Task]
// instance, e.g. val T = implicitly[Async[Task]]
val bytes = T.async[Array[Byte]] { (cb: Either[Throwable,Array[Byte]] => Unit) => {
  Task.delay { conn.readBytes (
    ready => cb(Right(ready)),
    fail => cb(Left(fail))
  ) }
}}
//> bytes : fs2.Task[Array[Byte]] = Task
Only then can we run bytes with Stream.eval:
Stream.eval(bytes).map(_.toList).runLog.unsafeRun
//> res9: Vector[List[Byte]] = Vector(List(1, 2, 3, 4, 5))
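The same register-a-callback-once pattern can be sketched with the Scala standard library alone, using Promise and Future instead of fs2's Async and Task. This is an analogy rather than the fs2 mechanism: a Future starts running eagerly, whereas a Task only describes the computation. The names CallbackToFutureSketch, readBytesF and demoConn are hypothetical, and the Connection trait repeats the shape used in the text.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object CallbackToFutureSketch {
  // Same shape as the Connection interface in the text.
  trait Connection {
    def readBytes(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit
  }

  // Register both callbacks once; whichever fires first completes the promise.
  def readBytesF(conn: Connection): Future[Array[Byte]] = {
    val p = Promise[Array[Byte]]()
    conn.readBytes(
      bytes => { p.trySuccess(bytes); () },
      err   => { p.tryFailure(err); () }
    )
    p.future
  }

  // Hypothetical connection that answers after a short pause.
  val demoConn: Connection = new Connection {
    def readBytes(onSuccess: Array[Byte] => Unit, onFailure: Throwable => Unit): Unit = {
      Thread.sleep(100)
      onSuccess(Array[Byte](1, 2, 3, 4, 5))
    }
  }
}
```

A caller can then compose the result like any other Future, for example with Await.result(CallbackToFutureSketch.readBytesF(CallbackToFutureSketch.demoConn), 5.seconds) in a test.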
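This case, where the callback is invoked only once, is also fairly easy to handle: when we cannot keep up with the data, we simply stop reading. The callback may instead need to be invoked many times, for example when the external program is itself a streaming API that invokes the callback each time data is ready. In that case our program may not be able to process the incoming data fast enough. We can use the queue provided by the fs2.async package to solve this problem: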
import fs2.async
import fs2.util.Async

type Row = List[String]
// defined type alias Row

trait CSVHandle {
  def withRows(cb: Either[Throwable,Row] => Unit): Unit
}
// defined trait CSVHandle

def rows[F[_]](h: CSVHandle)(implicit F: Async[F]): Stream[F,Row] =
  for {
    q <- Stream.eval(async.unboundedQueue[F,Either[Throwable,Row]])
    _ <- Stream.suspend { h.withRows { e => F.unsafeRunAsync(q.enqueue1(e))(_ => ()) }; Stream.emit(()) }
    row <- q.dequeue through pipe.rethrow
  } yield row
// rows: [F[_]](h: CSVHandle)(implicit F: fs2.util.Async[F])fs2.Stream[F,Row]
enqueue1 and dequeue are defined in the Queue trait like this:
/**
 * Asynchronous queue interface. Operations are all nonblocking in their
 * implementations, but may be 'semantically' blocking. For instance,
 * a queue may have a bound on its size, in which case enqueuing may
 * block until there is an offsetting dequeue.
 */
trait Queue[F[_],A] {
  /**
   * Enqueues one element in this `Queue`.
   * If the queue is `full` this waits until queue is empty.
   *
   * This completes after `a` has been successfully enqueued to this `Queue`
   */
  def enqueue1(a: A): F[Unit]

  /** Repeatedly call `dequeue1` forever. */
  def dequeue: Stream[F, A] = Stream.repeatEval(dequeue1)

  /** Dequeue one `A` from this queue. Completes once one is ready. */
  def dequeue1: F[A]
...
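We use enqueue1 to store the result of each callback invocation in the queue. dequeue yields the queued results as a Stream[F,Either[Throwable,Row]], and pipe.rethrow turns that into the Stream[F,Row] we want by emitting each Right and failing the stream on a Left.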
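The queue-between-callback-and-consumer idea can also be sketched in plain Scala with a LinkedBlockingQueue. This is a simplified model and not the fs2 implementation: take() blocks a thread, whereas the fs2 queue is non-blocking. One assumption is added that the original CSVHandle does not have: the callback receives None to mark the end of input, so that the sketch can terminate. CallbackQueueSketch and demo are names made up for this illustration.

```scala
import java.util.concurrent.LinkedBlockingQueue

object CallbackQueueSketch {
  type Row = List[String]

  // Hypothetical variant of CSVHandle: None signals end of input
  // (an assumption of this sketch; the original interface has no end marker).
  trait CSVHandle {
    def withRows(cb: Option[Either[Throwable, Row]] => Unit): Unit
  }

  // Buffer every callback invocation, then let the consumer drain at its own pace.
  def rows(h: CSVHandle): Iterator[Row] = {
    val q = new LinkedBlockingQueue[Option[Either[Throwable, Row]]]()
    h.withRows(e => q.put(e))
    Iterator
      .continually(q.take())   // blocks until the next item is available
      .takeWhile(_.isDefined)  // stop at the end marker
      .flatten
      .map {
        case Right(row) => row
        case Left(err)  => throw err // same role as pipe.rethrow
      }
  }

  // Hypothetical handle that delivers two rows and then the end marker.
  val demo: CSVHandle = new CSVHandle {
    def withRows(cb: Option[Either[Throwable, Row]] => Unit): Unit = {
      cb(Some(Right(List("a", "1"))))
      cb(Some(Right(List("b", "2"))))
      cb(None)
    }
  }
}
```

Because the queue here is unbounded, a fast producer is never slowed down, just as with unboundedQueue in the text; a bounded queue would make the producer wait instead.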
fs2 provides data types such as signal, queue and semaphore. Below are some usage examples, starting with async.signal:
Stream.eval(async.signalOf[Task,Int](0)).flatMap { s =>
  val monitor: Stream[Task,Nothing] =
    s.discrete.through(log("s updated>")).drain
  val data: Stream[Task,Int] =
    Stream.range(10,16).through(randomDelay(1.second))
  val writer: Stream[Task,Unit] =
    data.evalMap { d => s.set(d) }
  monitor merge writer
}.run.unsafeRun
//> s updated>0
//| s updated>10
//| s updated>11
//| s updated>12
//| s updated>13
//| s updated>14
//| s updated>15
An async.queue usage example:
Stream.eval(async.boundedQueue[Task,Int](5)).flatMap { q =>
  val monitor: Stream[Task,Nothing] =
    q.dequeue.through(log("dequeued>")).drain
  val data: Stream[Task,Int] =
    Stream.range(10,16).through(randomDelay(1.second))
  val writer: Stream[Task,Unit] =
    data.to(q.enqueue)
  monitor mergeHaltBoth writer
}.run.unsafeRun
//> dequeued>10
//| dequeued>11
//| dequeued>12
//| dequeued>13
//| dequeued>14
//| dequeued>15
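fs2 also provides, in its time package, some functions and types that produce data automatically on a timer. Here is some code to show how they are used: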
time.awakeEvery[Task](1.second)
  .through(log("time:"))
  .take(5).run.unsafeRun
//> time:1002983266 nanoseconds
//| time:2005972864 nanoseconds
//| time:3004831159 nanoseconds
//| time:4002104307 nanoseconds
//| time:5005091850 nanoseconds
awakeEvery produces an infinite stream, so we use take(5) to keep only the first 5 elements. We can also let it run for 5 seconds:
val tick = time.awakeEvery[Task](1.second).through(log("time:"))
//> tick : fs2.Stream[fs2.Task,scala.concurrent.duration.FiniteDuration] = Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>)
tick.run.unsafeRunFor(5.seconds)
//> time:1005685270 nanoseconds
//| time:2004331473 nanoseconds
//| time:3005046945 nanoseconds
//| time:4002795227 nanoseconds
//| time:5002807816 nanoseconds
//| java.util.concurrent.TimeoutException
If we want to avoid the TimeoutException, we can use Task.schedule:
val tick = time.awakeEvery[Task](1.second).through(log("time:"))
//> tick : fs2.Stream[fs2.Task,scala.concurrent.duration.FiniteDuration] = Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>).flatMap(<function1>)
tick.interruptWhen(Stream.eval(Task.schedule(true,5.seconds)))
  .run.unsafeRun
//> time:1004963839 nanoseconds
//| time:2005325025 nanoseconds
//| time:3005238921 nanoseconds
//| time:4004240985 nanoseconds
//| time:5001334732 nanoseconds
//| time:6003586673 nanoseconds
//| time:7004728267 nanoseconds
//| time:8004333608 nanoseconds
//| time:9003907670 nanoseconds
//| time:10002624970 nanoseconds
The most direct approach is to use fs2's time.sleep:
(time.sleep[Task](5.seconds) ++ Stream.emit(true)).runLog.unsafeRun
//> res14: Vector[Boolean] = Vector(true)
tick.interruptWhen(time.sleep[Task](5.seconds) ++ Stream.emit(true))
  .run.unsafeRun
//> time:1002078506 nanoseconds
//| time:2005144318 nanoseconds
//| time:3004049135 nanoseconds
//| time:4002963861 nanoseconds
//| time:5000088103 nanoseconds