线程池的理解与简单实现(学习版)

因为服务器的硬件资源“充裕”,那么提升服务器性能的一个很直接的方法就是以空间换时间,即“浪费”服务器的硬件资源,以换取其运行效率。这就是池的概念。html

池是一组资源的集合,这组资源在服务器启动之初就被建立并初始化,这称为静态资源分配。程序员

当服务器进入正式运行阶段,即开始处理客户请求的时候,若是它须要相关的资源,就能够直接从池中获取,无需动态分配。很显然,直接从池中取得所需资源比动态分配资源的速度要快得多,由于分配系统资源的系统调用都是很耗时的。算法

当服务器处理完一个客户链接后,能够把相关的资源放回池中,无需执行系统调用来释放资源。从最终效果来看,池至关于服务器管理系统资源的应用设施,它避免了服务器对内核的频繁访问。提升了效率。编程

池能够分为不少种,常见的有进程池,线城池,内存池。服务器


内存池

内存池是一种内存分配方式。一般咱们直接使用new、malloc等系统调用申请分配内存,这样作的缺点在于:因为所申请内存块的大小不定,当频繁使用时会形成大量的内存碎片并进而下降性能。函数

内存池则是在真正使用内存以前,先申请分配必定数量的、大小相等的内存块留做备用。当有新的内存需求时,就从内存池中分出一部份内存块,若内存块不够再继续申请新的内存。这样作的一个显著优势是,使得内存分配效率获得提高。性能


进程池&&线程池

在面向对象程序编程中,对象的建立与析构都是一个较为复杂的过程,较费时间,因此为了提升程序的运行效率尽量减小建立和销毁对象的次数,特别是一些很耗资源的对象建立和销毁。 
因此咱们能够建立一个进程池(线程池),预先放一些进程(线程)进去,要用的时候就直接调用,用完以后再把进程归还给进程池,省下建立删除进程的时间,不过固然就须要额外的开销了。 
利用线程池与进程池可使管理进程与线程的工做交给系统管理,不须要程序员对里面的线程、进程进行管理。学习

以进程池为例测试

进程池是由服务器预先建立的一组子进程,这些子进程的数目在 3~10 个之间(固然这只是典型状况)。线程池中的线程数量应该和CPU数量差很少。网站

进程池中的全部子进程都运行着相同的代码,并具备相同的属性,好比优先级、 PGID 等。

当有新的任务来到时,主进程将经过某种方式选择进程池中的某一个子进程来为之服务。相比于动态建立子进程,选择一个已经存在的子进程的代价显得小得多。至于主进程选择哪一个子进程来为新任务服务,则有两种方法:

  • 主进程使用某种算法来主动选择子进程。最简单、最经常使用的算法是随机算法和Round Robin(轮流算法)。
  • 主进程和全部子进程经过一个共享的工做队列来同步,子进程都睡眠在该工做队列上。当有新的任务到来时,主进程将任务添加到工做队列中。这将唤醒正在等待任务的子进程,不过只有一个子进程将得到新任务的“接管权”,它能够从工做队列中取出任务并执行之,而其余子进程将继续睡眠在工做队列上。

当选择好子进程后,主进程还须要使用某种通知机制来告诉目标子进程有新任务须要处理,并传递必要的数据。最简单的方式是,在父进程和子进程之间预先创建好一条管道,而后经过管道来实现全部的进程间通讯。在父线程和子线程之间传递数据就要简单得多,由于咱们能够把这些数据定义为全局,那么它们自己就是被全部线程共享的。


线程池的应用

线程池主要用于 
一、须要大量的线程来完成任务,且完成任务的时间比较短。 
WEB服务器完成网页请求这样的任务,使用线程池技术是很是合适的。 
由于单个任务小,而任务数量巨大,一个热门网站的点击次数会不少。 
但对于长时间的任务,好比一个Telnet链接请求,线程池的优势就不明显了。由于Telnet会话时间比线程的建立时间大多了。

二、对性能要求苛刻的应用,好比要求服务器迅速响应客户请求。

三、接受突发性的大量请求,但不至于使服务器所以产生大量线程的应用。


线程池&&进程池的好处

进程池进程池减小了建立,归还的时间。提升了效率。


用C++模拟线程池

Linux系统下用C语言建立的一个线程池。线程池会维护一个任务链表(每一个CThread_worker结构就是一个任务)。 
pool_init()函数预先建立好max_thread_num个线程,每一个线程执thread_routine ()函数。该函数中

1 while (pool->cur_queue_size == 0) 2 { 3       pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock)); 4 }

表示若是任务链表中没有任务,则该线程出于阻塞等待状态。不然从队列中取出任务并执行。 
pool_add_worker()函数向线程池的任务链表中加入一个任务,加入后经过调用pthread_cond_signal (&(pool->queue_ready))唤醒一个出于阻塞状态的线程(若是有的话)。 
pool_destroy ()函数用于销毁线程池,线程池任务链表中的任务不会再被执行,可是正在运行的线程会一直把任务运行完后再退出。

具体代码:

#include <stdio.h>  
#include <stdlib.h>  
#include <unistd.h>  
#include <sys/types.h>  
#include <pthread.h>  
#include <assert.h>  

/* 
*线程池里全部运行和等待的任务都是一个CThread_worker 
*因为全部任务都在链表里,因此是一个链表结构 
*/  
typedef struct worker  
{  
    /*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/  
    void *(*process) (void *arg);  
    void *arg;/*回调函数的参数*/  
    struct worker *next;    
} CThread_worker;  

/*线程池结构*/  
typedef struct  
{  
    pthread_mutex_t queue_lock;  
    pthread_cond_t queue_ready;  

    /*链表结构,线程池中全部等待任务*/  
    CThread_worker *queue_head;  

    /*是否销毁线程池*/  
    int shutdown;  
    pthread_t *threadid;  
    /*线程池中容许的活动线程数目*/  
    int max_thread_num;  
    /*当前等待队列的任务数目*/  
    int cur_queue_size;  

} CThread_pool;  


int pool_add_worker (void *(*process) (void *arg), void *arg);  
void *thread_routine (void *arg);  


static CThread_pool *pool = NULL;  
void pool_init (int max_thread_num)  
{  
    pool = (CThread_pool *) malloc (sizeof (CThread_pool));  

    pthread_mutex_init (&(pool->queue_lock), NULL);  
    pthread_cond_init (&(pool->queue_ready), NULL);  

    pool->queue_head = NULL;  

    pool->max_thread_num = max_thread_num;  
    pool->cur_queue_size = 0;  

    pool->shutdown = 0;  

    pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));  
    int i = 0;  
    for (i = 0; i < max_thread_num; i++)  
    {   
        pthread_create (&(pool->threadid[i]), NULL, thread_routine,NULL);  
    }  
}  



/*向线程池中加入任务*/  
int pool_add_worker (void *(*process) (void *arg), void *arg)  
{  
    /*构造一个新任务*/  
    CThread_worker *newworker = (CThread_worker *) malloc (sizeof (CThread_worker));  
    newworker->process = process;  
    newworker->arg = arg;  
    newworker->next = NULL;/*别忘置空*/  

    pthread_mutex_lock (&(pool->queue_lock));  
    /*将任务加入到等待队列中*/  
    CThread_worker *member = pool->queue_head;  
    if (member != NULL)  
    {  
        while (member->next != NULL)  
            member = member->next;  
        member->next = newworker;  
    }  
    else  
    {  
        pool->queue_head = newworker;  
    }  

    assert (pool->queue_head != NULL);  

    pool->cur_queue_size++;  
    pthread_mutex_unlock (&(pool->queue_lock));  
    /*好了,等待队列中有任务了,唤醒一个等待线程; 
    注意若是全部线程都在忙碌,这句没有任何做用*/  
    pthread_cond_signal (&(pool->queue_ready));  
    return 0;  
}  



/*销毁线程池,等待队列中的任务不会再被执行,可是正在运行的线程会一直 
把任务运行完后再退出*/  
int pool_destroy ()  
{  
    if (pool->shutdown)  
        return -1;/*防止两次调用*/  
    pool->shutdown = 1;  

    /*唤醒全部等待线程,线程池要销毁了*/  
    pthread_cond_broadcast (&(pool->queue_ready));  

    /*阻塞等待线程退出,不然就成僵尸了*/  
    int i;  
    for (i = 0; i < pool->max_thread_num; i++)  
        pthread_join (pool->threadid[i], NULL);  
    free (pool->threadid);  

    /*销毁等待队列*/  
    CThread_worker *head = NULL;  
    while (pool->queue_head != NULL)  
    {  
        head = pool->queue_head;  
        pool->queue_head = pool->queue_head->next;  
        free (head);  
    }  
    /*条件变量和互斥量也别忘了销毁*/  
    pthread_mutex_destroy(&(pool->queue_lock));  
    pthread_cond_destroy(&(pool->queue_ready));  

    free (pool);  
    /*销毁后指针置空是个好习惯*/  
    pool=NULL;  
    return 0;  
}  



void * thread_routine (void *arg)  
{  
    printf ("starting thread 0x%x\n", pthread_self ());  
    while (1)  
    {  
        pthread_mutex_lock (&(pool->queue_lock));  
        /*若是等待队列为0而且不销毁线程池,则处于阻塞状态; 注意 
        pthread_cond_wait是一个原子操做,等待前会解锁,唤醒后会加锁*/  
        while (pool->cur_queue_size == 0 && !pool->shutdown)  
        {  
            printf ("thread 0x%x is waiting\n", pthread_self ());  
            pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));  
        }  

        /*线程池要销毁了*/  
        if (pool->shutdown)  
        {  
            /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/  
            pthread_mutex_unlock (&(pool->queue_lock));  
            printf ("thread 0x%x will exit\n", pthread_self ());  
            pthread_exit (NULL);  
        }  

        printf ("thread 0x%x is starting to work\n", pthread_self ());  

        /*assert是调试的好帮手*/  
        assert (pool->cur_queue_size != 0);  
        assert (pool->queue_head != NULL);  

        /*等待队列长度减去1,并取出链表中的头元素*/  
        pool->cur_queue_size--;  
        CThread_worker *worker = pool->queue_head;  
        pool->queue_head = worker->next;  
        pthread_mutex_unlock (&(pool->queue_lock));  

        /*调用回调函数,执行任务*/  
        (*(worker->process)) (worker->arg);  
        free (worker);  
        worker = NULL;  
    }  
    /*这一句应该是不可达的*/  
    pthread_exit (NULL);  
}  

//    下面是测试代码  

void * myprocess (void *arg)  
{  
    printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);  
    sleep (1);/*休息一秒,延长任务的执行时间*/  
    return NULL;  
}  

int main (int argc, char **argv)  
{  
    pool_init (3);/*线程池中最多三个活动线程*/  

    /*连续向池中投入10个任务*/  
    int *workingnum = (int *) malloc (sizeof (int) * 10);  
    int i;  
    for (i = 0; i < 10; i++)  
    {  
        workingnum[i] = i;  
        pool_add_worker (myprocess, &workingnum[i]);  
    }  
    /*等待全部任务完成*/  
    sleep (5);  
    /*销毁线程池*/  
    pool_destroy ();  

    free (workingnum);  
    return 0;  
}

  

学习版:https://www.cnblogs.com/cthon/p/9085026.html

通用版代码:https://www.cnblogs.com/cthon/p/9097007.html  

难度升级版代码:https://www.cnblogs.com/cthon/p/9085623.html

相关文章
相关标签/搜索