本文介绍和点评JVM上的Thread, Thread Pool, Future, Rx, async-await, Fiber, Actor等并发编程模型。本人经验有限,不免粗陋,还请高手多多指教。
咱们知道程序分为同步风格和异步风格。java
异步的目的:充分利用计算资源。数据库
同步使线程阻塞,致使等待。编程
异步是非阻塞的,无需等待。设计模式
若是发生了没必要要的等待,就会浪费资源,使程序变慢。服务器
好比这样的程序:网络
val res1 = get("http://server1") val res2 = get("http://server2") compute(res1, res2)
按照同步编程风格,必定要先拿到res1,才能开始拿res2。session
按照异步编程风格,res1和res2互不依赖,发起对res1的获取后,没必要等待结果,而是立刻发起对res2的获取,到了compute的时候,才须要阻塞等待两个数据。数据结构
这是一种“顺序解耦”。有时候咱们并不要求某些操做按顺序执行!那么为何要强制其顺序呢?异步风格让咱们能放弃强制,解放资源,减小没必要要的等待。多线程
若是异步操做能并行,程序性能就提高了,若是不能并行,程序性能就没有提高。在当今的硬件条件下,通常都能并行,因此异步成为了趋势。闭包
怎么个并行法?这要从计算机架构提及了。让咱们把任何有处理能力的硬件看作一个处理单元——CPU显然是主要的处理单元,I/O设备也是处理单元,好比说网卡、内存控制器、硬盘控制器。CPU能够向一或多个I/O设备发出请求,当设备在准备数据时,CPU能够作其余事情(设备就绪后会用中断通知CPU),这时就有n个硬件在并行了!何况CPU本就是多核的,能作并行计算。除此以外,在分布式系统中,能同时调动多台计算机配合完成任务,也是并行。
所以,让CPU等待、每次只请求一个I/O设备、不利用多核、不利用其余空闲的计算机,都是比较浪费的。
下面咱们来分析常见的并发编程模型。
这是最简单的模型,建立线程来执行一个任务,完毕后销毁线程。当任务数量大时,会建立大量的线程。
你们都知道大量的线程会下降性能,可是你真的清楚性能开销在哪里吗?我试列举一下:
建立线程
建立一个线程是比较耗时间的。须要请求操做系统、分配栈空间、初始化等工做。
上下文切换
你们都知道的,操做系统基本概念,再也不赘述。值得注意的是,WAITING状态的线程(多见于I/O等待)几乎不会被调度,所以并不致使过多的上下文切换。
CPU cache miss
大量线程频繁切换,势必要访问不一样的数据,打乱了空间局部性,致使CPU cache miss增长,须要常常访问更慢的内存,会明显影响CPU密集型程序的性能,这点你们恐怕没想到吧。
内存占用
线程会增长内存占用,线程的栈空间一般占1MB,1000个就是1GB。并且在栈上引用了不少对象,暂时不能回收,你说有多少个GB?
资源占用
一些有限的资源,如锁、数据库链接、文件句柄等,当线程被挂起或阻塞,就暂时无人可用了,浪费!还有死锁风险!
那么分配多少线程好呢?
传统的网络程序是每一个会话占用一个链接、一个线程。I/O多路复用(I/O multiplexing:多个会话共用一个链接)是应C10K问题而生的,C10K就是1万个链接。1万个链接是很耗系统资源的,况且还有1万个线程。从上文的分析可知,C1K的时候就能够开始运用I/O多路复用了。
预留一些可反复使用的线程在一个池里,反复地接受任务。线程数量多是固定的,也多是必定范围内变更的,依所选择的线程池的实现而定。
这个模型是极其经常使用的,例如Tomcat就是用线程池来处理请求的。
注意——尽可能不要阻塞任务线程;若实在没法避免,多开一些线程——每阻塞一个线程,线程池就少一个可用的线程。
Java典型的线程池有Executors.newFixedThreadPool
Executors.newFixedThreadPool
Executors.newFixedThreadPool
Executors.newScheduledThreadPool
等等,也能够直接new ThreadPoolExecutor
(可指定线程数的上限和下限)。
Scala没有增长新的线程池种类,但有个blocking方法能告诉线程池某个调用会阻塞,须要临时增长1个线程。
Future是一个将来将会有值的对象,至关于一个占位符(提货凭证!)。
将任务投入线程池执行时,可为任务绑定一个Future,凭此Future便可在将来取得任务执行结果。将来是何时呢?要经过检查Future内部的状态来获知——任务完成时会修改这个状态,将执行结果存进去。
最初的代码示例可改写为:
// 两个future是并行的 val f1 = Future { get("http://server1") } val f2 = Future { get("http://server2") } compute(f1.get(), f2.get())
Rx (Reactive Extensions)是响应式编程的一种具体形式。响应式编程是一种面向数据流和变化传播的编程模式。
咱们知道Java 8提供了Stream类型,表明一个有限或无限的数据流,可应用map, filter, collect等操做。Rx相似于Stream,也是有限或无限的数据流,只不过数据操做能够委托给线程池异步执行。(Rx也像是生产者/消费者模型的延伸,增长了分发和转换的能力。对数据流进行链接组合,这边生产,那边分发和转换,源源不断交给消费者。)
以RxJava为例:
Flowable.just("file.txt") .map(name -> Files.readLines(name)) .subscribe(lines -> System.out.println(lines.size()), Throwable::printStackTrace);
以Reactor为例:
Flux.fromIterable(getSomeLongList()) .mergeWith(Flux.interval(100)) .doOnNext(serviceA::someObserver) .map(d -> d * 2) .take(3) .onErrorResumeWith(errorHandler::fallback) .doAfterTerminate(serviceM::incrementTerminate) .subscribe(System.out::println);
由代码可见,对数据流的操做很像是对集合的函数式操做,subscribe就是异步的forEach,doOnNext就是有返回值的异步的forEach。
主流实现有RxJava、Reactor、Akka Streams,API各有不一样。可是它们都在靠拢Reactive Streams规范,想必会变得愈来愈类似。
async-await是一种特殊语法,能自动把同步风格代码转换成异步风格代码。正确运用,就能使代码在阻塞时自动让出控制权。
C#内置的async-await是最完整的实现。Scala经过Async库提供这个语法,代码大概是这样:
val future = async { println("Begin blocking") await { async {Thread.sleep(1000)} } println("End blocking") }
代码会被自动转换成多种future的组合形式。无需特地处理,能并行的部分都会自动并行。
Fiber是协程的仿制品。通常多线程是抢占式调度,你一个任务跑得好好的忽然把你暂停;协程是协做式的,你一个任务阻塞或完成时要主动让出控制权,让调度器换入另外一个任务。
async-await自动把代码转换成可自动让出控制权的形式,已经有协程的雏形了。Fiber更加智能,连async-await语法都不用了,只要把代码写在Fiber里面,就像写在Thread里面同样,自动异步化了。
async-await只能暂存当前做用域(转换成闭包),Fiber则能暂存整个执行栈(每一个做用域只是一个栈帧)。固然了,运用嵌套的async-await也能暂存整个执行栈,我更赞同如此,由于能更好地控制内存占用。
JVM上主流的实现是Quasar,经过java-agent改写字节码来实现,在须要让出控制权时抛出异常打断控制流(没必要担忧异常方面的性能开销),保存执行栈,而后换入另外一个任务。
Java示例:
new Fiber<V>() { @Override protected V run() throws SuspendExecution, InterruptedException { // your code } }.start();
Kotlin示例:
fiber @Suspendable { // your code }
代码中调用的任何会阻塞的方法都要标记@Suspendable,让Quasar知道调这个方法时要暂停当前Fiber并执行另外一个Fiber,同时用另外的线程池执行会阻塞的方法。
起源于电信领域的Erlang的编程模型。actor是任务处理单元:每一个actor只处理一个任务,每一个任务同时只有一个actor处理(若是有大任务,就要分解成小任务),actor之间用消息来通讯。
在Erlang中,每一个actor是一个轻量级进程,有独立的内存空间(因此通讯只能靠消息),所以有独立的垃圾回收,不会stop the world。
actor能够发了消息就无论了(tell),这是典型的异步;也能够发了消息等回应(ask),返回值是一个Future,其实是建立了一个新的actor在悄悄等待回应,仍然是异步。
actor能够透明地分布在不一样机器上,消息能够发给本机的actor,也能够发给远程的actor。
JVM上惟一成熟的实现是Akka,JVM不能给每一个actor独立的内存,垃圾回收仍可能stop the world。
actor显然是一个对象,拥有状态和行为。
actor也可被视为一个闭包,拥有函数和上下文(整个对象的状态都是上下文)。
actor每次能接收并处理一个消息,处理中能够发送消息给本身或另外一个actor,而后挂起或结束。
为何要发送消息给本身呢?由于正在处理消息时是不能挂起的,只能在“一个消息以后,下一个消息以前”的间隙中挂起。
假设你收到一个A消息,执行前半段业务逻辑,要作一次I/O再执行后半段业务逻辑。作I/O时应当结束当前处理,当IO完成时给本身发一个B消息,下次再让你在处理B消息时完成剩余业务逻辑。先后逻辑要分开写,共享变量要声明为actor的对象字段。
伪代码以下:
class MyActor extends BasicActor { var halfDoneResult: XXX = None def receive(): Receive = { case A => { halfDoneResult = 前半段逻辑() doIO(halfDoneResult).onComplete { self ! B() } } case B => 后半段逻辑(halfDoneResult) } }
当actor的状态要完全改变时,能够用become操做完全改变actor的行为。从面向对象编程的设计模式来看,这是state pattern,从函数式编程来看,这是把一个函数变换成另外一个函数。
因而可知,actor模型就是把函数表示成了更容易控制的对象,以便于知足一些并发或分布式方面的架构约束。
这段逻辑假如改写成async-await或fiber,伪代码以下所示,简单多了:
def logicInAsync() = async { val halfDoneResult = 前半段逻辑() await { doIO(halfDoneResult) } 后半段逻辑(halfDoneResult) } def logicInFiber() = fiber { val halfDoneResult = 前半段逻辑() doIO(halfDoneResult) 后半段逻辑(halfDoneResult) }
能够看出,相比于async-await或Fiber,actor就是一种状态机,是较为底层、不易用的编程模型。可是actor附带了成熟的分布式能力。
我感受actor很像异步版的EJB。EJB中有stateless session bean和stateful session bean,actor也可按stateless和stateful来分类。
PayPal的支付系统就是基于Akka的,还为此编写并开源了一个Squbs框架。业务逻辑还是用actor实现,Squbs只增长了集成和运维方面的支持(这个也重要)。然而我对此技术路线(业务逻辑基于actor)持审慎态度,接下来就分类说明个人意见:
我认为,此架构只须要三种通讯模型:消息队列、同步RPC、异步RPC。
消息队列、同步RPC都不须要Akka出场,自有各类MQ、RPC框架来解决。至于异步RPC,GRPC是一个跨语言的RPC框架,也可建造一个基于WebSocket协议的RPC框架。若是无需跨语言,也可以让Akka出场,但不是直接基于Akka编程——而是在Akka之上构建一个RPC层。若是功力较高,可直接基于Netty构建RPC层。
actor进行“请求-响应”往返通讯时,在收到响应以前,请求端的actor要挂起、暂存在内存中。协程进行这种通讯时,则是请求端的执行栈要挂起、暂存在内存中。
这是actor的龙兴之地, 也是最合适的用武之地。
以即时聊天(IM)为例,用actor怎么实现呢?
所以选择第一种实现:每一个actor对应一我的,actor要记得它对应哪一个人、消息往来状况如何,这就是“状态”!若是10万用户在线,就要10万链接(这与IO多路复用无关,对吧?),单机显然hold不住,须要多机。若是用actor A和actor B不在同一台机器,就要远程通讯了。对基于Akka的程序来讲,本地通讯或远程通讯是透明的,赞!
其实不用actor也能实现,一切状态和关系都能用数据结构来表达,只不过actor可能更方便一些。
总而言之,Akka模仿Erlang,精心设计了业务无关的actor的概念,然而越是精心设计的业务无关的概念越有可能不符合多变的业务需求:)。若是问我用不用actor,我只能说,看状况吧。也但愿有哪位英雄能介绍一两个非actor不可的场景。
如今,假设有一个微服务架构,在众多服务中有A、B、C三个服务,调用顺序是A->B->C。RPC只能以A->B->C的方向请求,再以C->B->A的方向响应;actor则能让C直接发送响应给A。但若是C要直接回复A,就要与A创建链接,使网络拓扑和依赖管理都变复杂了——如非必要,勿增复杂。
为了不,利用MQ来发送响应?MQ就像一个聊天服务,让分布各处的服务能彼此聊天。IM、actor、MQ,一切都联系起来了,有没有感觉到妙趣横生的意境?
可是压力集中到了MQ的broker,网络也多了一跳(publisher->broker->consumer),对性能有所影响。
本文介绍、点评了JVM上多种常见的并发模型,并试图创建模型之间的联系,最后以分布式架构为例加以分析。
那么应用程序要怎么写呢?看文档吧,各类库或框架都但愿有人来用,知足它们吧!