线程池介绍
服务器完成一项任务的时间可分为:T1:建立线程或进程时间;T2:执行任务时间;T3:销毁进程或线程时间。一般T1+T3的时间大于T2,线程池正是关注如何缩短T1和T3的时间。
线程池经过在系统中预先建立必定数量的线程,当任务请求到来时从线程池中分一个预先建立的线程去处理任务,线程在处理完任务后还能够重用,不会销毁,继续等待下次任务的到开。这样能避免大量的线程建立和销毁操做,从而节省系统资源;同时有不少任务时,也会减小建立线程的数量。
用C++11的线程相关特性,好比线程、条件变量、互斥量,让咱们编写并发程序更简单。
linux
半同步半异步线程池实现的关键技术分析
线程池又三层组成:
1. 同步服务层:不断的将新任务添加到同步队列中,能够用多路复用或者多线程来完成。一开始没看懂任务是什么,其实一个函数就是一个任务,C++11经过std::function将函数封装为类模板对象,能够将这些任务(函数)放到容器中保存起来,以进行添加读取任务操做。
2. 排队层:就是一个同步队列,处于核心地位。全部待处理的任务都存在这里,要保证队列中共享数据线程安全(加锁),还要控制任务的数量,上层服务层往队列添加任务,下层从这里取任务去执行。
3. 异步服务层: 预先建立好线程,来并行处理队列中的任务。
ios
本身画了一张图:安全
代码实现与分析
用到了不少C++11的特性,里面写了不少注释,是根据本身理解分析的。服务器
同步队列:多线程
#include <iostream> #include <string> #include <stack> #include <vector> #include <algorithm> #include <cstdio> #include <list> #include <thread> #include <mutex> #include <condition_variable> using namespace std; template<typename T> class Syncqueue { public: //初始化,队列的最大元素个数,开始不终止 Syncqueue( int maxsize ) : m_maxsize(maxsize),m_needStop(false){} //往队列中添加任务,重载两个版本,左值和右值引用 void Put( const T& x ) { Add(x); } void Put(T&& x) { Add(forward<T>(x)); } //Take和Add相似 void Take(list<T>& list) { unique_lock<mutex> locker(m_mtx); m_notEmpty.wait(locker, [this] { return m_needStop|| NotEmpty(); });//中止或者不空就继续执行,不用wait if(m_needStop) return ; //一次加锁,一下取出队列中的全部数据 list = move(m_queue); //经过移动,将 m_queue 转移到 list,而不是拷贝 m_notFull.notify_one(); //唤醒线程去添加任务 } //每次获取一个数据,效率较低 void Take(T& t) { unique_lock<mutex> locker(m_mtx); m_notFull.wait(locker, [this] { return m_needStop || NotEmpty(); }); if(m_needStop) return ; t = m_queue.front(); //取出一个 m_queue.pop_front(); m_notFull.notify_one(); } //方便让用户能终止任务 void Stop() { { lock_guard<mutex> locker(m_mtx); m_needStop = true; //将须要中止标志 置为 true //执行到这,lock_guard释放锁 } //唤醒全部等待的线程,到if(m_needStop)时为真,而后相继退出 m_notEmpty.notify_all(); //被唤醒的线程直接获取锁 m_notFull.notify_all(); } //判断队列是否为空 bool Empty() { lock_guard<mutex> locker(m_mtx); return m_queue.empty(); } //判断队列满了 bool Full() { lock_guard<mutex> locker(m_mtx); return m_queue.size() == m_maxsize; } //队列大小 size_t Size() { lock_guard<mutex> locker(m_mtx); return m_queue.size(); } private: //队列未满 bool NotFull() const { bool full = m_queue.size() >= m_maxsize; if(full) cout << "缓冲区满了,须要等待…… " << endl; return !full; } //队列不空 bool NotEmpty() const { bool empty = m_queue.empty(); if(empty) cout << "缓冲区空了,须要等待…… 异步层的线程id: " << this_thread::get_id() << endl; return !empty; } //范型事件函数 template<typename F> void Add(F&& x) { unique_lock<mutex> locker(m_mtx); m_notFull.wait(locker,[this]{ return m_needStop|| NotFull(); }); //须要中止 或者 不满则继续往下执行,不然wait if(m_needStop) return; //若是须要终止就 return m_queue.push_back(forward<F>(x)); //不终止,把任务添加到同步队列 m_notEmpty.notify_one(); //提醒线程队列不为空,唤醒线程去取数据 } private: list<T> m_queue; //缓冲区 用链表实现 mutex m_mtx; //互斥量 condition_variable m_notEmpty; //不为空的条件变量 condition_variable m_notFull; //没有满的条件变量 int m_maxsize; //同步队列最大的size bool m_needStop; //中止的标志,开始是false };
线程池:并发
/************************************************************************* > File Name: ThreadPool.h > Author: Tanswer > Mail: duxm@xiyoulinux.org > Created Time: 2017年08月10日 星期四 14时46分51秒 ************************************************************************/ #include <iostream> #include <string> #include <stack> #include <vector> #include <list> #include <algorithm> #include <cstdio> #include <thread> #include <mutex> #include <atomic> #include <condition_variable> #include <functional> #include "Syncqueue.h" using namespace std; const int MaxTaskCount = 100; //最大任务数量 class ThreadPool { public: using Task = function<void()>; //任务类型,这里是无参数无返回值,能够修改成任何类型的范型函数模板 //hardware_concurrency CPU核数 当默认线程数 ThreadPool(int numThreads = thread::hardware_concurrency()) : m_queue(MaxTaskCount) { Start(numThreads); //启动 } ~ThreadPool() { Stop(); //若是没有中止时,则主动终止线程池 } //终止线程池,销毁池中全部线程 void Stop() { //保证多线程状况下只调用一次StopThreadGroup call_once(m_flag, [this] { StopThreadGroup(); }); } //同步服务层:往同步队列中添加任务,两个版本 void AddTask(Task&& task) { m_queue.Put(forward<Task>(task)); } void AddTask(const Task& task) { m_queue.Put(task); } private: void Start( int numThreads ) //线程池开始,预先建立包含numThreads 个线程的线程组 { m_running = true; //建立线程组 for(int i=0; i<numThreads; i++) { //智能指针管理,给出线程函数&ThreadPool::RunInThread 和对应参数this m_threadgroup.push_back( make_shared<thread>(&ThreadPool::RunInThread, this) ); } } void RunInThread() { while(m_running) { //一次取出队列中全部任务 list<Task> list; m_queue.Take(list); for(auto& task : list) { if(!m_running) //若是中止 return ; task(); //执行任务 } } } //终止线程池,销毁池中全部线程 void StopThreadGroup() { m_queue.Stop(); //让同步队列中的线程中止 m_running = false; //让内部线程跳出循环并退出 for(auto thread : m_threadgroup) { if(thread) thread -> join(); } m_threadgroup.clear(); } private: list<shared_ptr<thread>> m_threadgroup; //处理任务的线程组,用list保存 Syncqueue<Task> m_queue; //同步队列 atomic_bool m_running; //是否中止的标志 once_flag m_flag; //call_once的参数 };
测试例子:异步
/************************************************************************* > File Name: TestThreadPool.cpp > Author: Tanswer > Mail: duxm@xiyoulinux.org > Created Time: 2017年08月10日 星期四 16时49分35秒 ************************************************************************/ #include <iostream> #include <string> #include <stack> #include <vector> #include <algorithm> #include <cstdio> #include <thread> #include "ThreadPool.h" using namespace std; void TestThreadPool() { ThreadPool pool(2); //线程池建立两个线程,异步层此时无任务须要先等待 //pool.Start(2); //建立两个同步层的线程不断往线程池中添加任务 //在这任务很简单,打印同步层线程ID,用lambda表达式表示,每一个线程处理10个任务 thread thd1( [&pool]{ for(int i=0; i<10; i++ ) { auto thdId = this_thread::get_id(); pool.AddTask( [thdId]{ cout << "同步层线程1的线程ID: " << thdId << endl; } ); } } ); thread thd2( [&pool]{ for( int i=0; i<10; i++ ) { auto thdId = this_thread::get_id(); pool.AddTask( [thdId]{ cout << "同步层线程2的线程ID: " << thdId << endl; } ); } } ); this_thread::sleep_for(chrono::seconds(2)); getchar(); //中止线程池 pool.Stop(); //等待同步层的两个线程执行完 thd1.join(); thd2.join(); } int main() { TestThreadPool(); exit(EXIT_SUCCESS); }
测试结果:函数
线程池预先建立了两个线程,线程ID分别为: 140141544822528 和 140141536429824,开始时同步队列是空的,尚未任务,因此两个线程都等待。而后建立了两个同步层线程1和2,线程ID分别为:140141528037120 和 140141519644416,这两个线程开始不断往线程池同步队列中添加任务。有了任务后,线程池异步层中的两个线程开始处理任务,任务很简单,就是打印同步层线程ID,异步层线程交替处理上层的任务。测试