最近在看asio
相关的东西,看到有人说:html
只要稍微了解 Asio 的人都知道,在一个 io_service 上开几个线程后, 接下来就只要简单的使用 io_service.post() 便可投递一个闭包给线程去执行的. 这是一个自然的线程池.还能够同 io 操做复用你的线程.中止发明垃圾的线程池实现吧.c++
以前也参照c++11的语法写了一个简单的线程池,为了让开启的线程在没有任务跑的时候睡眠,主要用到了同步原语
中的互斥信号量和条件变量(任务来了发送条件变量用于唤醒线程),实现起来还挺麻烦.程序员
asio做为异步输入输出库,全部对象都提供了异步调用的函数,若是asio
能够实现,有一点能够确定,线程池中的异步处理基本上不用程序员关注了, 到底有多简单呢. 官方给出了基于boost::asio的实现,相应的线程/bind等组件我替换成了c++11(实际上随着c++11/14大量引入boost的东西,最新的asio已经能够不依赖boost的头文件了,boostsystem等连接库仍是须要的),我来学习记录一下加深理解.闭包
io_service
的方法run()
能够阻塞的等待在io_service上的异步操做,异步操做所有执行完以后结束, 那么使用io_service::work
控制run()使其持续等待由应用投递过来的任务,保证线程持续运行,同时开启N个线程来干这个活儿,也就是保证了线程资源只有一次申请和释放. 用到的boost::asio
类/操做以下:异步
io_service
之上任务的起始和结束,声明即开始,做用域结束后自动释放,告诉io_service任务都结束了. 不过对于线程池的实现来讲,只用了构造函数,告诉io_service要持续运行,为了让client能够主动释放,这里没有使用work的析构函数,而是调用了io_service提供的stop()方法,让run()结束进而释放线程资源.代码以下:async
using namespace std; using namespace boost; typedef std::shared_ptr<std::thread> thread_ptr; typedef std::vector<thread_ptr> vecThread; class ThreadPool { public: ThreadPool(int num) : threadNum_(num), stopped_(false), work_(io_) { for(int i=i; i<threadNum_; ++i) { threads_.push_back(std::make_shared<std::thread>([&](){io_.run();})); } } ~ThreadPool() { stop(); } template<typename F, typename...Args> void post(F &&f, Args&&...args) { io_.post(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); } void stop() { if(!stopped_) { io_.stop(); for(auto t : threads_) t->join(); stopped_ = true; } } private: bool stopped_; vecThread threads_; int threadNum_; asio::io_service io_; asio::io_service::work work_; };
首先,线程池的构造函数中先根据入参(线程池个数)启动N个线程,每一个线程池调用io_service的run方法,在调用以前,初始化了asio::io_service::work
,意味着全部线程run方法在work没有析构或者io_service没有主动结束的时候必定持续等待运行任务.函数
// 在io_service run以前,要用同一个io_service初始化work. 因为std::bind和boost::bind实现方式不同(boost:bind支持了不少重载),使用std::bind会编译报错,所以用lambda代替 threads_.push_back(std::make_shared<std::thread>([&](){io_.run();}));
此时,线程池已经启动,只须要封装post调用,便可在应用侧灌入任何类型的异步操做.post
template<typename F, typename...Args> void post(F &&f, Args&&...args) { io_.post(std::bind(std::forward<F>(f), std::forward<Args>(args)...)); }
最后提供客户端中止线程池的接口,全部io_service将结束线程,线程池资源释放:学习
void stop() { if(!stopped_) { io_.stop(); for(auto t : threads_) t->join(); stopped_ = true; } }
void test1(int x) {std::cout<<"test 1:"<<x<<std::endl;} void test2(int y) {std::cout<<"test 2:"<<y<<std::endl;} int main() { ThreadPool threads(5); threads.post([](){std::cout<<"test 1"<<std::endl;}); threads.post([](){std::cout<<"test 2"<<std::endl;}); threads.post(test1, 3); threads.post(test2, 5); std::this_thread::sleep_for(2s); threads.stop(); }
A thread pool for executing arbitray tasks
https://stackoverflow.com/que...this