若是scalaz-stream真的是一个实用的数据流编程工具库的话,那它应该能处理同时从多个数据源获取数据以及把数据同时送到多个终点(Sink),最重要的是它应该能够实现高度灵活的多线程运算。可是:咱们说Process表明了一串多是无穷的元素。这个一串的意思是多个按序排列的元素。也就是说若是咱们有一个Process(a,b,c),那么咱们只能按顺序来进行运算:咱们只能在完成了对a的运算后才能运算b。这样也说得过去:它让咱们更容易理解scalaz-stream Process的运算过程。面对scalaz-stream这样的特性咱们应该怎样去实现它的并行运算呢?实际上在不少应用场景中咱们对运算结果的排列顺序并不关心,咱们只对运算结果内容感兴趣。如:从数据库库存表中查询商品价格大于100的全部商品,这时咱们对读出商品记录的顺序并不关心,咱们只对每条记录的价格感兴趣。若是咱们从不少源头(数据表)读取商品信息的话,能够同时对这些源头进行并行读取。scalaz-stream是经过merge来实现并行运算的。merge能够同时读取多个数据源而后产生一个合并的数据流。因为各个源头的滞后状况有所不一样,因此merge产生结果的顺序是不可预测的(nondeterministic)。咱们用个例子来示范有那些方法能够同时从三个文件中逐行读取文字而后再合并成一个多行文件:数据库
1 al p1 = io linesR s"/Users/Tiger/Process.scala"
2 //> p1 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@7494e528,<function1>,<function1>)
3 val p2 = io linesR s"/Users/Tiger/Wye.scala"
4 //> p2 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@1f554b06,<function1>,<function1>)
5 val p3 = io linesR s"/Users/Tiger/Tee.scala"
6 //> p3 : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@694e1548,<function1>,<function1>)
p1,p2,p3是三个Source。它们分别从Process.scala, Wye.scala, Tee.scala中读取数据。咱们能够模拟读取数据时可能遇到的延迟:编程
1 //假定读取数据形成不肯定延迟
2 def readDelay(i: Int) = Thread.sleep( i/10 ) //> readDelay: (i: Int)Unit
3 val pa = p1.map{ s => readDelay(s.length); s} //> pa : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@7494e528,<function1>,<function1>)
4 val pb = p2.map{ s => readDelay(s.length); s} //> pb : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@1f554b06,<function1>,<function1>)
5 val pc = p3.map{ s => readDelay(s.length); s} //> pc : scalaz.stream.Process[scalaz.concurrent.Task,String] = Await(scalaz.concurrent.Task@694e1548,<function1>,<function1>)
如今pa,pb,pc都按照所读文件中每行文字长度来产生滞延。下面咱们先统计一下每一个Process运算所须要的时间:多线程
1 val pa_start = System.currentTimeMillis //> pa_start : Long = 1470051661503
2 val palines= pa.runFoldMap(_ => 1).run //> palines : Int = 1616
3 println(s"reading p1 $palines lines in ${System.currentTimeMillis - pa_start}ms") 4 //> reading p1 1616 lines in 6413ms
5 val pb_start = System.currentTimeMillis //> pb_start : Long = 1470051667917
6 val pblines=pb.runFoldMap(_ => 1).run //> pblines : Int = 901
7 println(s"reading p2 $pblines lines in ${System.currentTimeMillis - pb_start}ms") 8 //> reading p2 901 lines in 3275ms
9 val pc_start = System.currentTimeMillis //> pc_start : Long = 1470051671192
10 val pclines=pc.runFoldMap(_ => 1).run //> pclines : Int = 306
11 println(s"reading p3 $pclines lines in ${System.currentTimeMillis - pc_start}ms") 12 //> reading p3 306 lines in 1181ms
13 println(s"reading all ${palines+pblines+pclines} lines in ${System.currentTimeMillis - pa_start}ms") 14 //> reading all 2823 lines in 10870ms
三个文件总共有2823行,读取时间为10870ms。咱们用append方式来连续运算:app
1 val pl_start = System.currentTimeMillis //> pl_start : Long = 1470051672373
2 val plines = (pa ++ pb ++ pc).runFoldMap(_ => 1).run 3 //> plines : Int = 2823
4 println(s"continue reading $plines in ${System.currentTimeMillis - pl_start}ms") 5 //> continue reading 2823 in 10501ms
连续运算所需时间10501ms,稍微短于分开运算结果。那么若是咱们用merge来并行运算呢?ide
1 val par_start = System.currentTimeMillis //> par_start : Long = 1470051682874
2 val parlines = (pa merge pb merge pc).runFoldMap(_ => 1).run 3 //> parlines : Int = 2823
4 println(s"parallel reading $parlines in ${System.currentTimeMillis - par_start}ms") 5 //> parallel reading 2823 in 6278ms
如今整个运算只须要6278ms,约莫是连续运算所需时间的60%。固然,若是咱们须要从更多的源头读取数据的话,那么merge方法能够实现更高的效率提高。可是,因为stream多是一串无穷的元素,咱们更须要对一个stream无穷的元素实现并行运算。在上面的例子里咱们用merge把三个源头的数据合并成为一个更长的数据串,若是咱们对其中每条记录进行运算如抽取、对比筛选等的话,那么运算时间仍然与数据串的长度成直线正比。好比:在以上例子的基础上,咱们须要对合并的数据进行统计:计算出使用元音(vowl)的频率的。咱们能够先把每条记录中的vowl过滤出来;而后把全部筛选出来的记录加起来就能得出这个统计结果了:函数
1 /c 是个vowl 2 def vowls(c: Char): Boolean = List('A','E','I','O','U').contains(c) 3 //> vowls: (c: Char)Boolean 4
5 //返回Map表明每一个字符频率, 测试使用了scalaz.Lens
6 def vowlCount(text: String): Map[Char,Int] = { 7 text.toUpperCase.toList.filter(vowls).foldLeft(Map[Char,Int]()) { (b,a) =>
8 if ((Lens.mapVLens(a) get b) == None) Lens.mapVLens(a) set(b,1.some) 9 else Lens.mapVLens(a).set(b, (Lens.mapVLens(a) get b).map(_ + 1)) 10 } 11 } //> vowlCount: (text: String)Map[Char,Int] 12 //直接用scala标准库实现
13 def stdVowlsCount(text: String): Map[Char,Int] =
14 text.toUpperCase.toList.filter(vowls).groupBy(s => s).mapValues(_.size) 15 //> stdVowlsCount: (text: String)Map[Char,Int]
咱们先按序运算结果:工具
1 //为runFoldMap提供一个Map[Char,Int]Monoid实例
2 implicit object mapMonoid extends Monoid[Map[Char,Int]] { 3 def zero: Map[Char,Int] = Map() 4 def append(m1: Map[Char,Int], m2: => Map[Char,Int]): Map[Char,Int] = { 5 (m1.keySet ++ m2.keySet).map { k =>
6 (k, m1.getOrElse(k,0) + m2.getOrElse(k,0)) 7 }.toMap 8 } 9 } 10
11 val cnt_start = System.currentTimeMillis //> cnt_start : Long = 1470197392016
12 val merged = (pa merge pb merge pc) 13 .map(vowlCount) 14 .runFoldMap(identity).run //> merged : Map[Char,Int] = Map(E -> 7330, U -> 1483, A -> 4531, I -> 4393, O-> 3748)
15 println(s"calc vowl frequency in ${System.currentTimeMillis - cnt_start}ms") 16 //> calc vowl frequency in 28646ms
整个运算须要28646ms。实际上这些运算不会依赖每条记录的排列位置,那么若是可以实现并行运算的话可能会提升效率。scalaz-stream提供了merge.mergeN方法来支持对一顺数据流进行并行运算。merge.mergeN函数的款式以下:测试
/** * Merges non-deterministically processes that are output of the `source` process. * * Merging stops when all processes generated by source have stopped, and all source process stopped as well. * Merging will also stop when resulting process terminated. In that case the cleanup of all `source` * processes is run, followed by cleanup of resulting process. * * When one of the source processes fails the mergeN process will fail with that reason. * * Merging is non-deterministic, but is fair in sense that every process is consulted, once it has `A` ready. * That means processes that are `faster` provide it's `A` more often than slower processes. * * Internally mergeN keeps small buffer that reads ahead up to `n` values of `A` where `n` equals to number * of active source streams. That does not mean that every `source` process is consulted in this read-ahead * cache, it just tries to be as much fair as possible when processes provide their `A` on almost the same speed. * */ def mergeN[A](source: Process[Task, Process[Task, A]])(implicit S: Strategy): Process[Task, A] = scalaz.stream.nondeterminism.njoin(0, 0)(source)(S) /** * MergeN variant, that allows to specify maximum of open `source` processes. * If, the maxOpen is <= 0 it acts like standard mergeN, where the number of processes open is not limited. * However, when the maxOpen > 0, then at any time only `maxOpen` processes will be running at any time * * This allows for limiting the eventual concurrent processing of opened streams not only by supplied strategy, * but also by providing a `maxOpen` value. * * * @param maxOpen Max number of open (running) processes at a time * @param source source of processes to merge */ def mergeN[A](maxOpen: Int)(source: Process[Task, Process[Task, A]])(implicit S: Strategy): Process[Task, A] = scalaz.stream.nondeterminism.njoin(maxOpen, maxOpen)(source)(S)
mergeN的入参source类型款式是这样的:Process[Task,Process[Task,A]],意思是在Process里还有一个Process。这个内部Process是并行运算的。这样的类型款式也能够被理解为:内部的Process是读取数据库的记录(data),咱们能够同时从多个源头读取数据,外部Process是数据库链接(connection)。应用在咱们上面的例子里:内部Process就是vowlCount做业,由于咱们但愿对每条记录的vowlCount并行处理。那么咱们先要进行类型款式转换:从Process[Task,A] 转换到 Process[Task,Process[Task,A]]:this
1 val merged = (pa merge pb merge pc) //> merged : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Hal 2 //| t(End),Vector(<function1>))
3 val par = merged.map {text => Task {vowlCount(text)} } 4 .map {task => Process.eval(task)} //> par : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.stream.Process[scalaz.concurrent.Task,Map[Char,Int]]] = Append(Halt(End),Vector(<function1>))
这个par的类型是咱们但愿的了。如今咱们能够看看mergeN运算的效率:spa
1 val cnt_start = System.currentTimeMillis //> cnt_start : Long = 1470204623562
2 val merged = (pa merge pb merge pc) //> merged : scalaz.stream.Process[scalaz.concurrent.Task,String] = Append(Halt(End),Vector(<function1>))
3 val par = merged.map {text => Task {vowlCount(text)} } 4 .map {task => Process.eval(task)} //> par : scalaz.stream.Process[scalaz.concurrent.Task,scalaz.stream.Process[scalaz.concurrent.Task,Map[Char,Int]]] = Append(Halt(End),Vector(<function1>))
5 val resm = merge.mergeN(par).runFoldMap(identity).run 6 //> resm : Map[Char,Int] = Map(E -> 7330, U -> 1483, A -> 4531, I -> 4393, O -> 3748)
7 println(s"parallel calc vowl frequency in ${System.currentTimeMillis - cnt_start}ms") 8 //> parallel calc vowl frequency in 6922ms
看看这个结果:从28646ms降到6922,约莫4倍效率的提升,够显著的了。若是咱们把上面这个例子用在实际的数据库操做上:好比对几个数据库表里的全部在必定价格范围内商品购买次数进行统计等,咱们是能够在scalaz-stream里实现这个场景并行运算的。