做为专业的编程人员,咱们常常会由于工做须要创建一些工具库。所谓工具库就是针对工做上常常会遇到的一些共性问题预先编制的由一整套函数所组成的函数库。一般这些工具库的功能都是在特别定制的一些数据类型支持下由一系列函数围绕着这些数据类型进行运算而实现的。在泛函编程范畴内也不例外。但在泛函工具库里的函数则更重视函数的组合能力(functional composition);于是泛函的工具库通常称为组件库(combinator library),库内函数则被称之为组件(combinator)。组件库的设计者对函数设计有着共通的最基本目标:经过对组件进行各类函数组合能够实现更大的功能。泛函组件库设计通常针对特别的功能需求或课题:首先尝试用一些数据类型来表述课题需求,而后围绕这些特制的数据类型设计一系列函数针对课题各个最基本需求范畴提供解决方法。咱们在这节讨论中从一个并行运算组件库的设计过程来介绍泛函组件库设计模式。java
咱们设计这个并行运算组件库的目的:能够把一个普通运算放到另一个独立的线程(thread)中去运行。这样咱们能够同时把多个运算分别放到多个线程中同时运行从而达到并行运算的目的。问题简单明确,但如何对这些在各自独立运行空间的运算进行组合(composition)、变形(transformation)则值得仔细思量。程序员
先从数据类型着手:一个并行运算应该像是一个容器,把一个普通运算封装在里面。咱们来随便造个结构出来:Par[A],A是普通运算返回的结果类型。这个Par类型很像咱们前面接触的高阶类型,那个承载A类型元素的管子类型。若是这样去想的话,咱们能够用前面全部针对高阶类型的函数对管子内的元素A进行操做处理。那么若是一个运算是封装在Par里在另外一个线程中运算完成后老是须要一个方法把结果取出来。这样咱们能够先得出两个最基本的函数:算法
1 def unit[A](a: A): Par[A] //把一个普通运算注入Par。把A升格到一个并行运算
2 def get[A](pa: Par[A]): A //把并行运行结果抽取出来
下一个问题是运行线程控制:是由程序员来决定一个运算该放到一个新的线程里仍是固定每个运算都用新的独立线程?假设咱们选择用由程序员调用一个函数来肯定产生新线程。这样有两个优越:一、能够有更灵活的并行运算策略(有些已经肯定很快完成的运算可能没有必要用新的线程,独立线程运算可能消耗更多的资源);二、独立线程机制和并行运算是松散耦合的:Par的实现中不须要了解线程管理机制。这个函数的款式以下:编程
def fork[A](pa: Par[A]): Par[A] //为pa设定一个新的运行空间。并不改变pa,仍是返回Par[A]
那么把一个运算放到一个新的线程里运行能够用这个函数表达:设计模式
def async[A](a: => A): Par[A] = fork(unit(a)) //不须要了解任何关于Par的信息。知道fork会为这个运算设定新的运行空间。注意仍是返回Par[A]
由于咱们追求的是线程机制和并行运算的松散耦合,那么咱们就不会在Par里实际进行并行运算的运行,那么Par就只是对一个并行运算的描述。fork的返回仍是Par,只是增长了对运算环境的描述,也不会真正运行算法。这样来讲Par若是是一个运算描述,那么咱们就须要一个真正的运行机制来获取运算结果了:数组
1 def run[A](pa: Par[A]): A //因为Par的意义从容器变成运算描述,咱们把get从新命名为run
咱们就须要在run的函数实现方法里进行线程管理、计算运行等真正Par的运行了。多线程
如今Par的表达形式包括以下:async
1 def unit[A](a: A): Par[A] //把一个普通运算注入Par。把A升格到一个并行运算描述
2 def fork[A](pa: Par[A]): Par[A] //为pa设定一个新的运行空间。返回的结果Par必须经run来运行并获取结果
3 def async[A](a: => A): Par[A] = fork(unit(a)) //不须要了解任何关于Par的信息。注意仍是返回Par[A]
4 def run[A](pa: Par[A]): A //运行pa并抽取运算结果
应该是在v1.6之后吧,java API包含了java.util.concurrent包,其中包括了ExecutorService类提供线程管理方面的支持。ExecutorService和Future类翻译成scala以下: 函数
class ExecutorService { def submit[A](a: Callable[A]): Future[A] } trait Future[A] { def get: A def get(timeout: Long, unit: TimeUnit): A def cancel(evenIfRunning: Boolean): Boolean def isDone: Boolean def isCancelled: Boolean }
咱们不须要进入多线程编程底层细节,用java Concurrent ExecutorService足够了。ExecutorService提供了以Callable形式向系统提交需运算任务方式;系统当即返回Future,咱们能够用Future.get以锁定线程方式读取运算。因为运算结果读取是以锁定线程(blocking)形式进行的,那么使用get的时间节点就很重要了:若是提交一个运算后下一步直接get就会当即锁定线程直至运算完成,那咱们就没法获得任何并行运算效果了。Future还提供了运行状态和中断运行等功能为编程人员提供更强大灵活的运算控制。为了获取更灵活的控制,Par的返回值应该从直接锁定线程读取A改为不会产生锁定线程效果的Future:工具
1 type Par[A] = ExecutorService => Future[A] 2 def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)
如今Par的含义又从一个数据类型变成了一个函数描述了:传入一个ExecutorService,返回Future。咱们能够用run来运行这个函数,系统会当即返回Future,无需任何等待。
下面让咱们把这些最基本的函数都实现了:
1 object par { 2 import java.util.concurrent._ 3
4 type Par[A] = ExecutorService => Future[A] 5 def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es) 6 //> run: [A](es: java.util.concurrent.ExecutorService)(pa: ch7.par.Par[A])java.u 7 //| til.concurrent.Future[A]
8
9 def unit[A](a: A): Par[A] = es => { 10 new Future[A] { 11 def get: A = a 12 def isDone = true
13 def isCancelled = false
14 def get(timeOut: Long, timeUnit: TimeUnit): A = get
15 def cancel(evenIfRunning: Boolean): Boolean = false
16 } 17 } //> unit: [A](a: A)ch7.par.Par[A]
18 def fork[A](pa: Par[A]): Par[A] = es => { 19 es.submit[A](new Callable[A] { 20 def call: A = run(es)(pa).get
21 }) 22 } //> fork: [A](pa: ch7.par.Par[A])ch7.par.Par[A]
23 def async[A](a: => A): Par[A] = fork(unit(a)) //> async: [A](a: => A)ch7.par.Par[A]
24
25 val a = unit(4+7) //> a : ch7.par.Par[Int] = <function1>
26 val b = async(2+1) //> b : ch7.par.Par[Int] = <function1>
27 val es = Executors.newCachedThreadPool() //> es : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool 28 //| Executor@71be98f5[Running, pool size = 0, active threads = 0, queued tasks = 29 //| 0, completed tasks = 0]
30 run(es)(b).get //> res0: Int = 3
31 run(es)(a).get //> res1: Int = 11
32 es.shutdown() 33
34 }
从应用例子里咱们能够了解线程的管理是由现有的java工具提供的(Executors.newCachedThreadPool),咱们无须了解线程管理细节。咱们同时肯定了线程的管理机制与咱们设计的并行运算Par是松散耦合的。
注意:unit并无使用ExecutorService es, 而是直接返回一个注明完成运算(isDone=true)的Future,这个Future的get就是unit的传入参数a。若是咱们再用这个Future的get来得取表达式的运算结果的话,这个运算是在当前主线程中运行的。async经过fork选择新的线程;并向新的运行环境提交了运算任务。咱们来分析一下运算流程:
一、val a = unit(4+7),unit构建了一个完成的 new Future; isDone=true,设置了 Future.get = 4 + 7,run(es)(a)在主线程中对表达式 4+7 进行了运算并得取结果 11。
二、val b = async(2+1) >>> fork(unit(2+1)), run(es)(b) >>> submit(new Callable), 注意 def call = run(es)(b).get : 这里提交的运算run(es)(b).get实际上又提交了一次运算并直接锁定线程(blocking)等待读取运算结果。第一次提交Callable又须要锁定线程等待提交运算完成计算。若是线程池只能提供一个线程的话,第一次提交了Callable会占用这个惟一的线程并等待第二次提交运算得出的结果,因为没有线程能够提供给二次提交运算,这个运算永远没法获得结果,那么run(es)(b).get就会产生死锁了(dead lock)。
咱们在这节介绍了一个简单的泛函并行组件库设计,能够把一个运算放到主线程以外的另外一个新的线程中计算。可是抽取运算结果却仍是会锁定线程(blocking)。咱们下一节将会讨论如何经过一些算法函数来实现并行运算。