线程池实现

线程池实现

1 基本设计思路

咱们首先设计TPThread类,用于管理单个线程的属性和方法;有了TPThread表示的线程以后,咱们定义ThreadPool类,用以管理一组TPThread对象,此处所说的管理包括:针对全部TPThread线程的建立、销毁以及调度。算法

咱们怎么将须要在线程中调用的业务逻辑代码接入线程池呢?选择之一是利用TPThread提供多态函数,将业务逻辑代码嵌入TPThread的子对象。可是这样作,在每次将并发任务代理给线程池以后,TPThread将会绑定到固定的业务逻辑上。更灵活的设计是分离出一个专门用于代理并发任务的TPTask类。缓存

那么,如今的线程池结构是:一个全局的ThreadPool对象,在它初始化的时候会建立一组初始数量的TPThread对象,用户的并发业务逻辑在TPTask的子对象中实现,这样的子对象一样也都交给全局的ThreadPool对象管理,ThreadPool中当前空闲的TPThread线程将被分配去处理TPTask任务。服务器

2 使用线程池的优点

设计线程池的初衷是规避频繁建立和销毁线程的开销。多线程

但从上面的设计来看,除了性能方面的优点以外,利用线程池,用户一般只需一个相似 addTask 的方法便可快捷地添加并发任务;并发线程与主线程的交互也将更加简单,这将在接下来的源码中体现出来。并发

综上所述,线程池对比操做系统提供的原始线程控制原语,它不只下降了线程频繁建立、销毁的性能开销,也为用户提供了更简易明了的操做接口。框架

3 TPTask

咱们将为TPTask类提供两个函数:一个纯虚函数,用于提供给继承该对象的用户子对象重写该方法以嵌入并发的业务逻辑;一个有默认实现的在主线程中被调用的虚函数。less

class TPTask
{
public:
    enum TPTaskState
    {
        TPTask_Completed = 0, // 一个任务已经完成
        TPTask_ContinueMainThread = 1, // 继续在主线程执行
        TPTask_ContinueChildThread = 2, // 继续在子线程执行
    };

    virtual bool process() = 0;
    virtual TPTask::TPTaskState presentMainThread() { return TPTask::TPTask_Completed; }
};

presentMainThread 函数是并发线程与主线程交互的接口,它将在 TPThread 对象的 process 函数执行以后的某一时刻在主线程中被调用。函数

4 TPThread

class ThreadPool;
class TPThread
{
public:
    friend class ThreadPool;

    enum EThreadState
    {
        ThreadState_Stop = -1,
        ThreadState_Sleep = 0,
        ThreadState_Busy = 1,
        ThreadState_End = 2
    };

    TPThread(ThreadPool* threadPool, int threadWaitSecond = 0)
        :mThreadWaitSecond(threadWaitSecond)
        ,mpCurrTask(NULL)
        ,mpThreadPool(threadPool)
    {
        mState = ThreadState_Sleep;
        initCond();
        initMutex();
    }

    virtual ~TPThread()
    {
        deleteCond();
        deleteMutex();
    }

    THREAD_ID createThread(void);
    bool join(void);

    void onTaskCompleted(void);

    // 线程通知 等待条件信号
    bool onWaitCondSignal(void);

    virtual TPTask* tryGetTask(void);

#if PLATFORM == PLATFORM_WIN32
    static unsigned __stdcall threadFunc(void *arg);
#else
    static void* threadFunc(void* arg);
#endif

    int sendCondSignal(void) { return THREAD_SINGNAL_SET(mCond); }

    virtual void onStart() {}
    virtual void onEnd() {}

    virtual void onProcessTaskStart(TPTask* pTask) {}
    virtual void processTask(TPTask* pTask) { pTask->process(); }
    virtual void onProcessTaskEnd(TPTask* pTask) {}

    THREAD_ID id(void) const { return mTid; }
    void id(THREAD_ID tidp) { mTid = tidp; }

    virtual void initCond(void) { THREAD_SINGNAL_INIT(mCond); }
    virtual void initMutex(void) { THREAD_MUTEX_INIT(mMutex); }

    virtual void deleteCond(void) { THREAD_SINGNAL_DELETE(mCond); }
    virtual void deleteMutex(void) { THREAD_MUTEX_DELETE(mMutex); }

    virtual void lock(void) { THREAD_MUTEX_LOCK(mMutex); }
    virtual void unlock(void) { THREAD_MUTEX_UNLOCK(mMutex); }

    TPTask* task(void) const { return mpCurrTask; }
    void task(TPTask* tpt) { mpCurrTask = tpt; }

    int state(void) const { return mState; }

    ThreadPool* threadPool() { return mpThreadPool; }

    virtual std::string printWorkState()
    {
        char buf[128];
        lock();
        sprintf(buf, "%p,%u", mpCurrTask, mDoneTasks);
        unlock();
        return buf;
    }

    void resetDoneTasks() { mDoneTasks = 0; }
    void incDoneTasks() { ++mDoneTasks; }
protected:
    THREAD_ID mTid; // 本线程的ID
    THREAD_SINGNAL mCond;
    THREAD_MUTEX mMutex;

    int mThreadWaitSecond; // 线程空闲状态超过这个秒数则线程退出, 小于0为永久线程(秒单位)
    TPTask *mpCurrTask; // 该线程的当前执行的任务
    ThreadPool *mpThreadPool; // 线程池指针
    EThreadState mState; // 线程状态
    uint32 mDoneTasks; // 线程启动一次在未改变到闲置状态下连续执行的任务计数
};

该对象封装了Win32和Unix平台下的线程实现。mpCurrTask指向该线程当前的任务,这不须要用户操心,它实际是由ThreadPool管理的。post

5 ThreadPool

class ThreadPool
{
public:
    ThreadPool();
    virtual ~ThreadPool();

    void finalise();
    void destroy();

    /** 建立线程池
    @param inewThreadCount: 当系统繁忙时线程池会新增长这么多线程(临时)
    @param inormalMaxThreadCount: 线程池会一直保持这么多个数的线程
    @param imaxThreadCount: 线程池最多只能有这么多个线程
    */
    bool createThreadPool(uint32 inewThreadCount, uint32 inormalMaxThreadCount, uint32 imaxThreadCount);

    virtual TPThread* createThread(int threadWaitSecond = ThreadPool::timeout);

    void bufferTask(TPTask* tptask);
    TPTask* popbufferTask(void);

    bool addFreeThread(TPThread* tptd);
    bool addBusyThread(TPThread* tptd);
    void addFiniTask(TPTask* tptask);

    bool removeHangThread(TPThread* tptd);

    virtual void onMainThreadTick();

    bool hasThread(TPThread* pTPThread);

    std::string printThreadWorks();

    bool addTask(TPTask* tptask);

    bool addBackgroundTask(TPTask* tptask) { return addTask(tptask); }
    bool pushTask(TPTask* tptask) { return addTask(tptask); }

    uint32 currentThreadCount(void) const { return mCurrentThreadCount; }
    uint32 currentFreeThreadCount(void) const { return mCurrentFreeThreadCount; }
    bool isThreadCountMax(void) const { return mCurrentThreadCount >= mMaxThreadCount; }

    bool isBusy(void) const { return mBufferedTaskList.size() > THREAD_BUSY_SIZE; }

    bool isInitialize(void) const { return mIsInitialize; }
    bool isDestroyed() const { return mIsDestroyed; }

    uint32 bufferTaskSize() const { return mBufferedTaskList.size(); }
    std::queue<TPTask*>& bufferedTaskList() { return mBufferedTaskList; }

    void lockBufferedTaskList() { THREAD_MUTEX_LOCK(mBufferedTaskListMutex); }
    void unlockBufferedTaskList() { THREAD_MUTEX_UNLOCK(mBufferedTaskListMutex); }

    uint32 finiTaskSize() const { return mFiniTaskListCount; }

    virtual std::string name() const{ return "ThreadPool"; }
public:
    static int timeout;
protected:
    bool mIsInitialize;
    bool mIsDestroyed;

    std::queue<TPTask *> mBufferedTaskList; // 系统处于繁忙时还未处理的任务列表
    std::list<TPTask *> mFinishedTaskList; // 已经完成的任务列表
    size_t mFiniTaskListCount;

    THREAD_MUTEX mBufferedTaskListMutex; // 处理mBufferedTaskList互斥锁
    THREAD_MUTEX mThreadStateListMutex; // 处理mBufferedTaskList and mFreeThreadList互斥锁
    THREAD_MUTEX mFinishedTaskListMutex; // 处理mFinishedTaskList互斥锁

    std::list<TPThread *> mBusyThreadList; // 繁忙的线程列表
    std::list<TPThread *> mFreeThreadList; // 闲置的线程列表
    std::list<TPThread *> mAllThreadList; // 全部的线程列表

    uint32 mMaxThreadCount; // 最大线程总数
    uint32 mExtraNewAddThreadCount; // 若是mNormalThreadCount不足够使用则会新建立这么多线程
    uint32 mCurrentThreadCount; // 当前线程数
    uint32 mCurrentFreeThreadCount; // 当前闲置的线程数
    uint32 mNormalThreadCount; // 标准状态下的线程总数 即:默认状况下一启动服务器就开启这么多线程,若是线程不足够,则会新建立一些线程, 最大可以到mMaxThreadCount
};

5.1 线程管理

从声明中能够看到有三个线程对象列表性能

  1. mAllThreadList 该容器是 mFreeThreadList 和 mBusyThreadList 容器的并集。
  2. mFreeThreadList 该容器记录了当前闲置的线程对象,当用户调用 addTask 方法添加并发任务对象时,线程池将尝试从该容器中取出线程对象来执行并发任务。
  3. mBusyThreadList 当前正在执行并发任务的线程对象将会保存到此容器。

在调用 createThreadPool 初始化线程池时,全部的线程对象都会被添加到 mFreeThreadList 容器, mBusyThreadList 初始化为空。

在调用线程池的 addTask 方法添加并发任务时,若 mFreeThreadList 非空则会从 mFreeThreadList 列表中取出一个线程对象来执行并发任务,该对象将被转移到 mBusyThreadList,表示其正在执行任务。

5.2 并发任务管理

线程池成员中跟任务相关的容器有:

  1. mBufferedTaskList 若是当前没有闲置的线程,那么,用户新增的任务将缓存至此容器。
  2. mFinishedTaskList 在并发任务对象的 process 方法执行完以后,它将被添加到该容器。 mFinishedTaskList 容器中的并发任务对象将在主线程中被进一步处理,其 presentMainThread 函数将被调用,而后根据返回值来决定相应任务对象的去向。

注意到, mBufferedTaskList 和 mFinishedTaskList 容器保存的都是当前未在执行的并发任务。实际上,正在执行的并发任务对象是直接代理给 TPThread 线程对象的,并未用任何形式的容器去缓存。

6 实现细节

 

6.1 线程回调函数

  • 注册线程回调函数
    THREAD_ID TPThread::createThread(void)
    {
    #if PLATFORM == PLATFORM_WIN32
        mTid = (THREAD_ID)_beginthreadex(NULL, 0, &TPThread::threadFunc, (void*)this, NULL, 0);
    #else
        pthread_create(&mTid, NULL, TPThread::threadFunc, (void*)this);
    #endif
        return mTid;
    }
    
  • 线程回调函数的实现
    #if PLATFORM == PLATFORM_WIN32
    unsigned __stdcall TPThread::threadFunc(void *arg)
    #else
    void* TPThread::threadFunc(void* arg)
    #endif
    {
        TPThread *tptd = static_cast<TPThread *>(arg);
        ThreadPool *pThreadPool = tptd->threadPool();
    
        bool isRun = true;
        tptd->resetDoneTasks();
    
    #if PLATFORM == PLATFORM_WIN32
    #else
        pthread_detach(pthread_self());
    #endif
    
        tptd->onStart();
    
        while(isRun)
        {
            if(tptd->task() != NULL)
            {
                isRun = true;
            }
            else
            {
                tptd->resetDoneTasks();
                isRun = tptd->onWaitCondSignal();
            }
    
            if(!isRun || pThreadPool->isDestroyed())
            {
                if(!pThreadPool->hasThread(tptd))
                    tptd = NULL;
    
                goto __THREAD_END__;
            }
    
            TPTask * task = tptd->task();
            if(task == NULL)
                continue;
    
            tptd->mState = ThreadState_Busy;
    
            while(task && !tptd->threadPool()->isDestroyed())
            {
                tptd->incDoneTasks();
                tptd->onProcessTaskStart(task);
                tptd->processTask(task); // 处理该任务
                tptd->onProcessTaskEnd(task);
    
                TPTask * task1 = tptd->tryGetTask(); // 尝试继续从任务队列里取出一个繁忙的未处理的任务
    
                if(!task1)
                {
                    tptd->onTaskCompleted();
                    break;
                }
                else
                {
                    pThreadPool->addFiniTask(task);
                    task = task1;
                    tptd->task(task1);
                }
            }
        }
    
    __THREAD_END__:
        if(tptd)
        {
            TPTask * task = tptd->task();
            if(task)
            {
                delete task;
            }
    
            tptd->onEnd();
            tptd->mState = ThreadState_End;
            tptd->resetDoneTasks();
        }
    
    #if PLATFORM == PLATFORM_WIN32
        return 0;
    #else
        pthread_exit(NULL);
        return NULL;
    #endif
    }
    

    咱们能够看到,该函数的核心便是:取出并发任务并执行它。

    在刚进入循环时,会执行以下代码:

    if(tptd->task() != NULL)
    {
        isRun = true;
    }
    else
    {
        tptd->resetDoneTasks();
        isRun = tptd->onWaitCondSignal();
    }
    

    onWaitCondSignal 的实现以下:

    bool TPThread::onWaitCondSignal(void)
    {
    #if PLATFORM == PLATFORM_WIN32
        if(mThreadWaitSecond <= 0)
        {
            mState = ThreadState_Sleep;
            WaitForSingleObject(mCond, INFINITE);
            ResetEvent(mCond);
        }
        else
        {
            mState = ThreadState_Sleep;
            DWORD ret = WaitForSingleObject(mCond, mThreadWaitSecond * 1000);
            ResetEvent(mCond);
    
            // 若是是由于超时了, 说明这个线程好久没有被用到, 咱们应该注销这个线程。
            // 通知ThreadPool注销本身
            if (ret == WAIT_TIMEOUT)
            {
                mpThreadPool->removeHangThread(this);
                return false;
            }
            else if(ret != WAIT_OBJECT_0)
            {
            }
        }
    #else
        if(mThreadWaitSecond <= 0)
        {
            lock();
            mState = ThreadState_Sleep;
            pthread_cond_wait(&mCond, &mMutex);
            unlock();
        }
        else
        {
            struct timeval now;
            struct timespec timeout;
            gettimeofday(&now, NULL);
            timeout.tv_sec = now.tv_sec + mThreadWaitSecond;
            timeout.tv_nsec = now.tv_usec * 1000;
    
            lock();
            mState = ThreadState_Sleep;
            int ret = pthread_cond_timedwait(&mCond, &mMutex, &timeout);
            unlock();
    
            // 若是是由于超时了, 说明这个线程好久没有被用到, 咱们应该注销这个线程。
            if (ret == ETIMEDOUT)
            {
                // 通知ThreadPool注销本身
                mpThreadPool->removeHangThread(this);
                return false;
            }
            else if(ret != 0)
            {
            }
        }
    #endif
        return true;
    }
    

    能够看到,在 tptd->task() 为空的时候,线程将休眠。若是该线程是在线程池初始化时所建立,那么,将进入永久休眠,直至被唤醒;若是该线程是在用户添加任务过程当中因线程池中暂无闲置线程而临时建立的,那么,将进入超时休眠。临时线程在超时唤醒后将被回收,初始线程则会直到线程池销毁后才被回收。这体如今以下的代码中:

    if(!isRun || pThreadPool->isDestroyed())
    {
        if(!pThreadPool->hasThread(tptd))
            tptd = NULL;
    
        goto __THREAD_END__;
    }
    

    在上述预处理以后,线程回调函数将正式进入任务处理流程:

    TPTask * task = tptd->task();
    if(task == NULL)
        continue;
    
    tptd->mState = ThreadState_Busy;
    
    while(task && !tptd->threadPool()->isDestroyed())
    {
        tptd->incDoneTasks();
        tptd->onProcessTaskStart(task);
        tptd->processTask(task); // 处理该任务
        tptd->onProcessTaskEnd(task);
    
        TPTask * task1 = tptd->tryGetTask(); // 尝试继续从任务队列里取出一个繁忙的未处理的任务
    
        if(!task1)
        {
            tptd->onTaskCompleted();
            break;
        }
        else
        {
            pThreadPool->addFiniTask(task);
            task = task1;
            tptd->task(task1);
        }
    }
    

    上述循环的退出时机是:当前任务已被执行,而且线程池的 mBufferedTaskList 容器为空,这表示该线程暂时完成了本身的使命,能够先休息了。

6.2 线程池管理

 

6.2.1 线程池初始化

bool ThreadPool::createThreadPool(uint32 inewThreadCount, uint32 inormalMaxThreadCount, uint32 imaxThreadCount)
{
    assert(!mIsInitialize);

    mExtraNewAddThreadCount = inewThreadCount;
    mNormalThreadCount = inormalMaxThreadCount;
    mMaxThreadCount = imaxThreadCount;

    for(uint32 i = 0; i < mNormalThreadCount; ++i)
    {
        TPThread* tptd = createThread(0);

        if(!tptd)
        {
            // ERROR_MSG("ThreadPool::createThreadPool: create is error! \n");
            return false;
        }

        mCurrentFreeThreadCount++;
        mCurrentThreadCount++;
        mFreeThreadList.push_back(tptd);
        mAllThreadList.push_back(tptd);
    }

    mIsInitialize = true;
    sleepms(100);
    return true;
}

TPThread* ThreadPool::createThread(int threadWaitSecond)
{
    TPThread* tptd = new TPThread(this, threadWaitSecond);
    tptd->createThread();
    return tptd;
}

线程池的初始化便是建立一批初始数量的线程。在线程初始化过程当中建立的线程将一直延续到线程池销毁为止。

6.2.2 并发任务添加

bool ThreadPool::addTask(TPTask* tptask)
{
    THREAD_MUTEX_LOCK(mThreadStateListMutex);
    if(mCurrentFreeThreadCount > 0)
    {
        std::list<TPThread *>::iterator itr = mFreeThreadList.begin();
        TPThread* tptd = (TPThread *)(*itr);
        mFreeThreadList.erase(itr);
        mBusyThreadList.push_back(tptd);
        --mCurrentFreeThreadCount;

        tptd->task(tptask);                                             // 给线程设置新任务

#if PLATFORM == PLATFORM_WIN32
        if(tptd->sendCondSignal()== 0){
#else
        if(tptd->sendCondSignal()!= 0){
#endif
            THREAD_MUTEX_UNLOCK(mThreadStateListMutex);
            return false;
        }

        THREAD_MUTEX_UNLOCK(mThreadStateListMutex);
        return true;
    }

    bufferTask(tptask);

    if(isThreadCountMax())
    {
        THREAD_MUTEX_UNLOCK(mThreadStateListMutex);

        return false;
    }

    for(uint32 i = 0; i < mExtraNewAddThreadCount; ++i)
    {
        TPThread* tptd = createThread(300);                                 // 设定5分钟未使用则退出的线程

        mAllThreadList.push_back(tptd);                                     // 全部的线程列表
        mFreeThreadList.push_back(tptd);                                    // 闲置的线程列表
        ++mCurrentThreadCount;
        ++mCurrentFreeThreadCount;

    }

    THREAD_MUTEX_UNLOCK(mThreadStateListMutex);
    return true;
}

若是当前有闲置线程,则直接将并发任务代理给它;若是当前没有闲置线程,那就先将并发任务缓存起来,未来的空闲线程将经过 tryGetTask() 取出缓存的并发任务。

咱们注意到,在缓存任务以后,会尝试建立一批新的线程对象,这些线程对象是临时的,当线程池“悠闲”下来的时候,它们将被回收。这个额外的优化若是利用得好,将会提升线程池的并发度。

6.2.3 与主线程的交互

这是一种常有的需求,并发任务产生的输出一般须要提交给主线程。咱们提供 onMainThreadTick 方法来达成此目的。

void ThreadPool::onMainThreadTick()
{
    std::vector<TPTask *> finitasks;

    THREAD_MUTEX_LOCK(mFinishedTaskListMutex);

    if(mFinishedTaskList.size() == 0)
    {
        THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex);
        return;
    }

    std::copy(mFinishedTaskList.begin(), mFinishedTaskList.end(), std::back_inserter(finitasks));
    mFinishedTaskList.clear();
    THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex);

    std::vector<TPTask *>::iterator finiiter  = finitasks.begin();

    for(; finiiter != finitasks.end(); )
    {
        TPTask::TPTaskState state = (*finiiter)->presentMainThread();

        switch(state)
        {
        case TPTask::TPTask_Completed:
            delete (*finiiter);
            finiiter = finitasks.erase(finiiter);
            --mFiniTaskListCount;
            break;
        case TPTask::TPTask_ContinueChildThread:
            this->addTask((*finiiter));
            finiiter = finitasks.erase(finiiter);
            --mFiniTaskListCount;
            break;
        case TPTask::TPTask_ContinueMainThread:
            THREAD_MUTEX_LOCK(mFinishedTaskListMutex);
            mFinishedTaskList.push_back((*finiiter));
            THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex);
            ++finiiter;
            break;
        default:
            Assert(false);
            break;
        };
    }
}

注意到,此函数只针对了 mFinishedTaskList 容器中的任务进行了处理,这表示,一个并发任务得在执行了至少一次 process 以后才能与主线程交互。

6.2.4 线程池销毁

void ThreadPool::destroy()
{
    mIsDestroyed = true;

    int itry = 0;
    while(true)
    {
        sleepms(300);
        itry++;

        std::string taskaddrs = "";
        THREAD_MUTEX_LOCK(mThreadStateListMutex);

        int count = mAllThreadList.size();
        std::list<TPThread *>::iterator itr = mAllThreadList.begin();
        for(; itr != mAllThreadList.end(); ++itr)
        {
            if((*itr))
            {
                if((*itr)->state() != TPThread::ThreadState_End)
                {
                    (*itr)->sendCondSignal();
                }
                else
                {
                    count--;
                }
            }
        }

        THREAD_MUTEX_UNLOCK(mThreadStateListMutex);

        if(count <= 0)
        {
            break;
        }
    }

    THREAD_MUTEX_LOCK(mThreadStateListMutex);
    sleepms(100);
    std::list<TPThread*>::iterator itr = mAllThreadList.begin();
    for(; itr != mAllThreadList.end(); ++itr)
    {
        if((*itr))
        {
            delete (*itr);
            (*itr) = NULL;
        }
    }
    mAllThreadList.clear();
    THREAD_MUTEX_UNLOCK(mThreadStateListMutex);

    THREAD_MUTEX_LOCK(mFinishedTaskListMutex);
    if(mFinishedTaskList.size() > 0)
    {
        std::list<TPTask*>::iterator finiiter  = mFinishedTaskList.begin();
        for(; finiiter != mFinishedTaskList.end(); ++finiiter)
        {
            delete (*finiiter);
        }

        mFinishedTaskList.clear();
        mFiniTaskListCount = 0;
    }
    THREAD_MUTEX_UNLOCK(mFinishedTaskListMutex);

    THREAD_MUTEX_LOCK(mBufferedTaskListMutex);
    if(mBufferedTaskList.size() > 0)
    {
        while(mBufferedTaskList.size() > 0)
        {
            TPTask* tptask = mBufferedTaskList.front();
            mBufferedTaskList.pop();
            delete tptask;
        }
    }
    THREAD_MUTEX_UNLOCK(mBufferedTaskListMutex);

    THREAD_MUTEX_DELETE(mThreadStateListMutex);
    THREAD_MUTEX_DELETE(mBufferedTaskListMutex);
    THREAD_MUTEX_DELETE(mFinishedTaskListMutex);
}

线程池销毁时将会等待全部线程正常退出,因此用户重写的 process 方法须要在线程池销毁时所有退出。

7 总结

实现高效且友好的线程池确实须要注意不少细节,但总的来讲,线程池技术并不包含多么高深的算法。任何线程池框架通常都只有以下的几个简单执行步骤:

  1. 预建立一批线程
  2. 添加并发任务
  3. 销毁全部线程

Date: 2015-11-03T18:33+0800

Author: ruleless

Org version 7.9.3f with Emacs version 24

Validate XHTML 1.0
相关文章
相关标签/搜索