本文由 GodPan 发表在 ScalaCool 团队博客。html
经过前几篇的学习,相信你们对Akka应该有所了解了,都说解决并发哪家强,JVM上面找Akka,那么Akka到底在解决并发问题上帮咱们作了什么呢?java
众所周知,在处理并发问题上面,最核心的一部分就是如何处理共享内存,不少时候咱们都须要花费不少时间和精力在共享内存上,那么在学习Akka对共享内存是如何管理以前,咱们先来看看Java中是怎么处理这个问题的。缓存
相信对Java并发有所了解的同窗都应该知道在Java5推出JSR 133后,Java对内存管理有了更高标准的规范了,这使咱们开发并发程序也有更好的标准了,不会有一些模糊的定义致使的没法肯定的错误。安全
首先来看看一下Java内存模型的简单构图:多线程
从图中咱们能够看到咱们线程都有本身的一个工做内存,这就比如高速缓存,它是对主内存部分数据的拷贝,线程对本身工做内存的操做速度远远快于对主内存的操做,但这也每每会引发共享变量不一致的问题,好比如下一个场景:并发
int a = 0;
public void setA() {
a = a + 1;
}复制代码
上面是一个很简单的例子,a是一个全局变量,而后咱们有一个方法去修改这个值,每次增长一,假如咱们用100个线程去运行这段代码,那a最终的结果会是多少呢?
100?显然不必定,它多是80,90,或者其余数,这就形成共享变量不一致的问题,那么为何会致使这个问题呢,就是咱们上面所说的,线程去修改a的时候可能就只是修改了本身工做内存中a的副本,但并无将a的值及时的刷新到主内存中,这便会致使其余线程可能读到未被修改a的值,最终出现变量不一致问题。框架
那么Java中是怎么处理这种问题,如何保证共享变量的一致性的呢?ide
大致上Java中有3类同步机制,但它们所解决的问题并不相同,咱们先来看一看这三种机制:学习
写过Java程序的同窗对这个关键词应该再熟悉不过了,其基本含义就是不可变,不可变变量,好比:优化
final int a = 10;
final String b = "hello";复制代码
不可变的含义在于当你对这些变量或者对象赋初值后,不能再从新去赋值,但对于对象来讲,咱们不能修改的是它的引用,可是对象内的内容仍是能够修改的。下面是一个简单的例子:
final User u = new User(1,"a");
u.id = 2; //能够修改
u = new User(2,"b"); //不可修改复制代码
因此在利用final关键词用来保证共享变量的一致性时必定要了解清楚本身的需求,选择合适的方法,另外final变量必须在定义或者构建对象的时候进行初始化,否则会报错。
不少同窗在遇到共享变量不一致的问题后,都会说我在声明变量前加一个volatile就行了,但事实真是这样嘛?答案显然不是。那咱们来看看volatile到底为咱们作了什么。
前面咱们说过每一个线程都有本身的工做内存,不少时候线程去修改一个变量的值只是修改了本身工做内存中副本的值,这便会致使主内存的值并非最新的,其余线程读取到的变量便会出现问题。volatile帮咱们解决了这个问题,它有两个特色:
举个例子:
volatile int a = 0;
public void setA() {
a = a + 1;
}复制代码
如今线程在执行这段代码时,都会强制去主内存中读取变量的值,修改后也会立刻更新到主内存中去,可是这真的能解决共享变量不一致的问题嘛,其实否则,好比咱们有这么一个场景:两个线程同时读取了主内存中变量最新的值,这是咱们两个线程都去执行修改操做,最后结果会是什么呢?这里就留给你们本身去思考了,其实也很简单的。
那么volatile在什么场景下能保证线程安全,按照官方来讲,有如下两个条件:
多的方面这里我就不展开了,推荐两篇我以为写的还不错的文章:volatile的使用及其原理volatile的适用场景
不少同窗在学习Java并发过程当中最早接触的就是synchronized关键词了,它确实能解决咱们上述的并发问题,那它到时如何帮咱们保证共享变量的一致性的呢?
简而言之的说,线程在访问请求用synchronized关键词修饰的方法,代码块都会要求得到一个监视器锁,当线程得到了监视器锁后,它才有权限去执行相应的方法或代码块,并在执行结束后释放监视器锁,这便能保证共享内存的一致性了,由于本文主要是讲Akka的共享内存,过多的篇幅就不展开了,这里推荐一篇解析synchronized原理很不错的文章,有兴趣的同窗能够去看看:Synchronized及其实现原理
Akka中的共享内存是基于Actor模型的,Actor模型提倡的是:经过通信来实现共享内存,而不是用共享内存来实现通信,这点是跟Java解决共享内存最大的区别,举个例子:
在Java中咱们要去操做共享内存中数据时,每一个线程都须要不断的获取共享内存的监视器锁,而后将操做后的数据暴露给其余线程访问使用,用共享内存来实现各个线程之间的通信,而在Akka中咱们能够将共享可变的变量做为一个Actor内部的状态,利用Actor模型自己串行处理消息的机制来保证变量的一致性。
固然要使用Akka中的机制也必须知足一下两条原则:
第二个原则很好理解,就是上面咱们说的Actor内部是串行处理消息,那咱们来看看第一个原则,为何要保证消息的发送先于消息的接收,是为了防止咱们在建立消息的时候发生了不肯定的错误,接收者将可能接收到不正确的消息,致使发生奇怪的异常,主要表现为消息对象未初始化完整时,若没有这条规则保证,Actor收到的消息便会不完整。
经过前面的学习咱们知道Actor是一种比线程更轻量级,抽象程度更高的一种结构,它帮咱们规避了咱们本身去操做线程,那么Akka底层究竟是怎么帮咱们去保证共享内存的一致性的呢?
一个Actor它可能会有不少线程同时向它发送消息,以前咱们也说到Actor自己是串行处理的消息的,那它是如何保障这种机制的呢?
Mailbox在Actor模型是一个很重要的概念,咱们都知道向一个Actor发送的消息首先都会被存储到它所对应的Mailbox中,那么咱们先来看看MailBox的定义结构(本文所引用的代码都在akka.dispatch.Mailbox.scala中,有兴趣的同窗也能够去研究一下):
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable {}复制代码
很清晰Mailbox内部维护了一个messageQueue这样的消息队列,并继承了Scala自身定义的ForkJoinTask任务执行类和咱们很熟悉的Runnable接口,由此能够看出,Mailbox底层仍是利用Java中的线程进行处理的。那么咱们先来看看它的run方法:
override final def run(): Unit = {
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}复制代码
为了配合理解,咱们这里先来看一下定义:
@inline
final def currentStatus: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
@inline
final def isClosed: Boolean = currentStatus == Closed复制代码
这里咱们能够看出Mailbox自己会维护一个状态Mailbox.Status,是一个Int变量,并且是可变的,而且用到volatile来保证了它的可见性:
@volatile
protected var _statusDoNotCallMeDirectly: Status = _ //0 by default复制代码
如今咱们在回去看上面的代码,run方法的执行过程,首先它会去读取MailBox此时的状态,由于是一个Volatile read,因此能保证读取到的是最新的值,而后它会先处理任何的系统消息,这部分不须要咱们太过关心,以后即是执行咱们发送的消息,这里咱们须要详细看一下processMailbox()的实现:
@tailrec private final def processMailbox(
left: Int = java.lang.Math.max(dispatcher.throughput, 1),
deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
if (shouldProcessMessage) {
val next = dequeue() //去出下一条消息
if (next ne null) {
if (Mailbox.debug) println(actor.self + " processing message " + next)
actor invoke next
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
processAllSystemMessages()
if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
processMailbox(left - 1, deadlineNs) //递归处理下一条消息
}
}复制代码
从上述代码中咱们能够清晰的看到,当知足消息处理的状况下就会进行消息处理,从消息队列列取出下一条消息就是上面的dequeue()
,而后将消息发给具体的Actor进行处理,接下去又是处理系统消息,而后判断是否还有知足状况须要下一条消息,如有则再次进行处理,能够当作一个递归操做,@tailrec
也说明了这一点,它表示的是让编译器进行尾递归优化。
如今咱们来看一下一条消息从发送到最终处理在Akka中究竟是怎么执行的,下面的内容是我经过阅读Akka源码加自身理解得出的,这里先画了一张流程图:
消息的大体流程我都在图中给出,还有一些细节,必须序列化消息,获取状态等就没有具体说明了,有兴趣的同窗能够本身去阅读如下Akka的源码,我的以为Akka的源码阅读性仍是很好的,好比:
固然也有一些困扰,咱们在不了解各个类,接口之间的关系时,阅读体验就会变得很糟糕,固然我用IDEA很快就解决了这个问题。
咱们这里来看看关键的部分:Actor是如何保证串行处理消息的?
上图中有一根断定,是否已有线程在执行任务?咱们来看看这个断定的具体逻辑:
@tailrec
final def setAsScheduled(): Boolean = { //是否有线程正在调度执行该MailBox的任务
val s = currentStatus
/* * Only try to add Scheduled bit if pure Open/Suspended, not Closed or with * Scheduled bit already set. */
if ((s & shouldScheduleMask) != Open) false
else updateStatus(s, s | Scheduled) || setAsScheduled()
}复制代码
从注释和代码的逻辑上咱们能够看出当已有线程在执行返回false,若没有则去更改状态为以调度,直到被其余线程抢占或者更改为功,其中updateStatus()是线程安全的,咱们能够看一下它的实现,是一个CAS操做:
@inline
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)复制代码
到这里咱们应该能够大体清楚Actor内部是如何保证共享内存的一致性了,Actor接收消息是多线程的,但处理消息是单线程的,利用MailBox中的Status来保障这一机制。
经过上面的内容咱们能够总结出如下几点: