scala支持Java的多线程模型, 也继承了多线程固有的资源竞争和死锁问题.html
做为一种函数式编程语言, scala的actor消息模型提供了一种更便捷更安全的并发编程方案.编程
scala的线程模型来自于Java. 首先咱们要拓展一个Runable或Callable, 并重写run方法安全
trait Runnable { def run(): Unit }
Callable与Runable相似,可是有一个返回值:多线程
trait Callable[V] { def call(): V }
Thread须要一个Runable实例做为参数来建立:并发
scala> val thread = new Thread(new Runnable { | def run() { | println("hello world") | } | }) thread: Thread = Thread[Thread-2,5,main] scala> thread.start() hello world
synchronized
是JVM中最简单的使用互斥锁的方式:负载均衡
class User { var name: String = ""; def setName(nameArg :String) { this.synchronized { this.name = nameArg; } } }
当线程开始执行obj.synchronized
块中的代码前, 它将尝试得到对象obj
的锁, 若获取失败则线程进入阻塞状态.异步
当某个线程得到了对象的锁后, 其它线程就没法访问或修改该对象. 当obj.synchronized
块中的代码执行完成时, 线程会解除锁, 另外一个线程就能够加锁并访问对象了.编程语言
scala提供了Promise-Future-Callback异步模型:函数式编程
Future 表示一个尚未完成的任务的结果, Future对象能够在任务完成前访问函数
Promise 表示一个尚未执行的任务, 能够经过Promise标记任务的状态
Callback 回调用于在任务完成或其它状况下执行的操做
import scala.concurrent.{Await, Future} import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global object FutureDemo extends App { val f = Future { println("working on future task") Thread.sleep(100) 1+1 } println("waiting for future task complete") val result = Await.result(f, 1 second) println(result) }
执行异步任务须要上下文, ExecutionContext.Implicits.global
是使用当前的全局上下文做为隐式上下文.
引入.duration._
容许咱们使用1 second
, 200 milli
, 2 minute
这样的时间间隔字面值.
上述示例中Await.result
使用阻塞的方式等待Future任务完成, 若Future超时未完成则抛出TimeoutException
异常.
屡次运行上述示例就会发现, 两条提示输出顺序是不肯定的. 这是由于Future中的代码是在独立线程中执行的.
更好的方式是采用回调的方式来处理Future结果:
import scala.concurrent.{Future} import scala.concurrent.ExecutionContext.Implicits.global import scala.util.{Failure, Success} object FutureDemo2 extends App { val f = Future { 1 + 2 } f.onComplete{ case Success(value) => println(value) case Failure(e) => e.printStackTrace } }
或者定义onSuccess
和onFailure
两个回调.
import scala.concurrent.{Future} import scala.concurrent.ExecutionContext.Implicits.global object FutureDemo2 extends App { val f = Future { 1 + 2 } f.onSuccess { case value => println(value) } f.onFailure { case e => e.printStackTrace } }
Actor是一个基于消息机制的并发模型, 自Scala 2.11以后Akka Actor已成为Scala事实上的Actor标准.
akka不是scala的默认包, 这里咱们使用SBT来管理外部包依赖. 关于sbt的使用能够参见做者的另外一篇博文Scala构建工具SBT.
在build.sbt
中添加下列代码, 引入akka依赖.
scalaVersion := "2.12.1" resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.17"
更多关于引入akka的内容能够参见akka官网.
import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class HelloActor extends Actor { def receive() = { case "hello" => println("Hi, I am an actor."); case _ => println("?"); } } object Main extends App { val system = ActorSystem("HelloSystem"); val helloActor = system.actorOf(Props[HelloActor], name = "helloactor"); helloActor ! "hello"; helloActor ! "bye"; system.shutdown(); }
自定义类继承Actor并重写receive方法处理不一样类型的消息. 这里使用String类进行模式匹配, 使用case class进行模式匹配能够传递更多信息.
Actor须要ActorSystem的事件循环提供支持, 初始化一个ActorSystem后事件循环开始运行.最后必须执行system.shutdown();
不然scala程序会一直运行下去.
!
是用于发送消息的操做符, helloActor ! "hello";
将消息"hello"
发送给了helloActor.
receive
方法的返回值类型是PartialFunction[Any, Unit]
. 全部发送给Actor的消息都将被receive返回的偏函数处理.
偏函数的返回值类型为Unit, 也就是说处理消息时必须依赖反作用而不能有返回值; 偏函数的参数类型为Any, 也就是说全部消息在传入的时候都会发生类型丢失.
非类型化的消息便于设计消息转发, 负载均衡和代理Actor等机制, 且由于基于模式匹配的消息处理, 非类型化并不会产生问题.
基于事件循环的非阻塞机制已经被广为使用, 这里简单说明Actor与线程的问题.Actor并不是与线程一一对应, 一个线程能够为多个Actor服务. ActorSystem会根据实际状况选择线程数.