SDP(13): Scala.Future - far from completion, and no excuse for a fire-and-forget attitude

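  In the previous few posts on database engines, many of the functions returned results of type scala.Future, because I felt this was a convenient way to get non-blocking behavior. However complex a data-processing operation was, I could wrap it in a Future{...}, hand it to the system and forget about it, then turn my attention straight to the rest of the program. At the Shenzhen Scala user meetup on March 17 I gave a talk on functional programming in Scala, in which I mentioned that the functional component used most often today is scala.Future. I believe this is fairly widespread among Scala users: everyone regards it as the most direct way to achieve non-blocking. But when I thought back on scala.Future after the meetup, I suddenly realized that it is a strict value, evaluated eagerly. Look at the following example: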

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  val fs = Future {println("run now..."); System.currentTimeMillis() }
                                         //> run now...
                                         //| fs : scala.concurrent.Future[Long] = List()
  Await.result(fs, 1.second)             //> res0: Long = 1465907784714
  Thread.sleep(1000)
  Await.result(fs, 1.second)             //> res1: Long = 1465907784714

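As we can see, fs is evaluated as soon as the Future is constructed, and it is evaluated only once: both calls to Await.result return the same timestamp. If a scala Future contains side-effecting code, the side effects happen immediately at construction. So we cannot use scala Future to write pure functions, as the following shows: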

  val progA: Future[A] = for {
    b <- readFromB
    _ <- writeToLocationA(a)
    r <- getResult
  } yield r

  /* location A content updated */
  ... /* later */

  val progB: Future[B] = for {
    a <- readFromA
    _ <- updateLocationA
    c <- getResult
  } ...

  val program: Future[Unit] = for {
    _ <- progA
    _ <- progB
  } yield ()

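In this example the ultimate goal is to run program, which is composed of the two subprograms progA and progB. Both subprograms start running the moment they are constructed and may update location A at any time, producing side effects. Imagine progA and progB buried in a large body of other source code: the result of program would be unpredictable. In other words, using Future for functional composition means digging a hole for yourself. At the very least you have to remember the order in which these Futures are constructed, and in a large collaborative software project that is practically impossible. Besides being unsafe for functional composition, scala.Future also lacks control over evaluation and threading. For example: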

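No control over when the computation starts

No control over which thread the computation runs on

No way to terminate a computation once it has started

No effective exception-handling mechanisms such as fallback and retry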
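To make the eager-evaluation problem concrete, here is a minimal sketch that uses only the standard library. The names `EagerVsDeferred`, `effect`, `deferred` and `demo` are illustrative assumptions, not part of any library. It contrasts a Future, which runs its side effect once at construction, with a plain function value `() => Future[Int]`, which runs nothing until it is called and runs again on every call.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicInteger

object EagerVsDeferred {

  // Returns the side-effect count observed at four points in time.
  def demo(): List[Int] = {
    val counter = new AtomicInteger(0)

    // Hypothetical side effect, standing in for something like updateLocationA.
    def effect(): Int = counter.incrementAndGet()

    // A deferred computation: defining the function value runs nothing.
    val deferred: () => Future[Int] = () => Future(effect())
    val afterDefiningDeferred = counter.get()

    // An eager computation: the effect is scheduled as soon as this line runs.
    val eager = Future(effect())
    Await.result(eager, 1.second)
    val afterBuildingEager = counter.get()

    // Awaiting the same Future again does not rerun the effect.
    Await.result(eager, 1.second)
    val afterSecondAwait = counter.get()

    // Each call of the deferred function runs the effect again.
    Await.result(deferred(), 1.second)
    Await.result(deferred(), 1.second)
    val afterTwoDeferredRuns = counter.get()

    List(afterDefiningDeferred, afterBuildingEager, afterSecondAwait, afterTwoDeferredRuns)
  }
}
```

Calling `EagerVsDeferred.demo()` shows that the caller of the deferred version decides when, and how often, the effect happens. This is the property the Task types discussed in this post provide in a much more complete form.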

Both the scalaz and monix libraries provide a Task type that supports functional composition where Future falls short. scalaz.Task is built on scalaz.Future:

  sealed abstract class Future[+A] {
  ...
  object Future {
    case class Now[+A](a: A) extends Future[A]
    case class Async[+A](onFinish: (A => Trampoline[Unit]) => Unit) extends Future[A]
    case class Suspend[+A](thunk: () => Future[A]) extends Future[A]
    case class BindSuspend[A,B](thunk: () => Future[A], f: A => Future[B]) extends Future[B]
    case class BindAsync[A,B](onFinish: (A => Trampoline[Unit]) => Unit,
                              f: A => Future[B]) extends Future[B]
  ...

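scalaz.Future[A] is evidently structured as a Free Monad. Its structural representations are Now, Async, Suspend, BindSuspend and BindAsync. We can implement flatMap purely in terms of these structures, which is why Future is a Free Monad: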

  def flatMap[B](f: A => Future[B]): Future[B] = this match {
    case Now(a) => Suspend(() => f(a))
    case Suspend(thunk) => BindSuspend(thunk, f)
    case Async(listen) => BindAsync(listen, f)
    case BindSuspend(thunk, g) =>
      Suspend(() => BindSuspend(thunk, g andThen (_ flatMap f)))
    case BindAsync(listen, g) =>
      Suspend(() => BindAsync(listen, g andThen (_ flatMap f)))
  }

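Because free-structure types separate the description of a computation from its execution, we can use scalaz.Future to describe what a program does without actually running anything. So if progA and progB in the earlier example were of type Task, constructing program would be safe, because side effects are produced only when we finally call Task.run. scalaz.Task adds exception handling and other features on top of scalaz.Future.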
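The separation between describing and running can be sketched with a tiny data type of our own. This is only an illustration under stated assumptions: `MiniTask`, `Prog`, `Bind`, `delay`, `run` and `demo` are hypothetical names, the interpreter is deliberately naive (synchronous and not stack-safe), and none of it is the scalaz or monix implementation.

```scala
import scala.collection.mutable.ListBuffer

object MiniTask {

  // A program is plain data; building or composing it performs no effects.
  sealed trait Prog[+A] {
    def flatMap[B](f: A => Prog[B]): Prog[B] = Bind(this, f)
    def map[B](f: A => B): Prog[B] = flatMap[B](a => Now(f(a)))
  }
  final case class Now[+A](a: A) extends Prog[A]
  final case class Suspend[+A](thunk: () => Prog[A]) extends Prog[A]
  final case class Bind[A, +B](source: Prog[A], f: A => Prog[B]) extends Prog[B]

  // Lift a side-effecting block into a description without running it.
  def delay[A](a: => A): Prog[A] = Suspend(() => Now(a))

  // The interpreter: the only place where effects actually happen.
  def run[A](p: Prog[A]): A = p match {
    case Now(a)         => a
    case Suspend(thunk) => run(thunk())
    case b: Bind[x, A]  => run(b.f(run(b.source)))
  }

  def demo(): (List[String], Int, List[String]) = {
    val log = ListBuffer.empty[String]
    val progA = delay { log += "A"; 1 }
    val progB = delay { log += "B"; 2 }
    // Composition order, not construction order, decides the effect order.
    val program = for { b <- progB; a <- progA } yield a + b
    val logBeforeRun = log.toList
    val result = run(program)
    (logBeforeRun, result, log.toList)
  }
}
```

In `demo`, progA is constructed before progB, yet nothing is logged until `run` is called, and the log then follows the order written in the for-comprehension. With scala.Future the same code would have started both effects at construction.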

monix.Task separates description from execution by means of deferred evaluation. These are the basic building blocks of the type:

  /** [[Task]] state describing an immediate synchronous value. */
  private[eval] final case class Now[A](value: A) extends Task[A] {...}

  /** [[Task]] state describing an immediate synchronous value. */
  private[eval] final case class Eval[A](thunk: () => A) extends Task[A]

  /** Internal state, the result of [[Task.defer]] */
  private[eval] final case class Suspend[+A](thunk: () => Task[A]) extends Task[A]

  /** Internal [[Task]] state that is the result of applying `flatMap`. */
  private[eval] final case class FlatMap[A, B](source: Task[A], f: A => Task[B]) extends Task[B]

  /** Internal [[Coeval]] state that is the result of applying `map`. */
  private[eval] final case class Map[S, +A](source: Task[S], f: S => A, index: Int)
    extends Task[A] with (S => Task[A]) {

    def apply(value: S): Task[A] =
      new Now(f(value))
    override def toString: String =
      super[Task].toString
  }

  /** Constructs a lazy [[Task]] instance whose result will
    * be computed asynchronously.
    *
    * Unsafe to build directly, only use if you know what you're doing.
    * For building `Async` instances safely, see [[create]].
    */
  private[eval] final case class Async[+A](register: (Context, Callback[A]) => Unit) extends Task[A]

The following excerpt shows how monix.Task values are built from these structures:

object Task extends TaskInstancesLevel1 {
  /** Returns a new task that, when executed, will emit the result of
    * the given function, executed asynchronously.
    *
    * This operation is the equivalent of:
    * {{{
    *   Task.eval(f).executeAsync
    * }}}
    *
    * @param f is the callback to execute asynchronously
    */
  def apply[A](f: => A): Task[A] =
    eval(f).executeAsync

  /** Returns a `Task` that on execution is always successful, emitting
    * the given strict value.
    */
  def now[A](a: A): Task[A] =
    Task.Now(a)

  /** Lifts a value into the task context. Alias for [[now]]. */
  def pure[A](a: A): Task[A] = now(a)

  /** Returns a task that on execution is always finishing in error
    * emitting the specified exception.
    */
  def raiseError[A](ex: Throwable): Task[A] =
    Error(ex)

  /** Promote a non-strict value representing a Task to a Task of the
    * same type.
    */
  def defer[A](fa: => Task[A]): Task[A] =
    Suspend(fa _)
...}
    source match {
      case Task.Now(v) => F.pure(v)
      case Task.Error(e) => F.raiseError(e)
      case Task.Eval(thunk) => F.delay(thunk())
      case Task.Suspend(thunk) => F.suspend(to(thunk()))
      case other => suspend(other)(F)
    }

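This Suspend structure is the core of deferred evaluation. monix.Task is a newer solution that borrows many concepts and methods from scalaz.Task while adding a good number of optimizations and extra features, and its GitHub repository is updated frequently. Using monix.Task should be the right choice.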

First we need to handle the conversion between scala.Future and monix.Task:

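  import scala.concurrent.Future
  import monix.eval.Task
  import monix.execution.Scheduler.Implicits.global

  // Wraps a by-name Future so that it is only started when the Task runs
  final class FutureToTask[A](x: => Future[A]) {
    def asTask: Task[A] = Task.deferFuture[A](x)
  }

  // Running the Task is what turns it back into a (started) Future
  final class TaskToFuture[A](x: => Task[A]) {
    def asFuture: Future[A] = x.runAsync
  }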

Below is a complete Task use case:

import scala.concurrent._
import scala.util._
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution._

object MonixTask extends App {
  import monix.execution.Scheduler.Implicits.global

  // Executing a sum, which (due to the semantics of apply)
  // will happen on another thread. Nothing happens on building
  // this instance though, this expression is pure, being
  // just a spec! Task by default has lazy behavior ;-)
  val task = Task { 1 + 1 }

  // Tasks get evaluated only on runAsync!
  // Callback style:
  val cancelable = task.runOnComplete {
    case Success(value) =>
      println(value)
    case Failure(ex) =>
      System.out.println(s"ERROR: ${ex.getMessage}")
  }
  //=> 2

  // If we change our mind...
  cancelable.cancel()

  // Or you can convert it into a Future
  val future: CancelableFuture[Int] = task.runAsync

  // Printing the result asynchronously
  future.foreach(println)
  //=> 2

  // Task.now takes a strict value, so the effect runs right here
  // (named taskNow because `task` is already defined above)
  val taskNow = Task.now { println("Effect"); "Hello!" }
  //=> Effect
  // taskNow: monix.eval.Task[String] = Delay(Now(Hello!))
}

Next, let's look at the various ways of constructing a Task:

  /* ------ taskNow ----*/
  val taskNow = Task.now { println("Effect"); "Hello!" }
  //=> Effect
  // taskNow: monix.eval.Task[String] = Delay(Now(Hello!))

  /* -------- taskDelay, possibly on another thread ------*/
  val taskDelay = Task { println("Effect"); "Hello!" }
  // taskDelay: monix.eval.Task[String] = Delay(Always(<function0>))

  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // The evaluation (and thus all contained side effects)
  // gets triggered on each runAsync:
  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  /* -------- taskOnce ------- */
  val taskOnce = Task.evalOnce { println("Effect"); "Hello!" }
  // taskOnce: monix.eval.Task[String] = EvalOnce(<function0>)

  taskOnce.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // Result was memoized on the first run!
  taskOnce.runAsync.foreach(println)
  //=> Hello!

  /* -------- taskFork ------- */
  // this guarantees that our task will get executed asynchronously:
  val taskFork = Task.eval("Hello!").executeAsync
  //val taskFork = Task.fork(Task.eval("Hello!"))

  // The default scheduler
  import monix.execution.Scheduler.Implicits.global

  // Creating a special scheduler meant for I/O
  import monix.execution.Scheduler
  lazy val io = Scheduler.io(name="my-io")

  // Then we can manage what executes on which:
  // Override the default Scheduler by fork:
  val source = Task(println(s"Running on thread: ${Thread.currentThread.getName}"))
  val forked = source.executeOn(io, true)
  // val forked = Task.fork(source, io)

  source.runAsync
  //=> Running on thread: ForkJoinPool-1-worker-1
  forked.runAsync
  //=> Running on thread: my-io-4

  /* -------- taskError ------- */
  import scala.concurrent.TimeoutException

  val taskError = Task.raiseError[Int](new TimeoutException)
  // error: monix.eval.Task[Int] =
  //   Delay(Error(java.util.concurrent.TimeoutException))

  taskError.runOnComplete(result => println(result))
  //=> Failure(java.util.concurrent.TimeoutException)

Below are some of the control functions:

  final def doOnFinish(f: Option[Throwable] => Task[Unit]): Task[A] =
  final def doOnCancel(callback: Task[Unit]): Task[A] =
  final def onCancelRaiseError(e: Throwable): Task[A] =
  final def onErrorRecoverWith[B >: A](pf: PartialFunction[Throwable, Task[B]]): Task[B] =
  final def onErrorHandleWith[B >: A](f: Throwable => Task[B]): Task[B] =
  final def onErrorFallbackTo[B >: A](that: Task[B]): Task[B] =
  final def restartUntil(p: (A) => Boolean): Task[A] =
  final def onErrorRestart(maxRetries: Long): Task[A] =
  final def onErrorRestartIf(p: Throwable => Boolean): Task[A] =
  final def onErrorRestartLoop[S, B >: A](initial: S)(f: (Throwable, S, S => Task[B]) => Task[B]): Task[B] =
  final def onErrorHandle[U >: A](f: Throwable => U): Task[U] =
  final def onErrorRecover[U >: A](pf: PartialFunction[Throwable, U]): Task[U] =
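Why do retry-style functions belong on a lazy type rather than on scala.Future? A Future that has already failed cannot be run again, so a retry needs something that can be restarted. The sketch below illustrates this with the standard library only. `RetrySketch`, `retry`, `flaky` and `demo` are hypothetical names, and this is not how monix implements `onErrorRestart`; it only shows the idea of restarting a deferred computation.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal
import java.util.concurrent.atomic.AtomicInteger

object RetrySketch {

  // Re-run a deferred computation up to `maxRetries` extra times when it fails.
  // The thunk is essential: every call to thunk() starts a fresh Future.
  def retry[A](maxRetries: Int)(thunk: () => Future[A])
              (implicit ec: ExecutionContext): Future[A] =
    thunk().recoverWith {
      case NonFatal(_) if maxRetries > 0 => retry(maxRetries - 1)(thunk)
    }

  def demo(): (Int, Int, String) = {
    import ExecutionContext.Implicits.global
    val attempts = new AtomicInteger(0)

    // Hypothetical flaky operation: fails on its first two attempts.
    val flaky: () => Future[Int] = () => Future {
      val n = attempts.incrementAndGet()
      if (n < 3) throw new RuntimeException(s"attempt $n failed") else n
    }
    val result = Await.result(retry(5)(flaky), 2.seconds)

    // When the retries are exhausted, recover with a fallback value.
    val alwaysFailing = () => Future.failed[String](new RuntimeException("down"))
    val withFallback = Await.result(
      retry(1)(alwaysFailing).recover { case NonFatal(_) => "fallback" },
      2.seconds)

    (result, attempts.get(), withFallback)
  }
}
```

The design point is that `retry` takes `() => Future[A]`, not `Future[A]`. A Task plays the role of that thunk, which is why restart and fallback operations can be offered directly as methods on Task.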

A Task is actually executed, asynchronously or synchronously, through the runAsync and runSync families of methods:

  def runAsync(implicit s: Scheduler): CancelableFuture[A] =
  def runAsync(cb: Callback[A])(implicit s: Scheduler): Cancelable =
  def runAsyncOpt(implicit s: Scheduler, opts: Options): CancelableFuture[A] =
  def runAsyncOpt(cb: Callback[A])(implicit s: Scheduler, opts: Options): Cancelable =
  final def runSyncMaybe(implicit s: Scheduler): Either[CancelableFuture[A], A] =
  final def runSyncMaybeOpt(implicit s: Scheduler, opts: Options): Either[CancelableFuture[A], A] =
  final def runSyncUnsafe(timeout: Duration)
    (implicit s: Scheduler, permit: CanBlock): A =
  final def runSyncUnsafeOpt(timeout: Duration)
    (implicit s: Scheduler, opts: Options, permit: CanBlock): A =
  final def runOnComplete(f: Try[A] => Unit)(implicit s: Scheduler): Cancelable =

The following demonstrates two common ways of running a Task:

  val task1 = Task {println("sum:"); 1+2}.delayExecution(1.second)

  // Synchronous: block for at most 2 seconds
  println(task1.runSyncUnsafe(2.seconds))

  // Asynchronous: handle the result in a callback
  task1.runOnComplete {
    case Success(r) => println(s"result: $r")
    case Failure(e) => println(e.getMessage)
  }

Below is the full source code for this demonstration:

import scala.util._
import scala.concurrent.duration._
import monix.eval.Task
import monix.execution._

object MonixTask extends App {
  import monix.execution.Scheduler.Implicits.global

  // Executing a sum, which (due to the semantics of apply)
  // will happen on another thread. Nothing happens on building
  // this instance though, this expression is pure, being
  // just a spec! Task by default has lazy behavior ;-)
  val task = Task { 1 + 1 }

  // Tasks get evaluated only on runAsync!
  // Callback style:
  val cancelable = task.runOnComplete {
    case Success(value) =>
      println(value)
    case Failure(ex) =>
      System.out.println(s"ERROR: ${ex.getMessage}")
  }
  //=> 2

  // If we change our mind...
  cancelable.cancel()

  // Or you can convert it into a Future
  val future: CancelableFuture[Int] = task.runAsync

  // Printing the result asynchronously
  future.foreach(println)
  //=> 2

  /* ------ taskNow ----*/
  val taskNow = Task.now { println("Effect"); "Hello!" }
  //=> Effect
  // taskNow: monix.eval.Task[String] = Delay(Now(Hello!))

  /* -------- taskDelay, possibly on another thread ------*/
  val taskDelay = Task { println("Effect"); "Hello!" }
  // taskDelay: monix.eval.Task[String] = Delay(Always(<function0>))

  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // The evaluation (and thus all contained side effects)
  // gets triggered on each runAsync:
  taskDelay.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  /* -------- taskOnce ------- */
  val taskOnce = Task.evalOnce { println("Effect"); "Hello!" }
  // taskOnce: monix.eval.Task[String] = EvalOnce(<function0>)

  taskOnce.runAsync.foreach(println)
  //=> Effect
  //=> Hello!

  // Result was memoized on the first run!
  taskOnce.runAsync.foreach(println)
  //=> Hello!

  /* -------- taskFork ------- */
  // this guarantees that our task will get executed asynchronously
  // (named taskFork because `task` is already defined above):
  val taskFork = Task.eval("Hello!").executeAsync
  //val taskFork = Task.fork(Task.eval("Hello!"))

  // The default scheduler is the one imported at the top of this object

  // Creating a special scheduler meant for I/O
  import monix.execution.Scheduler
  lazy val io = Scheduler.io(name="my-io")

  // Then we can manage what executes on which:
  // Override the default Scheduler by fork:
  val source = Task(println(s"Running on thread: ${Thread.currentThread.getName}"))
  val forked = source.executeOn(io, true)
  // val forked = Task.fork(source, io)

  source.runAsync
  //=> Running on thread: ForkJoinPool-1-worker-1
  forked.runAsync
  //=> Running on thread: my-io-4

  /* -------- taskError ------- */
  import scala.concurrent.TimeoutException

  val taskError = Task.raiseError[Int](new TimeoutException)
  // error: monix.eval.Task[Int] =
  //   Delay(Error(java.util.concurrent.TimeoutException))

  taskError.runOnComplete(result => println(result))
  //=> Failure(java.util.concurrent.TimeoutException)

  /* -------- running a task ------- */
  val task1 = Task {println("sum:"); 1+2}.delayExecution(1.second)
  println(task1.runSyncUnsafe(2.seconds))

  task1.runOnComplete {
    case Success(r) => println(s"result: $r")
    case Failure(e) => println(e.getMessage)
  }
}