从上面多篇的讨论中咱们了解到scalaz-stream表明一串连续无穷的数据或者程序。对这个数据流的处理过程就是一个状态机器(state machine)的状态转变过程。这种模式与咱们一般遇到的程序流程很类似:经过程序状态的变化来推动程序进展。传统OOP式编程多是经过一些全局变量来记录当前程序状态,而FP则是经过函数组合来实现状态转变的。这个FP模式讲起来有些模糊和抽象,但实际上经过咱们前面长时间对FP编程的学习了解到FP编程讲究避免使用任何局部中间变量,更不用说全局变量了。FP程序的数据A是包嵌在算法F[A]内的。FP编程模式提供了一整套全新的数据更新方法来实现对F[A]中数据A的操做。对许多编程人员来说,FP的这种编程方式会显得很别扭、不容易掌握。若是咱们仔细观察分析,会发觉scalaz-stream就是一种很好的FP编程工具:它的数据也是不可变的(immutable),而且是包嵌在高阶类型结构里的,是经过Process状态转变来标示数据处理过程进展的。scalaz-stream的数据处理是有序流程,这样能够使咱们更容易分析理解程序的运算过程,它的三个大环节包括:数据源(source),数据传换(transducer)及数据终点(Sink/Channel)能够很形象地描绘一个程序运算的全过程。scalaz-stream在运算过程当中的并行运算方式(parallel computaion)、安全资源使用(resource safety)和异常处理能力(exception handling)是实现泛函多线程编程最好的支持。咱们先来看看scalaz-stream里的一个典型函数:java
/** * Await the given `F` request and use its result. * If you need to specify fallback, use `awaitOr` */ def await[F[_], A, O](req: F[A])(rcv: A => Process[F, O]): Process[F, O] = awaitOr(req)(Halt.apply)(rcv) /** * Await a request, and if it fails, use `fb` to determine the next state. * Otherwise, use `rcv` to determine the next state. */ def awaitOr[F[_], A, O](req: F[A])(fb: EarlyCause => Process[F, O])(rcv: A => Process[F, O]): Process[F, O] = Await(req,(r: EarlyCause \/ A) => Trampoline.delay(Try(r.fold(fb,rcv))))
这个await函数能够说是一个表明完整程序流程的典范。注意,awaitOr里的Await是个数据结构。这样咱们在递归运算await时能够避免StackOverflowError的发生。req: F[A]表明与外界交互的一个运算,如从外部获取输入、函数rcv对这个req产生的运算结果进行处理并设定程序新的状态。算法
1 import scalaz.stream._ 2 import scalaz.concurrent._ 3 object streamApps { 4 import Process._ 5 def getInput: Task[Int] = Task.delay { 3 } //> getInput: => scalaz.concurrent.Task[Int]
6 val prg = await(getInput)(i => emit(i * 3)) //> prg : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@4973813a,<function1>,<function1>)
7 prg.runLog.run //> res0: Vector[Int] = Vector(9)
8 }
这是一个一步计算程序。咱们能够再加一步:编程
1 val add10 = await1[Int].flatMap{i => emit(i + 10)} 2 //> add10 : scalaz.stream.Process[[x]scalaz.stream.Process.Env[Int,Any]#Is[x],Int] = Await(Left,<function1>,<function1>)
3 val prg1 = await(getInput)(i => emit(i * 3) |> add10) 4 //> prg1 : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@6737fd8f,<function1>,<function1>)
5 prg1.runLog.run //> res0: Vector[Int] = Vector(19)
add10是新增的一个运算步骤,是个transducer因此调用了Process1的函数await1,并用pipe(|>)来链接。实际上咱们能够用组合方式(compose)把add10和prg组合起来:数组
1 val prg3 = prg |> add10 //> prg3 : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End) ,Vector(<function1>))
2 prg3.runLog.run //> res1: Vector[Int] = Vector(19)
咱们一样能够增长一步输出运算:安全
1 val outResult: Sink[Task,Int] = sink.lift { i => Task.delay{println(s"the result is: $i")}} 2 //> outResult : scalaz.stream.Sink[scalaz.concurrent.Task,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))
3 val prg4 = prg1 to outResult //> prg4 : scalaz.stream.Process[[x]scalaz.concurrent.Task[x],Unit] = Append(Halt(End),Vector(<function1>, <function1>))
4 prg4.run.run //> the result is: 19
scalaz-stream的输出类型是Sink,咱们用to来链接。那么若是须要不断重复运算呢:数据结构
1 import scalaz._ 2 import Scalaz._ 3 import scalaz.concurrent._ 4 import scalaz.stream._ 5 import Process._ 6 object streamAppsDemo extends App { 7 def putLine(line: String) = Task.delay { println(line) } 8 def getLine = Task.delay { Console.readLine } 9 val readL = putLine("Enter:>").flatMap {_ => getLine} 10 val readLines = repeatEval(readL) 11 val echoLine = readLines.flatMap {line => eval(putLine(line))} 12 echoLine.run.run 13 }
这是一个无穷运算程序:不停地把键盘输入回响到显示器上。下面是一些测试结果:多线程
1 Enter:>
2 hello world!
3 hello world!
4 Enter:>
5 how are you?
6 how are you?
7 Enter:>
固然,咱们也能够把上面的程序表达的更形象些:app
1 val outLine: Sink[Task,String] = constant(putLine _).toSource 2 val echoInput: Process[Task,Unit] = readLines to outLine 3 //echoLine.run.run
4 echoInput.run.run
用to Sink来表述可能更形象。这个程序没有任何控制:甚至没法有意识地退出。咱们试着加一些控制机制:框架
1 def lines: Process[Task,String] = { 2 def go(line: String): Process[Task,String] =
3 line.toUpperCase match { 4 case "QUIT" => halt 5 case _ => emit(line) ++ await(readL)(go) 6 } 7 await(readL)(go) 8 } 9
10 val prg = lines to outLine 11 prg.run.run
在rcv函数里检查输入是否quit,若是是就halt,不然重复运算await。如今能够控制终止程序了。函数
下面再示范一下异常处理机制:看看能不能有效的捕捉到运行时的错误:
1 def mul(i: Int) = await1[String].flatMap { s => emit((s.toDouble * i).toString) }.repeat 2 val prg = (lines |> mul(5)) to outLine 3 prg.run.run
加了个transducer mul(5),若是输入是可转变为数字类型的就乘5否者会异常退出。下面是一些测试场景:
1 Enter:>
2 5
3 25.0
4 Enter:>
5 6
6 30.0
7 Enter:>
8 six 9 Exception in thread "main" java.lang.NumberFormatException: For input string: "six"
10 at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
咱们能够用onFailure来捕捉任何错误:
1 def mul(i: Int) = await1[String].flatMap { s => emit((s.toDouble * i).toString) }.repeat 2 //val prg = (lines |> mul(5)) to outLine
3 val prg = (lines |> mul(5)).onFailure { e => emit("invalid input!!!") } to outLine 4 prg.run.run
如今运算结果变成了下面这样:
1 Enter:>
2 5
3 25.0
4 Enter:>
5 6
6 30.0
7 Enter:>
8 six 9 invalid input!!!
证实咱们捕捉并处理了错误。一个完整安全的程序还必须具有自动过后清理的功能。这项能够经过onComplete来实现:
1 def mul(i: Int) = await1[String].flatMap { s => emit((s.toDouble * i).toString) }.repeat 2 //val prg = (lines |> mul(5)) to outLine
3 val prg = (lines |> mul(5)).onFailure { e => emit("invalid input!!!") } 4 val prg1 = prg.onComplete{ Process.eval(Task.delay {println("end of program"); ""}) } to outLine 5 prg1.run.run
测试结果以下:
1 Enter:>
2 5
3 25.0
4 Enter:>
5 6
6 30.0
7 Enter:>
8 six 9 invalid input!!!
10 end of program
再有一个值得探讨的就是这些程序的组合集成。scalaz-stream就是存粹的泛函类型,那么基于scalaz-stream的程序就天然具有组合的能力了。咱们能够用两个独立的程序来示范Process程序组合:
1 import scalaz._ 2 import Scalaz._ 3 import scalaz.concurrent._ 4 import scalaz.stream._ 5 import Process._ 6 object prgStream extends App { 7 def prompt(prmpt: String) = Task.delay { print(prmpt) } 8 def putLine(line: String) = Task.delay { println(line) } 9 def getLine = Task.delay { Console.readLine } 10 val readLine1 = prompt("Prg1>:").flatMap {_ => getLine} 11 val readLine2 = prompt("Prg2>:").flatMap {_ => getLine} 12 val stdOutput = constant(putLine _).toSource 13 def multiplyBy(n: Int) = await1[String].flatMap {line =>
14 if (line.isEmpty) halt 15 else emit((line.toDouble * n).toString) 16 }.repeat 17 val prg1: Process[Task,String] = { 18 def go(line: String): Process[Task,String] = line.toUpperCase match { 19 case "QUIT" => halt 20 case _ => emit(line) ++ await(readLine1)(go) 21 } 22 await(readLine1)(go) 23 }.onComplete{ Process.eval(Task.delay {println("end of program1"); ""}) } 24 val prg2: Process[Task,String] = { 25 def go(line: String): Process[Task,String] = line.toUpperCase match { 26 case "QUIT" => halt 27 case _ => emit(line) ++ await(readLine2)(go) 28 } 29 await(readLine2)(go) 30 }.onComplete{ Process.eval(Task.delay {println("end of program2"); ""}) } 31 val program1 = (prg1 |> multiplyBy(3) to stdOutput) 32 val program2 = (prg2 |> multiplyBy(5) to stdOutput) 33
34 (program1 ++ program2).run.run 35
36 }
由于program的类型是Process[Task,String],因此咱们能够用++把它们链接起来。同时咱们应该看到在program的造成过程当中transducer multiplyBy是如何用|>与prg组合的。如今咱们看看测试运算结果:
1 Prg1>:3
2 9.0
3 Prg1>:4
4 12.0
5 Prg1>:quit 6 end of program1 7 Prg2>:5
8 25.0
9 Prg2>:6
10 30.0
11 Prg2>:quit 12 end of program2
咱们看到程序是按照流程走的。下面再试个流程控制程序分发(dispatching)的例子:
1 val program1 = (prg1 |> multiplyBy(3) observe stdOutput) 2 val program2 = (prg2 |> multiplyBy(5) observe stdOutput) 3
4 //(program1 ++ program2).run.run
5 val getOption = prompt("Enter your choice>:").flatMap {_ => getLine } 6 val mainPrg: Process[Task,String] = { 7 def go(input: String): Process[Task,String] = input.toUpperCase match { 8 case "QUIT" => halt 9 case "P1" => program1 ++ await(getOption)(go) 10 case "P2" => program2 ++ await(getOption)(go) 11 case _ => await(getOption)(go) 12 } 13 await(getOption)(go) 14 }.onComplete{ Process.eval(Task.delay {println("end of main"); ""}) } 15
16 mainPrg.run.run
咱们先把program1和program2的终点类型Sink去掉。用observe来实现数据复制分流。这样program1和program2的结果类型才能与await的类型相匹配。咱们能够测试运行一下:
1 Enter your choice>:p2 2 Prg2>:3
3 15.0
4 Prg2>:5
5 25.0
6 Prg2>:quit 7 end of program2 8 Enter your choice>:p1 9 Prg1>:3
10 9.0
11 Prg1>:6
12 18.0
13 Prg1>:quit 14 end of program1 15 Enter your choice>:wat 16 Enter your choice>:oh no 17 Enter your choice>:quit 18 end of main
scalaz-stream是一种泛函类型。咱们在上面已经示范了它的函数组合能力。固然,若是程序的类型是Process,那么咱们能够很容易地用merge来实现并行运算。
scalaz-stream做为一种程序运算框架能够轻松实现FP程序的组合,那么它成为一种安全稳定的泛函多线程编程工具就会是很好的选择。