TaskCpp是c++11开发的一个跨平台的并行task库,它的设计思路来源于微软的并行计算库ppl和intel的并行计算库tbb,关于ppl和tbb我在前面有介绍。既然已经有了这两个大公司开发的并行计算库,我为何还要开发本身的并行计算库。有两个缘由:html
TaskCpp在接口设计上尽可能和ppl保持一致,由于我以为ppl的接口很好很强大。所以,TaskCpp的接口用法和语义和ppl基本是一致的。由于TaskCpp是一个轻量级的task库,总共也不过三百多行代码,本着简单够用的原则,只提供了一些和ppl相似的经常使用用法, 有些不经常使用的特性不考虑支持。好比,不支持任务的取消,由于加入任务的取消会致使增长不少复杂性,而实际中用得比较少,因此不考虑支持,够用就好。java
须要支持c++11的编译器,建议编译器:linux
使用TaskCpp仅仅须要包含头文件便可,在程序中使用只须要包含#include <TaskCpp.h>和少许的boost头文件便可。c++
TaskCpp提供一下功能:算法
Task会建立一个异步操做,这个异步操做发起方式是延迟加载方式发起的,即在调用Task的Wait或者Get时才真正发起异步操做。Task能够经过std::function或者lambda表达式去建立,不支持直接原生函数建立,若是要用原生函数须要先经过lambda或者std::function包装一下。Task的Wait接口只是等待异步操做结束。Task的Get接口接收参数并等待异步操做结束并返回结果。PPL中的get接口是不能接收参数的,TaskCpp的Get接口是能够接受任意参数的,更灵活一点,算是较PPL的一个小优势吧。下面是Task的基本用法:windows
#include <TaskCpp.h> using namespace Cosmos; void TestTask() { Task<void()> task([]{cout << 1 << endl; }); task.Wait(); Task<void()> task1 = []{cout << 1 << endl; }; task1.Wait(); Task<int()> task2 = []{cout << 1 << endl; return 1; }; cout << task2.Get() << endl; Task<int(int)> task3 = [](int i){cout << i << endl; return i; }; cout << task3.Get(3) << endl; }
WhenAll保证一个任务集合中全部的任务完成。WhenAll函数会生成一个任务,该任务可在完成一组任务以后完成。 此函数可返回一个std::vector 对象,该对象包含集合中每一个任务的结果。 如下基本示例使用WhenAll建立表示完成其余三个任务的任务。下面是WhenAll的基本用法:并发
void PrintThread() { cout << std::this_thread::get_id() << endl; std::this_thread::sleep_for(std::chrono::milliseconds(5)); } void TestWhenAll() { vector<Task<int()>> v = { Task<int()>([]{PrintThread(); std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }), Task<int()>([]{PrintThread(); return 2; }), Task<int()>([]{PrintThread(); return 3; }), Task<int()>([]{PrintThread(); return 4; }) }; cout << "when all " << endl; WhenAll(v).Get(); }
注意:WhenAll是非阻塞的,它只是建立一个任务,在Wait或Get时才发起异步操做。传递给WhenAll的任务必须是统一的。 换言之,它们必须都返回相同类型。框架
WhenAny在任务集合中任意一个任务结束以后就返回。函数会生成一个任务,该任务可在完成一组任务的第一个任务以后完成。 此函数可返回一个 std::pair 对象,该对象包含已完成任务的结果和集合中任务的索引。下面是WhenAny的基本用法:异步
void TestWhenAny() { vector<Task<int()>> v = { Task<int()>([]{PrintThread(); std::this_thread::sleep_for(std::chrono::seconds(5)); return 1; }), Task<int()>([]{PrintThread(); return 2; }), Task<int()>([]{PrintThread(); return 3; }), Task<int()>([]{PrintThread(); return 4; }) }; cout << "when any " << endl; WhenAny(v).Then([](std::pair<int, int>& result) { cout << " index " << result.first << " result " << result.second << endl; return result.second; }).Then([](int result){cout << "any result: " << result << endl; }).Get(); }
注意:WhenAny是非阻塞的,它只是建立一个任务,在Wait或Get时才发起异步操做。传递给WhenAny的任务必须是统一的。 换言之,它们必须都返回相同类型。ide
TaskGroup能够并行的处理一组任务,TaskGroup能够接受多个task或者function,TaskGroup的Wait等待全部任务完成。下面是TaskGroup的基本用法:
void TestTaskGroup() { Task<int()> t1([]{PrintThread(); return 1; }); Task<double()> t2([]{PrintThread(); return 2.123; }); Task<void()> t3([]{PrintThread(); }); Task<string()> t4([]{PrintThread(); return "ok"; }); TaskGroup group; group.Run(t1); group.Run(t2); group.Run(t3); group.Run(t4); //若是你以为这样一个一个Run加入任务,你也能够一块儿Run group.Run(t1, t2, t3, []{PrintThread(); return 1; }); group.Wait(); }
PPL的task_group的任务只能是void()形式的,TaskCpp容许一些简单类型的任务如int()、double()、string()等,其实任务的返回类型没有实际意义,由于Wait没有返回值,这里支持多种返回类型的任务只不过是为了减小一点限制,用起来稍微方便一点罢了。PPL加入任务只能一个一个Run,要加入多个任务时有点繁琐,TaskCpp能够一次Run多个任务,比PPL要方便一些。这两点算是较PPL的两个小优势吧。
ParallelForeach算法与 STL std::for_each 算法相似,只是 parallel_for_each 算法并发执行任务。用法比较简单:
bool check_prime(int x) // 为了体现效果, 该函数故意没有优化. { for (int i = 2; i < x; ++i) if (x % i == 0) return false; return true; } void TestParallelFor() { vector<int> v; for (int i = 0; i < 100000; i++) { v.push_back(i + 1); } boost::timer t; ParallelForeach(v.begin(), v.end(), check_prime); ParallelForeach(v.begin(), v.end(), check_prime); cout << "taskcpp: " << t.elapsed() << endl; }
ParallelInvoke算法并行执行一组任务。 在完成全部任务以前,此算法不会返回。 当您须要同时执行多个独立的任务时,此算法颇有用。ParallelInvoke和TaskGroup的做用是同样的。用法比较简单:
void TestParaInvoke() { auto f = []{cout << "1" << endl; return 1; }; ParallelInvoke(f, []{cout << "2" << endl; }); }
ParallelReduce算法在实际应用中比较经常使用,有点相似于map-reduce,能够并行的对一个集合进行reduce操做。ParallelReduce的用法稍微复杂一点,它的原型:
第一个参数是集合,第二个参数是算法的初始值,第三个参数rangeFunc是一个声称中间结果的函数,第四个参数是中间结果的汇聚函数。若是调用ParallelReduce(range,init, reduceFunc),则表示rangeFunc和reduceFunc是一个函数。
下面的例子是并行的计算100000000个整数的和:
void TestParallelSum() { vector<int> v; const int Size = 100000000; v.reserve(Size); for (int i = 0; i < Size; i++) { v.push_back(i + 1); } int i = 0; boost::timer t; auto r = ParallelReduce(v, i, [](const vector<int>::iterator& begin, vector<int>::iterator&end, int val) { return std::accumulate(begin, end, val); }); cout << t.elapsed() << " " << r << endl; }
下面是并行查找最长的字符串的例子:
void TestFindString() { vector<string> v; v.reserve(10000000); for (int i = 0; i < 10000000; i++) { v.emplace_back(std::to_string(i + 1)); } string init = ""; auto f = [](const vector<string>::iterator& begin, vector<string>::iterator&end, string& val) { return *std::max_element(begin, end, [](string& str1, string& str2){return str1.length()<str2.length(); }); }; boost::timer t; auto r = ParallelReduce(v, init, f, f); cout << t.elapsed() << " " << r << endl; }
用四个测试用例对比测试了tbb、ppl、TaskCpp和单线程的性能。下图是测试对比的结果:
能够看到TaskCpp的性能比单线程效率要高,整体上也优于ppl和tbb,其中ppl和tbb在某些场景下性能还不如单线程高,因此在使用时要以实际测试数据为准,并非一用并行库效率就能提升。
遵循LGPL(GNU General Public License)协议。
TaskCpp是一个任务库,不是线程池,每启动一个task就会建立一个线程,若是须要线程池能够看这里。
若是发现问题或者有什么建议请给我留言或者发邮件qicosmos@163.com .
c++11 boost技术交流群:296561497,欢迎你们来交流技术。
TaskCpp的开发和测试花费了我两三周的时间,开发之初我就计划将其开源,我但愿更多的人能用起来并推广TaskCpp,促进它的发展。曾经有人问我,为何坚持发原创文章分享技术,是否是有什么好处。我一会儿还真答不上来,由于我根本就没有想过有啥好处,如今再想一下,好处嘛,分享的术也许对别人学习有帮助吧。再想一想我这样作的缘由,一个缘由是兴趣,这要感谢c++11,是c++11让我以为c++语言是很是有意思和有魅力的语言,总能带给人惊喜,没有c++11我也不可能完成TaskCpp库。还有就是一点点分享快乐的精神,我分享我快乐。最重要的缘由是一点点梦想,c++中开源库太少了,不少框架和基础库都还不够,远远赶不上java,因此在使用和推广上不如java。可是我有一点梦想:我但愿经过本身的一点努力能让c++的世界变得更加美好,能让c++开发者的日子变得美好。是的,正是这个梦想促使我将我开发的大部分代码都开源出来!也正是这个梦想促使我坚持写原创博客分享技术!