浅析pplx库的设计与实现。

主要有三部分组成,threadpool,scheduler,task。linux

 

 三者关系如上图示,pplx只着重实现了task部分功能,scheduler跟threadpool只是简略实现。app

 threadpool主要依赖boost.asio达到跨平台的目标,cpprestsdk的 io操做同时也依赖这个threadpool。async

pplx提供了两个版本的scheduler,分别是函数

linux_scheduler依赖boost.asio.threadpool。oop

window_schedule依赖win32 ThreadPool。this

默认的scheduler只是简单地将work投递到threadpool进行分派。spa

用户能够根据本身须要,实现scheduler_interface,提供复杂的调度。线程

 

每一个task关联着一个_Task_impl实现体,一个_TaskCollection_t(唤醒事件,后继任务队列,这个队列的任务之间的关系是并列的),还有一个_PPLTaskHandle代码执行单元。设计

task,并行执行的单位任务。经过scheduler将代码执行单元调度到线程去执行。3d

task提供相似activeobject模式的功能,能够看做是一个future,经过get()同步阻塞等待执行结果。

task提供拓扑模型,经过then()建立后续task,并做为后继执行任务。注意的是每一个task能够接受不限数量的then(),这些后继任务之间并不串行。例 task().then().then()串行,(task1.then(), task1.then())并行。一个任务在执行完成时,会将结果传递给它的全部直接后继执行任务。

 

此外,task拓扑除了then()函数外,还能够在执行lambda中添加并行分支,而后能够在后继任务中同步这些分支。

也就是说后继任务同步本来task拓扑外的task拓扑才能继续执行。

 1 auto fork0 =
 2      task([]()->task<void>{
 3         auto fork1 = 
 4  task([]()->task<void>{ 5 auto fork2 = 6  task([](){ 7 // do your fork2 work 8 9  }); 10 // do your fork1 work 11 12 return fork2; 13 }).then([](task<void>& frk2){ frk2.wait(); }); // will sync fork2 14 // do your fork0 work 15 16 return fork1; 17 }).then([](task<void>& frk1){ frk1.wait(); }); // will sync fork1 18 fork0.wait(); // sync fork1, fork2

上面的方式有一个问题,若是里层的fork先完成,将不要阻塞线程,可是外层fork先完成就不得不阻塞线程等待内层fork完成。

因此能够用when_all

task<task<void> >([]()->task<void> {
    std::vector<task<void> > forks; forks.push_back( task([]() { /* do fork0 work */ }) ); forks.push_back( task([]() { /* do fork1 work */ }) ); forks.push_back( task([]() { /* do fork2 work */ }) ); forks.push_back( task([]() { /* do fork3 work */ }) ); return when_all(std::begin(forks), std::end(forks)); }).then([](task<void> forks){ forks.wait(); }).wait();

经过上面的方式,也能够在lambda中,将其它task拓扑插入到你原来的task拓扑。

在include/cpprest/atreambuf.h实现的_do_while就是这样一个例子

template<class F, class T = bool>
pplx::task<T> _do_while(F func)
{
    pplx::task<T> first = func();
    return first.then([=](bool guard) -> pplx::task<T> {
        if (guard)
            return pplx::details::_do_while<F, T>(func);
        else
            return first;
    });
}

若是func不返回一个false,就会无限地在first.then()这两任务拓扑结束前,再插入多一个first.then()任务拓扑,无限地顺序地执行下去,如loop同样地进行。

  

task结束,分两种状况,完成以及取消。取消执行,只能在执行代码时经过抛出异常,task并无提供取消的接口。任务在执行过程当中抛出的异常,就会被task捕捉,并暂存异常,而后取消执行。异常在wait()时从新抛出。下面的时序分析能够看到全过程 。

 

 

值得注意的是,PPL中task本来的设计是的有Async与Inline之分的。在_Task_impl_base::_Wait()有一小段注释说明

// If this task was created from a Windows Runtime async operation, do not attempt to inline it. The
// async operation will take place on a thread in the appropriate apartment Simply wait for the completed
// event to be set.
            
                

也就是task除了由scheduler调度到线程池分派执行,还能够强制在wait()函数内分派执行,后继task也没必要再次调度而能够在当前线程继续分派执行。可是pplx没有实现

class _TaskCollectionImpl
{
    ...
    void _Cancel() { // No cancellation support  } void _RunAndWait() { // No inlining support yet  _Wait(); }

 

如今再来比较 task<_ReturnType> 与 task< task<_ReturnType > >,当一个前驱任务抛出异常停止后,若是前驱任务是task<_ReturnType>的话,后续任务的lambda参数就是_ReturnType,由后续任务执行_Continue时代为执行了前驱任务的get(),这时就会rethrow异常,而后就直接停止后续任务。可是若是后续任务的lambda参数是task<_ReturnType>的话,用户的lambda就有机会处理前驱任务的错误异常。因此就有了 task_from_result<_ReturnType>跟task_from_exception两个函数,将结果或异常转化成task,以符合后续任务的lambda的参数要求。

 

 下面是对task的时序分析。

开始的task建立_InitialTaskHandle, 一种只能用于始首的Handle执行单元。

 

经过then()添加的task,建立_ContinuationTaskHandle,(一种能够入链的后继执行单元),并暂存起来。

 

当一个任务在线程池中分派结束时,就会将全部经过then()添加到它结尾的后继任务一次过向scheduler调度出去。

任务只能经过抛出异常从而本身停止执行,task并暂存异常(及错误信息)。

 

 

 后继任务被调度到线程池继续分派执行。

 

这里顺便讨论一个开销,在window版本中,每一个task都有一个唤醒事件,使用事件内核对象,都要建立释放一个内核对象,在高并行任务时,可能会消耗过多内核对象,消耗句柄数。

而且continuation后继任务,在默认scheduler调度下,不会在同一线程中分派,全部后继任务都会简单投递到线程池。由线程池去决定分派的线程。因此由then()串行起来的任务可能会由不一样的线程顺序分派,从而产生开销。由于pplx并无实现 Inline功能,全部task都会视做Async从新调度到线程池。

相关文章
相关标签/搜索