在muduo的one loop per thread + thread pool模型中,线程和线程池应该是其中最基础也是最重要的两个组件了。因此,本文深刻代码,学习Thread和ThreadPool两个类的结构和实现。函数
Thread类
__thread关键字
学习Thread class以前,先了解一个关键字的用法:__thread。
__thread是GCC内置的线程局部存储设施。它的实现很是高效,比Pthread库中的pthread_key_t(muduo中ThreadLocal)快不少。__thread变量是表示每一个线程有一份独立实体,各个线程的变量值互不干扰。__thread只能修饰POD类型,不能修饰class类型,由于没法自动调用构造函数和析构函数。
Thread类的封装用到了命名空间CurrentThread,这个空间中定义了和线程相关的一些独立属性。oop
namespace CurrentThread { __thread int t_cachedTid = 0; __thread char t_tidString[32]; __thread int t_tidStringLength = 6; __thread const char* t_threadName = "unknown"; }
其中,t_cachedTid表示线程的真实id,Pthread库中提供了pthread_self()获取当前线程的标识,类型为pthread_t。可是,pthread_t不必定是数值类型,也多是一个结构体,这带来了一些问题。
(1)没法打印输出pthread_t,由于不知道其确切类型。
(2)没法比较pthread_t大小或计算hash值。
(3)pthread_t值只在进程内有意义,与操做系统的任务调度之间没法创建有效关系,Pthread库只能保证在同一进程以内,同一时刻的各个线程的id不一样。
因此,muduo采用gettid()系统调用的返回值做为线程id,muduo中将操做封装为gettid()函数。可是,咱们知道,调用系统调用开销比较大,因此,muduo中采用__thread变量t_cachedTid来存储,在线程第一次使用tid时经过系统调用得到,存储在t_cachedTid中,之后使用时再也不须要系统调用了。
t_tidString[32]:用string类型表示tid,以便输出日志。
t_tidStringLength:string类型tid的长度。
t_threadName:线程的名字。post
相关的数据结构
在Thread的实现中,还用到了两个数据结构,一个位ThreadData,用来辅助调用线程执行的函数。另外一个为ThreadNameInitializer,为线程的建立作环境准备。其中用到了pthread_atfork(NULL,NULL,&afterfork)
。该函数的原型为int pthread_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));
这是一个跟进程建立有关的函数,为fork的调用作准备和调用后子进程父进程的初始化。prepare函数在调用fork前执行,parent在调用fork后的父进程中执行,child在调用fork后的子进程中执行。
固然,在实际应用中,多线程不要调用fork(),不然会出现一些问题。由于fork智能克隆当前线程的thread of control,却不克隆其余线程。fork()以后,除了当前线程以外,其余线程都消失了。这样,会出现不少问题。好比,若是复制了一个lock的mutex,却没有复制unlock的线程,那么在给mutex加锁时就会出现死锁。学习
Thread类分析
首先来看Thread类的数据成员和构造函数。this
class Thread : boost::noncopyable { typedef boost::function<void ()> ThreadFunc; ... private: bool started_; bool joined_; pthread_t pthreadId_; boost::shared_ptr<pid_t> tid_; ThreadFunc func_; string name_; static AtomicInt32 numCreated_; }; Thread::Thread(ThreadFunc&& func, const string& n) : started_(false), joined_(false), pthreadId_(0), tid_(new pid_t(0)), func_(func), name_(n) { setDefaultName(); }
其中有两处须要说明一下,首先是shared_ptr<pid> tid_
,可能有人会有疑问:为何这里要用shared_ptr包装pid?
缘由是tid_所属的对象Thread在主线程(A)中建立,而tid_须要在新建立的线程B中进行赋值操做,若是tid使用裸指针的方式传递给线程(B),那么线程A中Thread对象析构(下文)销毁后,线程B持有的就是一个野指针,因此,在Thread对象中将以shared_ptr包装。
而后是numCreated_,是一个静态变量,类型为AtomicInt32,原子类型,用来表示第几回建立线程实例,在记录日志时可用记录为:“线程名+numCreated_”。spa
接下来看Thread的一些接口函数。操作系统
void Thread::start() { started = true; detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_); if(pthread_create(&pthreadId_, NULL, &detail::startThread, data)); { started_ = false; delete data; LOG_SYSFATAL << "Failed in pthread_create"; } }
Thread::start()将调用pthread_create()建立新线程,detail::startThread()是新线程的入口函数,data是新线程执行的辅助结构体。detail::startThread()调用data->runInThread()执行线程逻辑(func_)。.net
int Thread::join() { assert(started_); assert(!joined_); joined_ = true; return pthread_join(pthreadId_, NULL); } Thread::~Thread() { if(started_ && !joined_) { pthread_detach(pthreadId_); } }
Thread析构的时候没有销毁持有的Pthreads句柄(pthread_t),也就是说Thread的析构不会等待线程结束。若是Thread对象的生命期长于线程,而后经过Thread::join()来等待线程结束并释放线程资源。若是Thread对象的生命期短于线程,那么析构时会自动detach线程,避免了资源泄露。
ThreadPool类
ThreadPool(线程池)本质上是一个生产者-消费者的模型,在实际中主要完成计算任务。在muduo线程池中有一个存放工做线程的容器ptr_vector,至关于消费者;有一个存听任务的队列deque。
任务队列是有界的,相似于BoundedBlockingQueue,实现时须要两个条件变量。
如下是ThreadPool的数据成员:
class ThreadPool : boost::noncopyable { typedef boost::function<void ()> Task; private: MutexLock mutex_; Condition notEmpty_; Condition notFull_; string name_; Task threadInitCallback_; boost::ptr_vector<muduo::Thread> threads_; std::deque<Task> queue_; size_t maxQueueSize_; bool running_; }
其中threadInitCallback_可由setThreadInitCallback(const Task& cb)设置,设置回调函数,每次在执行任务前先调用。在线程池开始运行以前,须要先设置任务队列的大小(调用setMaxQueueSize()),由于运行线程池时,线程会从任务队列取任务。
接下来是ThreadPool的一些接口函数。
void ThreadPool::start(int numThreads) { assert(threads_.empty()); running_ = true; threads_.reserve(numThreads); for (int i = 0; i < numThreads; ++i) { char id[32]; snprintf(id, sizeof id, "%d", i+1); threads_.push_back(new muduo::Thread( boost::bind(&ThreadPool::runInThread, this), name_+id)); threads_[i].start(); } if(numThreads == 0 && threadInitCallback_) { threadInitCallback_(); } }
void ThreadPool::start(int numThreads)
开启线程池,按照线程数量numThreads_建立工做线程,线程函数为ThreadPool::runInThread()。
void ThreadPool::runInThread() { try { if(threadInitCallback_) { threadInitCallback_(); } while(running_) { Task task(take()); if(task) { task(); } } } catch(const Exception& ex) { ... } }
若是设置了threadInitCallback_,则进行执行任务前的一些初始化操做。而后从任务队列中取任务执行,有可能阻塞,当任务队列为空时。
ThreadPool::Task ThreadPool::take() { MutexLockGuard lock(mutex_); while(queue_.empty() && running_) { notEmpty_.wait(); } Task task; if(!queue_.empty()) { task = queue_.front(); queue_.pop_front(); if(maxQueueSize_ > 0) { notFull_.notify(); } } return task; }
多线程从消息队列中取任务的时候,须要加锁保护。等到队列非空信号,就取任务。取出以后,便告知任务队列已经非满,能够继续添加任务。
void ThreadPool::run(const Task& task) { if(threads_.empty()) { task(); } else { MutexLockGuard lock(mutex_); while(isFull()) { notFull_.wait(); } assert(!isFull()); queue_.push_back(task); notEmpty_.notfy(); } }
若是ThreadPool没有子线程(set和start操做在run以前),就在主线程中执行该task,不然,将任务加入到队列,并通知线程从中取task,若是队列已满,便等待。
ThreadPool::~ThreadPool() { if(running_) { stop(); } } void ThreadPool::stop() { { MutexLockGuard lock(mutex_); running_ = false; notEmpty_.notifyAll(); } for_each(threads_.begin(),threads_.end(),boost::bind(&muduo::Thread::join, _1)); }
最后是ThreadPool的析构函数,在其中调用stop(),唤醒全部等待的线程,而后对线程池中的每个线程执行join()。
总结
以上就是muduo中Thread和ThreadPool类的学习,有不少源码,有点啰嗦。可是,在muduo的one loop per thread + thread pool模型中,Thread和ThreadPool是很重要的组件,因此须要深刻地掌握。