Future/Promise 执行逻辑数组
scala Future 有几个要点,第一个是 tryAwait 须要借助 CowndownLatch 实现,第二个是能够在 Promise 挂载回调函数promise
首先,大体看下 Scala concurrent 的架构架构
DefaultPromise -> AbstractPromise -> Promise(concurrent) -> Promise[Trait] -> Future[Trait] -> Awaitableapp
在 package 外使用的 Promise 是 Promise[Trait], 其实 DefaultPromise 也是有 map, flatMap 方法的,只不过不能用而已,DefaultPromise 是 scala promise 的惟一实现类异步
我没能彻底理解 link promise 怎么实现垃圾回收的,
在 flatMap 中有一个 linkRootOf 函数,从 Promise 的注释中也能够看到 promise link 是一个很重要的概念,它解决了 flatMap 函数组合造成无限长的链后的 memory leak 问题ide
The ability to link DefaultPromises is needed to prevent memory leaks when using Future.flatMap. The previous implementation of Futhre.flatMap used onComplete handlers to propagate to the ultimate value of a flatMap operation to its promise. Recursive calls to flatMap built a chain of onComplete handlers and promises. Unfortunately none of the handlers or promises in the chain could be collected until the handers has been called detached, which only happended when the final flatMap future was completed. (In some situations, such as infinte streams, this would never actually happen.) Because of the fact that the promise implementation internally created references between promises, and these reference were invisible to user code, it was easy for user code to accidentally build large chains of promises and thereby leak memory.函数
结合 flatMap 函数理解ui
def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = { import impl.Promise.DefaultPromise val p = new DefaultPromise[S]() onComplete { case f: Failure[_] => p complete f.asInstanceOf[Failure[S]] case Success(v) => try f(v) match { // If possible, link DefaultPromises to avoid space leaks case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p) case fut => fut.onComplete(p.complete)(internalExecutor) } catch { case NonFatal(t) => p failure t } } p.future }
每次 flatMap 函数都会建立 DefaultPromise 变量,这个变量经过返回值传递到函数外,使它在上一层 scope 可见,若是无限建立不能被 GC 回收,那么内存很快就会被占满,而 stream 类型的数据流极可能就是无限长的,因此这个 DefaultPromise 变量必定要回收掉。this
// 添加 sleep 对分析控制流走向颇有帮助 Future { Thead.sleep(3000), 1 } .flatMap { x => { Thread.sleep(20000), 2} } .flatMap { y => { Thread.sleep(50000), 3} }
Stage 1:
spa
Future { Thread.sleep(3000); 1}
第一个 Future 调用 object Future.apply 方法,建立 PromiseCompletingRunnable, 放到线程池里运行,运行完毕后(几秒以后),会调用 promise complete Try 方法,此时还没调用。
Stage 2:
.flatMap { x => {Thread.sleep(20000), 2}}
complete 逻辑先不分析,而后是第一个 flatMap 方法,flatMap 方法在上面已经给出,不过我这里先把 flatMap 方法展开,去掉不重要或无关的代码
def flatMap(f: T => Future[S]): Future[S] val p = DefaultPromise[S] val callBackFunction = { case Success(v) => f(v) match case dp: DefaultPromise => dp.linkRootOf(p) } val runnable = new CallbackRunnable(callbackFunction) getState match case r: Try => runnable(r) case DefaultPromise => compressRoot().dispatcherOrCallback(runnable) case listener: List[] => updateState(listenr, runnable::listener) p.future
flatMap 实际上只作了回调函数注册的功能,在上面的 promise complete 执行时,会调用这些 callbackFunction.
DefaultPromise 初始化时,State = Nil, 因此注册回调函数的时候,state 会被设置成 runnable.
Stage 3:
第一个 flatMap 函数执行,假设
f0 = Future{} f1 = f0.flatMap {} // f0.state = runnable f2 = f1.flatMap {} // f1.state = runnable
那么 stage 3 就是在 f1 上添加回调函数
Stage 3:
假设,第一个 Future 运算完毕,开始返回,promise complete result 开始执行了,complete 调用 tryComplete 函数
def tryComplete(r: Try) getState match case list: List[] => updateState(list, r); list.foreach(exec) case DefaultPromise => ...
返回值为 Success(1), 执行刚才注册的回调函数 callBackFunction, f(v) 返回 Future 类型,其实是 DefaultPromise 类型,这个操做也是经过线程池调用,异步执行,而后走到 dp.linkRootOf(p)
,注意,这个 dp 再也不是 this 了,而是新产生的 Future, 而 p2 是 flatMap 里新建立的。
Stage 4:
private def link(target: DefaultPromise[T]): Unit = if (this ne target) { getState match { case r: Try[_] => if (!target.tryComplete(r.asInstanceOf[Try[T]])) { // Currently linking is done from Future.flatMap, which should ensure only // one promise can be completed. Therefore this situation is unexpected. throw new IllegalStateException("Cannot link completed promises together") } case _: DefaultPromise[_] => compressedRoot().link(target) case listeners: List[_] => if (updateState(listeners, target)) { if (!listeners.isEmpty) listeners.asInstanceOf[List[CallbackRunnable[T]]].foreach(target.dispatchOrAddCallback(_)) } else link(target) } }
由于 dp 是新建立的,且当前值还未返回(异步执行中),state = Nil, 因此这里会把状态更新为 target 也就是 p2, 没有须要执行的回调函数。
Stage 5:
当 f2 返回了,会执行 promise complete try, 进入 tryComplete 逻辑,上一次,tryComplete 走的是 List() 分支,而此次,由于 state 上 Stage 4 换成了 target, 也就是 P2, 因此此次改走 DefaultPromise 分支,调用 P2 上的 Listener 也就是第三个 flatMap 的逻辑。这样,chain 就跑起来了
Stage 6:
第二个 flatMap 依然执行建立 DefaultPromise, 注册回调函数的逻辑,