使用异步编程

转发至:http://www.ituring.com.cn/article/130823javascript

导言

现代的应用程序面临着诸多的挑战,如何构建具备可伸缩性和高性能的应用成为愈来愈多软件开发者思考的问题。随着应用规模的不断增大,业务复杂性的增加以及实时处理需求的增长,开发者不断尝试榨取硬件资源、优化。html

在不断的探索中,出现了不少简化场景的工具,好比提供可伸缩计算资源的Amazon S3Windows Azure,针对大数据的数据挖掘工具MapReduce,各类CDN服务,云存储服务等等。还有不少的工程实践例如敏捷DDD等提供了指导。能够看到,将每一个关注层面以服务的方式提供,成为了愈来愈流行的一种模式,或许咱们能够激进的认为,这就是SOAjava

开发者须要将不一样的资源粘合在一块儿来提供最终的应用,这就须要协调不一样的资源。node

咱们能够设想一个大的场景,开发者正在开发的一个用例会从用户的浏览器接收到请求,该请求会先从一个开放主机服务(OHS)获取必要的资源res1,而后调用本机的服务s1对资源res1进行适应的转换产生资源res2,接着以res2为参数调用远程的数据仓库服务rs1获取业务数据bs1,最后以bs1为参数调用本机的计算服务calc并通过10s产生最终的数据。jquery

简单的用ASP.NET MVC 5表示就是这样的(这些代码是我瞎掰的):git

// notes: ASP.NET vNext changed MVC 5 usage, 
// ActionResult now became IActionResult
public IActionResult CrazyCase(UserData userData) {
    var ticket = CrazyApplication.Ticket;

    var ohsFactory = new OpenHostServiceFactory(ticket);
    var ohs = ohsFactory.CreateService();

    var ohsAdapter = new OhsAdapter(userData);

    var rs1 = ohs.RetrieveResource(ohsAdapter);
    var rs2 = _localConvertingService.Unitize(rs1);
    var bs1 = _remoteRepository.LoadBusinessData(rs2);
    var result = _calculationService.DoCalculation(bs1);

    return View(result);
}

这多是中等复杂度的一个场景,可是相信开发者已经意识到了这其中所涉及的复杂度。咱们看到每一步都是依赖于前者所产生的数据,在这样一种场景之下,传统的多线程技术将极度受限,而且最顶层的协调服务将始终占用一个线程来协调每一步。github

线程是要增长开销的,尤为是上下文的转换,别扯什么线程池了,建立线程的开销是节省了,上下文切换的开销才是要命的。web

经济不景气,能省点儿资源就省点儿吧。ajax


因此咱们该怎么办?纵向扩展给服务器加多点内存?横向扩展上负载均衡?别闹了咱们又不是民工,想问题不要太简单粗暴。解决的办法就是,异步,并且咱们这篇也只讨论异步这一种技术。算法

为何使用异步

那么,异步的优点在哪里?这首先要和同步作一个对比。

仍是开头那个场景,示例代码所展现的是使用同步阻塞的方式来一步一步的执行,以下示意:

main) +++$----$------$--------$----------$+++
         |   /|     /|       /|         /
ohs )    $++$ |    / |      / |        /
              |   /  |     /  |       /
rs1 )         $++$   |    /   |      /
                     |   /    |     /
s1  )                $++$     |    /
                              |   /
calc)                         $++$

notes:
$ code point
+ thread busy
- thread blocked(means, wasted)

能够明显的看到,当主线程发起各个service请求后,彻底处于闲置占用的状态,所作的无非是协调任务间的依赖顺序。这里所说的占用,其实就是CPU的时间片。

咱们为何要等全部的子任务结束?由于任务间有前后顺序依赖。有没有更好的方式来规避等待所带来的损耗呢?考虑一个场景,正上着班呢,忽然想起要在网上买个东西,那么打开京东你就顺利的下单了,事情并无结束,你不会等快递的小哥给你送来东西之后再接着今天的工做吧?你会给快递留下你的联系方式,让他到了给你打电话(耗时的I/O任务),而后你继续今天烧脑的编程任务(CPU密集型)。从人类的角度来看,这必定是最正常不过的,也就是要讨论的异步的方式。

必定有人会提议单开一个线程作收快递的任务,我赞成这是一种解决方案,可是若是用等效的人类角度的语言来讲,就是你将大脑的资源分红了两半,一半在烧脑编程,一半在盯着手机发呆,脑利用率降低太明显。而用异步的方式,你不须要关注手机,由于手机响了你就天然获得了通知。 固然,你也能够任性的说,我就喜欢等快递来了再干活。if so,咱们就不要作朋友了。

因此咱们能够有一个推论:异步所解决的,就是节省低速的IO所阻塞的CPU计算时间。

转换一下思路,咱们使用异步非阻塞的方式来构建这段业务,并借助异步思想早已深刻人心的javascript语言来解释,能够是这样的:

// express

var ohs = require('./anticorruption/OpenHostService');
var localConvertingService = require('./services/LocalConverting');
var remoteRepository = require('./repositories/BusinessData');
var calculationService = require('./services/Calculation');

function(req, res) {
    var userData = req.body;

    // level1 nest
    ohs.retrieveResource(userData, function(err, rs1) {
        if(err) {
            // error handling
        }
        // level2 nest
        localConvertingService.unitize(rs1, function(err, rs2) {
            if(err) {
                // error handling
            }
            //level3 nest
            remoteRepository.loadBusinessData(rs2, function(err, bs1) {
                if(err) {
                    // error handling
                }
                //level4 nest
                calculationService.doCalculation(bs1, function(err, result) {
                    if(err) {
                        // error handling
                    }
                    res.view(result);
                });
            });
        });
    });
}

看着一层又一层的花括号也是醉了,咱们以后会讨论如何解嵌套。那么这段代码所反应的是怎样的事实呢?以下示意:

main) +++$                           $+++
          \                         /
ohs )      $++$                    /
               \                  /
rs1 )           $++$             /
                    \           /
s1  )                $++$      /
                         \    /
calc)                     $++$

notes:
$ code point
+ thread busy
- thread blocked(means, wasted)

因为异步解放了原始的工做线程,使CPU资源能够不被线程的阻塞而被浪费,从而能够有效的提升吞吐率。

异步的使用场景

技术和选择和使用场景有着很大的关系,每项技术不都是银弹,使用对的工具/技术解决对的问题是开发者的义务。

开发者最多关注的是计算密集和I/O密集这两个维度,对于这两个维度每每有着不一样的技术选型。

计算密集型应用

何为计算密集型应用?下面两我的畜皆知的函数都是计算密集型的。

 1 // F#
 2 let fibonacci n =
 3     let rec f a b n =
 4         match n with
 5         | 0 -> a
 6         | 1 -> b
 7         | n -> (f b (a + b) (n - 1))
 8     f 0 1 n
 9 
10 let rec factorial n = 
11     match n with
12     | 0 -> 1
13     | n -> n * factorial (n - 1)

尤为是第二个阶乘函数,若是在调用的时候不当心手抖多加了几个0,基本上能够出去喝个咖啡谈谈理想聊聊人生玩一天再回来看看有没有算完了。

简而言之,计算密集型的任务是典型的重度依赖CPU/GPU,不涉及磁盘、网络、输入输出的任务。游戏中场景渲染是计算密集的,MapReduce中的Reduce部分是计算密集的,视频处理软件的实时渲染是计算密集的,等等。

在这样的场景之下,异步是没有太大的优点的,由于计算资源就那么多,不增不减,用多线程也好用异步流也好,CPU永远处于高负荷状态,这病不能治,解决方案只能是:

  • 横向的集群方案
  • 纵向的升级主机CPU或采用更快的GPU
  • 优化算法,使之空间/时间成本下降

可是有一种场景是能够考虑使用异步的,考虑一个分布式的计算场,一个计算任务发起后,协调者须要等待全部的计算节点子结果集返回后者能作最后的结果化简。那么此时,虽然场景是计算密集的,可是因为涉及到任务的依赖协调,采用异步的方式,能够避免等待节点返回结果时的阻塞,也能够避免多线程方式的上下文切换开销,要知道在这样的场景下,上下文切换的开销是能够大的惊人的。

类似的场景还有,一个桌面应用,假设点击界面上一个按钮以后会进行大量的计算,若是采用同步阻塞的方式,那么当计算完成以前UI是彻底阻塞的跟假死同样,可是如何使用异步的方式,则不会发生UI阻塞,计算在结束后会以异步的方式来更新界面。还记得WinForm编程中的BeginInvokeEndInvoke吗?虽然它们的实现方式是以单独线程的方式来实现异步操做的,可是这仍然属于异步流控制的范畴。

异步的实现方式有不少,可使用已有的线程技术(Rx和C#的async/await就是使用这种方式),也可使用相似于libuv之类的I/O异步封装配合事件驱动(node就是使用这种方式)。并于异步流控制的部分咱们以后会讨论。

因此若是你的应用是计算密集型的,在充分分析场景的前提下能够适当的采用异步的方式。大部分的计算密集型场景是不用介入异步控制技术的,除非它能够显著改善应用的流程控制能力。

I/O密集型应用

何为I/O密集型应用?Web服务器自然就是I/O密集型的,由于有着高并发量与网络吞吐。文件服务器和CDN是I/O密集型的,由于高网络吞吐高磁盘访问量。数据库是I/O密集型的,涉及磁盘的访问及网络访问。说到底,一切和输入输出相关的场景都是I/O密集型的。

I/O囊括的方面主要是两方面:

  • 网络访问
  • 磁盘读写

简单粗暴的解释,就是接在主板南桥上的设备的访问都属于I/O。多提一句,内存是直接接在北桥上的,这货,快。

开发者遇到最多的场景即是Web应用和数据库的高并发访问。其它的服务调用都属于网络I/O,可归为一类。

典型的就是Web服务器接收到了HTTP请求,而后具体的Web框架会单开一个线程服务这个请求。由于HTTP是构建在TCP之上的,因此在请求结束返回结果以前,socket并无关闭,在windows系统上这就是一个句柄,在*nix之类的posix系统上这就是一个文件描述符,都是系统资源紧张的很。这是硬性的限制,能打开多少取决与内存与操做系统,咱们暂且不关注这部分。该线程若是采用同步的方式,那么它程的生命周期会吻合socket的生命周期,期间无论是访问文件系统花了10s致使cpu空闲10s的时间片,仍是访问数据库有3s的时间片空隙,这个线程都不会释放,就是说,这个线程是专属的,即使是使用线程池技术,该占还得占。

这有点像是银行的VIP专线,服务人员就那么多,若是每人服务一个VIP且甭管人家在聊人生聊理想仍是默默注视,后面人就算是VIP也得等着,由于没人能够服务你了。

那么咱们继续深刻,线程也是一种相对昂贵的资源,虽然比建立进程快了太多,可是仍然有限制。windows的32位操做系统默认每进程可以使用2GB用户态内存(64bit是8Tb用户态内存, LoL),每一个线程有1Mb的栈空间(能改,但不建议。);*nix下是8Mb栈空间,32位的进程空间是4Gb,64位则大到几近没有用户态内存限制。咱们能够假定32位系统下一个合理的单进程线程数量:1500。那么一个进程最大的并发量就是1500请求了,抛开多核不谈,这1500个线程就算轮班倒,并发量不会再上去了,由于一个socket一个线程。若是每一个请求都是web服务器处理1s加访问数据库服务器3s,那么时钟浪费率则大的惊人。何况,1500个线程的上下文切换想一想都是开心,开了又开

不幸的是,以前的web服务器都是这么干的。此时咱们思考,若是采用异步的方式,那3s的阻塞彻底能够规避,从而使线程轮转的更快,由于1s的处理时间结束后线程返回线程池而后服务于另外一个请求,从而总体提升服务器的吞率。

事实上,node压根就没有多线程的概念,使用事件循环配合异步I/O,一个线程总够你甩传统的Web服务器吞吐量几条街。没错,请叫我node雷锋。

再继续深刻异步编程前,咱们先理一理几个常常混淆的概念。

一些概念的区别

多核与多线程

多核是一种物理上的概念,即指主机所拥有的物理CPU核心数量,总核心数 = CPU个数 * 每一个CPU的核心数。每一个核心是独立的,能够同时服务于不一样的进程/线程。

多线程是一种操做系统上的概念,单个进程可能建立多个线程来达到细粒度进行流程控制的目的。操做系统的核心态调度进程与线程,在用户态之下其实还能够对单个线程有更细粒度的控制,这称之为协程(coroutine)纤程(fibers)

多线程是指在单个进程空间内经过操做系统的调度来达到多流程同时执行的一种机制,固然,单个CPU核心在单位时间内永远都只是执行一个线程的指令,因此须要以小的时间片断雨露均沾的执行每一个线程的部分指令。在切换线程时是有上下文的切换的,包括寄存器的保存/还原,线程堆栈的保存/还原,这就是开销。

并行与并发

关于并行,真相只有一个,单个CPU核心在单位时间内只能执行一个线程的指令,因此若是总核心数为20,那么咱们能够认为该主机的并行能力为20,可是用户态的并行能力是要比这个低的,由于操做系统服务和其它软件也是要用cpu的,所以这个数值是达不到的。

一个题外话,若是并行能力为20,那么咱们能够粗略的认为,该主机一次能够同时执行20个线程,若是程序的线程使用率健康的话,保持线程池为20左右的大小能够作到彻底的线程并行执行没有上下文切换。

那么并发则关注于应用的处理能力。这是一个更加侧重网络请求/服务响应能力的概念,能够理解为单位时间内能够同时接纳并处理用户请求的能力。它和多少CPU没有必然的关系,单纯的考量了服务器的响应回复能力。

阻塞与非阻塞

阻塞/非阻塞与同步/异步是常常被混淆的。同步/异步其实在说事件的执行顺序,阻塞/非阻塞是指作一件事能不能当即返回。

咱们举个去KFC点餐的例子。点完餐交完钱了,会有这么几种状况:

  • 服务人员直接把东西给我,由于以前已经作好了,因此能立刻给我,这叫作非阻塞,我不须要等,结果当即返回。这整个过程是同步完成的。
  • 服务人员一看没有现成的东西了,跑去现作,那么我就在这儿一直等,没刷微信没作别的干等,等到作出来拿走,这叫阻塞,由于我傻到等结果返回再离开点餐台。这整个过程是同步完成的。
  • 服务人员一看没有现成的东西了,跑去现作,并告诉我说:先去作别的,作好了我叫你的号。因而我开心的找了个座位刷微信,等叫到了个人号了取回来。这叫作非阻塞,整个过程是异步的,由于我还刷了微信思考了人生。

异步是非阻塞的,可是同步能够是阻塞的也能够是非阻塞的,取决于消费的资源。

异步编程的挑战

异步编程的主要困难在于,构建程序的执行逻辑时是非线性的,这须要将任务流分解成不少小的步骤,再经过异步回调函数的形式组合起来。在异步大行其道的javascript界常常能够看到不少层的});,简单酸爽到妙趣横生。这一节将讨论一些经常使用的处理异步的技术手段。

回调函数地狱

开头的那个例子使用了4层的嵌套回调函数,若是流程更加复杂的话,还须要嵌套更多,这不是一个好的实践。并且以回调的方式组织流程,在视觉上并非很直白,咱们须要更加优雅的方式来解耦和组织异步流。

使用传统的javascript技术,能够展平回调层次,例如咱们能够改写以前的例子:

 1 var ohs = require('./anticorruption/OpenHostService');
 2 var localConvertingService = require('./services/LocalConverting');
 3 var remoteRepository = require('./repositories/BusinessData');
 4 var calculationService = require('./services/Calculation');
 5 
 6 function(req, res) {
 7     var userData = req.body;
 8 
 9     ohs.retrieveResource(userData, ohsCb);
10 
11     function ohsCb(err, rs1) {
12         if(err) {
13             // error handling
14         }
15         localConvertingService.unitize(rs1, convertingCb);
16     }
17 
18     function convertingCb(err, rs2) {
19         if(err) {
20             // error handling
21         }
22         remoteRepository.loadBusinessData(rs2, loadDataCb);
23     }
24 
25     function loadDataCb(err, bs1) {
26         if(err) {
27             // error handling
28         }
29         calculationService.doCalculation(bs1 , calclationCb);
30     }
31 
32     function calclationCb(err, result) {
33         if(err) {
34             // error handling
35         }
36         res.view(result);
37     }
38 }

解嵌套的关键在于如何处理函数做用域,以后金字塔厄运迎刃而解。

还有一种更为优雅的javascript回调函数处理方式,能够参考后面的Promise部分。

而对于像C#之类的内建异步支持的语言,那么上述问题更加的不是问题,例如:

 1 public async IActionResult CrazyCase(UserData userData) {
 2     var ticket = CrazyApplication.Ticket;
 3 
 4     var ohsFactory = new OpenHostServiceFactory(ticket);
 5     var ohs = ohsFactory.CreateService();
 6 
 7     var ohsAdapter = new OhsAdapter(userData);
 8 
 9     var rs1 = await ohs.RetrieveResource(ohsAdapter);
10     var rs2 = await _localConvertingService.Unitize(rs1);
11     var bs1 = await _remoteRepository.LoadBusinessData(rs2);
12     var result = await _calculationService.DoCalculation(bs1);
13 
14     return View(result);
15 }

async/await这糖简直不能更甜了,其它C#的编译器仍是生成了使用TPL特性的代码来作异步,说白了就是一些Task<T>在作后台的任务,当遇到async/await关键字后,编译器将该方法编译为状态机,因此该方法就能够在await的地方挂起和恢复了。整个的开发体验几乎彻底是同步式的思惟在作异步的事儿。后面有关于TPL的简单介绍。

异常处理

因为异步执行采用非阻塞的方式,因此当前的执行线程在调用后捕获不到异步执行栈,所以传统的异步处理将再也不适用。举两个例子:

1 try {
2     Task.Factory.StartNew(() => {
3         throw new InvalidOperationException("diablo coming.");
4     });
5 } catch(InvalidOperationException e) {
6     // nothing captured.
7     throw;
8 }

1 try {
2     process.nextTick(function() {
3         throw new Error('diablo coming.');
4     });
5 } catch(e) {
6     // nothing captured.
7     throw e;
8 }

在这两个例子中,try语句块中的调用会当即返回,不会触发catch语句。那么如何在异步中处理异常呢?咱们考虑异步执行结束后会触发回调函数,那么这即是处理异常的最佳地点。node的回调函数几乎老是接受一个错误做为其首个参数,例如:

fs.readFile('file.txt', 'utf-8', function(err, data) { });

编译器所构建的状态机能够支持异常的处理,简直是强大到无与伦比。固然,对于TPL的处理也有其专属的支持,相似于node的处理方式:

1 Task.Factory.StartNew(() => {
2     throw new InvalidOperationException("diablo coming.");
3 })
4 .ContinueWith(parent => {
5     var parentException = parent.Exception;
6 });

注意这里访问到的parent.Exception是一个AggregateException类型,对应的处理方式也较传统的异常处理也稍有不一样:

1 parentException.Handle(e => {
2     if(e is InvalidOperationException) {
3         // exception handling.
4         return true;
5     }
6 
7     return false;
8 });

异步流程控制

异步的技术也许明白了,可是遇到更复杂的异步场景呢?假设咱们须要异步并行的将目录下的3个文件读出,所有完成后进行内容拼接,那么就须要更细粒度的流程控制。

咱们能够借鉴async.js这款优秀的异步流程控制库所带来的便捷。

 1 async.parallel([
 2     function(callback) {
 3          fs.readFile('f1.txt', 'utf-8', callback)
 4     },
 5     function(callback) {
 6          fs.readFile('f2.txt', 'utf-8', callback)
 7     },
 8     function(callback) {
 9          fs.readFile('f3.txt', 'utf-8', callback)
10     }
11 ], function (err, fileResults) {
12     // concat the content of each files
13 });

若是使用C#并配合TPL,那么这个场景能够这么实现:

 1 public async void AsyncDemo() {
 2     var files = new []{
 3         "f1.txt",
 4         "f2.txt",
 5         "f3.txt"
 6     };
 7 
 8     var tasks = files.Select(file => {
 9         return Task.Factory.StartNew(() => {
10             return File.ReadAllText(file);
11         });
12     });
13 
14     await Task.WhenAll(tasks);
15 
16     var fileContents = tasks.Select(t => t.Result);
17 
18     // concat the content of each files
19 }

咱们再回到咱们开头遇到到的那个场景,可使用async.jswaterfall来简化:

 1 var ohs = require('./anticorruption/OpenHostService');
 2 var localConvertingService = require('./services/LocalConverting');
 3 var remoteRepository = require('./repositories/BusinessData');
 4 var calculationService = require('./services/Calculation');
 5 var async = require('async');
 6 
 7 function(req, res) {
 8     var userData = req.body;
 9 
10     async.waterfall([
11         function(callback) {
12             ohs.retrieveResource(userData, function(err, rs1) {
13                 callback(err, rs1);
14             });
15         },
16         function(rs1, callback) {
17             localConvertingService.unitize(rs1, function(err, rs2) {
18                 callback(err, rs2);
19             });
20         },
21         function(rs2, callback) {
22             remoteRepository.loadBusinessData(rs2, function(err, bs1) {
23                 callback(err, bs1);
24             });
25         },
26         function(bs1, callback) {
27             calculationService.doCalculation(bs1, function(err, result) {
28                 callback(err, result);
29             });
30         }
31     ],
32     function(err, result) {
33         if(err) {
34             // error handling
35         }
36         res.view(result);
37     });
38 }

若是须要处理先后无依赖的异步任务流可使用async.series()来串行异步任务,例如先开电源再开热水器电源最后亮起红灯,并无数据的依赖,但有前后的顺序。用法和以前的parallel()waterfall()大同小异。另外还有优秀的轻量级方案step,以及为javascript提供monadic扩展的wind.js(特别像C#提供的方案),有兴趣能够深刻了解。

反人类的编程思惟

异步是反人类的

人类生活在一个充满异步事件的世界,可是开发者在构建应用时却遵循同步式思惟,究其缘由就是由于同步符合直觉,而且能够简化应用程序的构建。

究其深层缘由,就是由于现实生活中咱们是在演绎,并经过不一样的口头回调来完成一系列的异步任务,咱们会说你要是有空了来找我聊人生,货到了给我打电话,小红你写完文案了交给小明,小丽等全部的钱都到了通知小强……而在作开发时,咱们是在列清单,咱们的说法就是:我等着你有空而后开始聊人生,我等着货到了而后我就知道了,我等着小红文案写完了而后开始让她交给小明,我等着小丽确认全部的钱到了而后开始让她通知小强……

同步的思惟能够简化编程的关注点,可是没有将流程进行现实化的切分,咱们老是倾向于用同步阻塞的方式来将开发变成简单的步骤程序化,却忽视了用动态的视角以及消息/事件驱动的方式构建任务流程。

异步在编程看来是反人类的,可是从业务角度看倒是再合理不过的了。经过当的工具及技术,使用异步并非难以企及的,它可使应用的资源利用更加的高效,让应用的响应性更上一个台阶。

扩展阅读

Promise/Deferred

在通常状况下,Promise、Deferred、Future这些词能够当作是同义词,描述的是同一件事情。

jQuery 1.5+以后出现了一种新的API调用方式,相比于旧的API,新的方式更好的解耦了关注点,并带来了更好的组合能力。

咱们看一个传统的使用ajax的例子:

1 $.get('/api/service1', {
2     success: onSuccess,
3     failure: onFailure,
4     always:  onAlways
5 });

使用新的API后,调用的方式变成了:

1 $.get('/api/service1')
2     .done(onSussess)
3     .fail(onFailure)
4     .always(onAlways);

get方法返回的是一个promise对象,表示这个方法会在将来某个时刻执行完毕。

PromiseCommonJS提出的规范,而jQuery的实如今其基础上有所扩展,旗舰级的实现能够参考Kris KowalQ.js

咱们使用jQuery来构建一个promise对象:

 1 var longTimeOperation = function() {
 2     var deferred = $.Deferred();
 3 
 4     // taste like setTimeout()
 5     process.nextTick(function() {
 6         // do operation.
 7         deferred.resolve();
 8         // if need error handling, use deferred.reject();
 9     });
10 
11     return deferred.promise();
12 }
13 
14 $.when(longTimeOperation())
15     .done(success)
16     .fail(failure);

因为jQuery生成的Deferred能够自由的进行resolve()reject(),因此在返回时咱们使用.promise()生成不含这个两方法的对象,从而更好的封装逻辑。

那么Promise究竟带给咱们的便利是什么?Promise表示在将来这个任务会成功或失败,可使用1和0来表示,那么开发者立刻就开始欢呼了,给我布尔运算我能撬动地球!因而,咱们能够写出以下的代码:

1 $.when(uploadPromise, downloadPromise)
2     .done(function() {
3         // do animation.
4     });

对于开头的那个例子咱们说过有着更优雅的解回调函数嵌套的方案,那就是使用promise,咱们来尝试改写开头的那个例子:

 1 var ohs = require('./anticorruption/OpenHostService');
 2 var localConvertingService = require('./services/LocalConverting');
 3 var remoteRepository = require('./repositories/BusinessData');
 4 var calculationService = require('./services/Calculation');
 5 var $ = require('jquery');
 6 
 7 function(req, res) {
 8     var userData = req.body;
 9 
10     function deferredCallback(deferred) {
11         return function(err) {
12             if(err) {
13                 deferred.reject(err);
14             } else {
15                 var args = Array.prototype.slice.call(arguments, 1);
16                 deferred.resolve(args);
17             }
18         };
19     }
20 
21     function makeDeferred(fn) {
22         var deferred = $.Deferred();
23         var callback = deferredCallback(deferred);
24         fn(callback);
25         return deferred.promise();
26     }
27 
28     var retrieveResourcePromise = makeDeferred(function(callback) {
29         ohs.retrieveResource(userData, callback);
30     });
31 
32     var convertingPromise = makeDeferred(function(callback) {
33         localConvertingService.unitize(rs1, callback);
34     });
35 
36     var loadBusinessDataPromise = makeDeferred(function(callback) {
37         remoteRepository.loadBusinessData(rs2, callback);
38     });
39 
40     var calculationPromise = makeDeferred(function(callback) {
41         calculationService.doCalculation(bs1 , callback);
42     });
43 
44     var pipedPromise = retrieveResourcePromise
45         .pipe(convertingPromise)
46         .pipe(loadBusinessDataPromise)
47         .pipe(calculationPromise);
48 
49     pipedPromise
50         .done(function(result) {
51             res.view(result);
52         })
53         .fail(function(err) {
54             // error handling
55         });
56 }

咱们使用了一个高阶函数来生成能够兼容deferred构造的回调函数,进而使用jQuerypipe特性(在Q.js里可使用then()组合每一个promise),使解决方案优雅了不少,而这个工具函数在Q.js里直接提供,因而新的解决方案能够以下:

 1 var ohs = require('./anticorruption/OpenHostService');
 2 var localConvertingService = require('./services/LocalConverting');
 3 var remoteRepository = require('./repositories/BusinessData');
 4 var calculationService = require('./services/Calculation');
 5 var Q = require('q');
 6 
 7 function(req, res) {
 8     var userData = req.body;
 9 
10     var retrieveResourceFn = Q.denodeify(ohs.retrieveResource)
11     var convertingFn = Q.denodeify(localConvertingService.unitize);
12     var loadBusinessDataFn = Q.denodeify(remoteRepository.loadBusinessData);
13     var calculationFn = Q.denodeify(calculationService.doCalculation);
14 
15     retrieveResourceFn(userData)
16         .then(convertingFn)
17         .then(loadBusinessDataFn)
18         .then(calculationFn)
19         .then(function(result) {
20             res.view(result);
21         }, function(err) {
22             // error handling
23         });
24 }

那咱们如何看待TPL特性呢?咱们看看TPL能够作什么:

  • Task为基本构造单位,执行时不阻塞调用线程
  • 每一个Task是独立的,Task有不一样的状态,可使用Task.Status获取
  • Task能够组合,使用相似.ContinueWith(Task))以及.WhenAll(Task[]).WhenAny(Task[])的方式自由组合。

对比一下Promise

  • Promise为基本构造单位,表示一个未来完成的任务,调用时当即返回
  • 每一个Promise是独立的,Promise有不一样的状态,可使用.state获取
  • Promise能够组合,使用.then().pipe()以及.when()来组合执行流程

能够看到,不管是Promise仍是TPL,在设计上都有着惊人的类似性。咱们有理由猜测在其它的的语言或平台都存在着相似的构造,由于异步说白了,就是让将来完成的事情本身触发后续的步骤。

Pull vs. Push

GoF32中没有提到迭代器模式(Iterator)与观察者模式(Observer)的区别和联系,其实这两个模式有着千丝万缕的联系。

Iterator反映的是一种Pull模型,数据经过同步的方式从生产者那里拉过来,咱们经过它的定义即可看到这一事实:

1 interface IEnumerator<out T>: IDisposable
2 {
3     bool MoveNext();
4     T Current { get; }
5 }

经过阻塞的方式调用MoveNext(),数据一个一个的拉取到本地。

而Observer反映的是一种Push模型,经过注册一个观察者(相似于回调函数),当生产者有数据时,主动的推送到观察者手里。观察者注册结束后,本地代码没有阻塞,推送数据的整个过程是异步执行的。咱们经过它的定义来对比Iterator:

1 interface IObserver<in T>
2 {
3     void OnCompleted();
4     void OnError(Exception exception);
5     void OnNext(T value);
6 }

咱们发现,其实这两个接口是彻底对偶的(参见Erik Meijer大大的论文Subject/Observer is Dual to Iterator):

  • MoveNext()拉取下一个数据,OnNext(T)推送下一个数据
  • MoveNext()返回值指示了有无剩余数据(完成与否),OnCompleted()指示了数据已完成(推送数据完成的消息)
  • Iterator是同步的,因此出现了异常直接在当前运行栈上,Observer是异步的,因此须要另外一种方式来通知发生了异常(参见上文中的异步处理一节),因而有了OnError(Exception)

那么事情就变的有意思了,咱们知道Enumerable的数据能够任意的方式组合,因而产生了像LINQ之类的库可供咱们使用,可是这是一种阻塞的方式,由于Iterator自己就是一种Pull模型,这造就了同步等待的结果。

没错你是对的,若是使用EF之类的框架来查询数据库,大部分的操做是延迟执行的,代表操做并无发生而是像占位符同样在那里。可是别忘了,你最终须要去查询数据库的,在查询的一刹那,世界仍是阻塞的,等结果吧亲。

而Observer是异步Push的,有点像是事件驱动,有事件了触发,没事件了也不干扰订阅者的执行。

你是否是也隐隐的以为事件也能够和Push模式同样有统一的模型?并且不仅一次?

好,咱们重复一遍:事件,非阻塞触发(并带有事件数据)。Push,非阻塞通知订阅者。

其实,这是同一种模式,语言中对事件(就是event关键字)的支持其实就是对Observer模式的支持,而foreach则实现了对Iterator模式的语言内建支持。所谓设计模式,就是由于语言的内建支持不够而出现的,说白了,是语言的补丁。

那么咱们来看一看异常强大的Rx如何改变事件。

1 // unitized event
2 var mouseDown = Observable
3     .FromEventPattern<MouseEventArgs>(this.myPictureBox, "MouseDown")
4     .Select(x =>x.EventArgs);
5 
6 // unitized APM model
7 var request = WebRequest.Create("http://www.shinetechchina.com");
8 var webRequest = Observable
9     .FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse);
 1 ar mouseDown = Observable
 2     .FromEventPattern<MouseEventArgs>(this.controlSource, "MouseDown")
 3     .Select(x => x.EventArgs.GetPosition(this));
 4 var mouseUp = Observable
 5     .FromEventPattern<MouseEventArgs>(this.controlSource, "MouseUp")
 6     .Select(x => x.EventArgs.GetPosition(this));
 7 var mouseMove = Observable
 8     .FromEventPattern<MouseEventArgs>(this.controlSource, "MouseMove")
 9     .Select(x => x.EventArgs.GetPosition(this));
10 var dragandDrop = 
11     from down in mouseDown
12     from move in mouseMove.StartWith(down).TakeUntil(mouseUp)
13     select new {
14         X = move.X - down.X,
15         Y = move.Y - down.Y
16     };
17 
18 dragandDrop.Subscribe(value =>
19 {
20     DesktopCanvas.SetLeft(this.controlSource, value.X);
21     DesktopCanvas.SetTop(this.controlSource, value.Y);
22 });

Rx也提供了javascript扩展,有兴趣能够深刻研究。

(完)

ar mouseDown =Observable.FromEventPattern<MouseEventArgs>(this.controlSource,"MouseDown").Select(x => x.EventArgs.GetPosition(this));var mouseUp =Observable.FromEventPattern<MouseEventArgs>(this.controlSource,"MouseUp").Select(x => x.EventArgs.GetPosition(this));var mouseMove =Observable.FromEventPattern<MouseEventArgs>(this.controlSource,"MouseMove").Select(x => x.EventArgs.GetPosition(this));var dragandDrop =from down in mouseDown from move in mouseMove.StartWith(down).TakeUntil(mouseUp)selectnew{ X = move.X - down.X, Y = move.Y - down.Y }; dragandDrop.Subscribe(value =>{DesktopCanvas.SetLeft(this.controlSource, value.X);DesktopCanvas.SetTop(this.controlSource, value.Y);});
相关文章
相关标签/搜索