简单Linux C线程池的实现

大多处网络服务器,包括Web服务器都有一个特色,就是单位时间内要处理大量的请求,且处理的时间每每比较短。本文会分析一下多进程网络服务器模型、多线程网络服务器模型(包括线程池)之间的优缺点,并给出一个简单的线程池模型。linux

多进程网络服务器模型

多进程网络服务器模型,其基本框架每每是,父进程用socket建立一个监听套接字,而后bindIP以及port,接着开始listen该套接字,经过一个while循环来accept链接,对于每个链接,fork一个子进程来处理链接,并继续accept。简化后的代码以下:web

int listenfd, clientfd;
struct sockadd_in servaddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
// ...初始化servaddr,填入addr,port等...
bind(listenfd, (struct sockadd \*)&servaddr, sizeof(servaddr));
listen(listenfd, LISTENQ);
pid_t pid;
while (1)
{
    clientfd = accept(listenfd, NULL, NULL);
    pid = fork();
    if (pid == -1)
    {
        // ...出错处理...
        return 0;
    }
    if (pid == 0) // in child
    {
        close(listenfd);
        // handle(clientfd);
        return 0;
    }
    else if (pid > 0) // in parent
    {
        close(clientfd);
    }
}
复制代码

这种模型的缺点也十分明显,咱们都知道fork一个子进程的代价是很高的,表如今如下几点:bash

1.每次进来一个链接,操做系统为其建立一个进程,开销太大。《APUE》书8.3节讲到子进程是父进程的副本,父进程和子进程共享正文段,子进程得到父进数据空间、堆和栈的副本。即使如今如今不少实现经过写时复制(Copy-On-Write,COW)技术来替代彻底拷贝,可是其中有一个复制父进程页表的操做,这也是为何在Linux下建立进程比建立线程开销大的缘由,而全部线程都共享一个页表。服务器

2.进程调度压力大。当并发量上来以后,系统会有N多个进程,这时候操做系统将花费至关多的时间来调度进程以及执行进程的上下文切换。网络

3.每一个进程都有本身独立的地址空间,须要消耗必定的内存,太多的进程会形成内存的大量消耗。同时,高并发下父子进程之间的IPC也是一个问题。多线程

多线程网络服务器模型

多线程网络服务器模型大体同上,不一样点在于把每次accept一个新链接是建立一个线程而不是进程来处理。然而咱们知道web服务器的一个特色就是短而高频率的请求,表如今服务器端就是不停地建立线程,销毁线程。因此该方法虽然在必定程度上解决了fork的开销问题,可是一样没有办法避免线程调度开销问题以及内存问题。架构

一个改进的方法是改用线程池,让线程的数量固定下来,这样就解决了上述问题。其基本架构为用一个loop来accept链接,以后把这个链接分配给线程池中的一个线程来处理,处理完了以后这个线程回归线程池等待下一次处理链接。从目前来看,该方法已经很好地解决了上面提到的各类问题,文末也会给出该方法的不足以及改进。下面给出一个较为简单的线程池模型。这份threadpool的实现并不是我原创的,看到代码条理清楚,清晰易懂,就转载过来了。并发

threadpool.h框架

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <pthread.h>
typedef void *(*callback_func)(void*);
typedef struct job
{
    callback_func p_callback_func;          // 线程回调函数
    void *arg;
    struct job *next;
} job_t;
typedef struct threadpool
{
    int thread_num;                         // 线程池中开启线程的个数
    int queue_max_num;                      // 队列中最大job的个数
    job_t *head;                            // 指向job的头指针
    job_t *tail;                            // 指向job的尾指针
    pthread_t *pthreads;                    // 线程池中全部线程的pthread_t
    pthread_mutex_t mutex;                  // 互斥信号量
    pthread_cond_t queue_empty;             // 队列为空的条件变量
    pthread_cond_t queue_not_empty;         // 队列不为空的条件变量
    pthread_cond_t queue_not_full;          // 队列不为滿的条件变量
    int queue_cur_num;                      // 队列当前的job个数
    int queue_close;                        // 队列是否已经关闭
    int pool_close;                         // 线程池是否已经关闭
} threadpool_t;
/*
 * pthreadpool_init - 初始化线程池
 * @thread_num - 线程池开启的线程个数
 * @queue_max_num - 队列的最大job个数
 * 返回 - 成功返回线程池地址 失败返回NULL
 * */
threadpool_t *threadpool_init(int thread_num, int queue_max_num);
/*
 * threadpool_add_job - 想线程池中添加任务
 * @pool - 线程池地址
 * @callback_function - 回调函数
 * @arg - 回调函数参数
 * 返回 - 成功返回0 失败返回-1
 * */
int threadpool_add_job(threadpool_t *pool, callback_func p_callback_fun, void *arg);
/*
 * threadpool_destory - 销毁线程池
 * @pool - 线程池地址
 * 返回 - 永远返回0
 * */
int threadpool_destory(threadpool_t *pool);
/*
 * threadpool_function - 线程池中线程函数
 * @arg - 线程池地址
 * */
void *threadpool_function(void *arg);
#endif /* _THREAD_POOL_H_ */

复制代码

threadpool.csocket

#include "threadpool.h"
#include "common.h"
threadpool_t *threadpool_init(int thread_num, int queue_max_num)
{
    threadpool_t *pool = NULL;
    pool = malloc(sizeof(threadpool_t));
    do
    {
        if (NULL == pool)
        {
            bug("failed to malloc threadpool\n");
            break;
        }
        pool->thread_num = thread_num;
        pool->queue_max_num = queue_max_num;
        pool->queue_cur_num = 0;
        pool->head = NULL;
        pool->tail = NULL;
        if (pthread_mutex_init(&(pool->mutex), NULL))
        {
            bug("pthread_mutex_init\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_empty), NULL))
        {
            bug("pthread_cond_init\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_not_empty), NULL))
        {
            bug("pthread_cond_init\n");
            break;
        }
        if (pthread_cond_init(&(pool->queue_not_full), NULL))
        {
            bug("pthread_cond_init\n");
            break;
        }
        pool->pthreads = malloc(sizeof(pthread_t) * thread_num); 
        if (NULL == pool->pthreads)
        {
            bug("malloc error\n");
            break;
        }
        
        pool->queue_close = 0;
        pool->pool_close = 0;
        int i;
        for (i = 0; i < pool->thread_num; ++i)
        {
            if (pthread_create(&(pool->pthreads[i]), NULL, threadpool_function, (void *)pool) < 0)
                bug("pthread_create\n");
        }
        return pool;
    } while (0);
    return NULL;
}
int threadpool_add_job(threadpool_t *pool, callback_func p_callback_func, void *arg)
{
    if (pool == NULL || p_callback_func == NULL)
        return -1;
    pthread_mutex_lock(&(pool->mutex));
    while ((pool->queue_cur_num == pool->queue_max_num) && !(pool->pool_close || pool->queue_close))
    {
        // 等待threadpool_function发送queue_not_full信号
        pthread_cond_wait(&(pool->queue_not_full), &(pool->mutex)); // 队列满的时候就等待
    }
    if (pool->queue_close || pool->pool_close) // 队列关闭或者线程池关闭就退出
    {
        pthread_mutex_unlock(&(pool->mutex));
        return -1;
    }
    job_t *pjob = (job_t *)malloc(sizeof(job_t));
    if (NULL == pjob)
    {
        pthread_mutex_unlock(&(pool->mutex));
        return -1;
    }
    pjob->p_callback_func = p_callback_func;
    pjob->arg = arg;
    pjob->next = NULL;
    if (pool->head == NULL)
    {
        pool->head = pool->tail = pjob;
        pthread_cond_broadcast(&(pool->queue_not_empty)); // 队列空的时候,有任务来了,就通知线程池中的线程:队列非空
    }
    else
    {
        pool->tail->next = pjob;
        pool->tail = pjob; // 把任务插入到队列的尾部
    }
    pool->queue_cur_num++;
    pthread_mutex_unlock(&(pool->mutex));
    return 0;
}
void *threadpool_function(void *arg)
{
    threadpool_t *pool = (threadpool_t *)arg;
    job_t *pjob = NULL;
    while (1)
    {
        pthread_mutex_lock(&(pool->mutex));
        while ((pool->queue_cur_num == 0) && !pool->pool_close) // 队列为空,就等待队列非空
        {
            // 等待threadpool_add_job函数发送queue_not_empty信号
            pthread_cond_wait(&(pool->queue_not_empty), &(pool->mutex));
        }
        if (pool->pool_close) // 线程池关闭,线程就退出
        {
            pthread_mutex_unlock(&(pool->mutex));
            pthread_exit(NULL);
        }
        pool->queue_cur_num--;
        pjob = pool->head;
        if (pool->queue_cur_num == 0)
        {
            pool->head = pool->tail = NULL;
        }
        else
        {
            pool->head = pjob->next;
        }
        if (pool->queue_cur_num == 0)
        {
            pthread_cond_signal(&(pool->queue_empty)); // 通知destory函数能够销毁线程池了
        }
        else if (pool->queue_cur_num <= pool->queue_max_num - 1)
        {
            // 向threadpool_add_job发送queue_not_full信号
            pthread_cond_broadcast(&(pool->queue_not_full));
        }
        pthread_mutex_unlock(&(pool->mutex));
        (*(pjob->p_callback_func))(pjob->arg); // 线程真正要作的工做,调用回调函数
        free(pjob);
        pjob = NULL;
    }
}
int threadpool_destory(threadpool_t *pool)
{
    if (pool == NULL)
        return 0;
    pthread_mutex_lock(&(pool->mutex));
    if (pool->queue_close && pool->pool_close) // 线程池已经退出了,就直接返回
    {
        pthread_mutex_unlock(&(pool->mutex));
        return 0;
    }
    pool->queue_close = 1; // 关闭任务队列,不接受新的任务了
    while (pool->queue_cur_num != 0)
    {
        pthread_cond_wait(&(pool->queue_empty), &(pool->mutex)); // 等待队列为空
    }
    pool->pool_close = 1; // 线程池关闭
    pthread_mutex_unlock(&(pool->mutex));
    pthread_cond_broadcast(&(pool->queue_not_empty)); // 唤醒线程池中正在阻塞的线程
    pthread_cond_broadcast(&(pool->queue_not_full)); // 唤醒添加任务的threadpool_add_job函数
    int i;
    for (i = 0; i < pool->thread_num; ++i)
    {
        pthread_join(pool->pthreads[i], NULL); // 等待线程池的全部线程执行完毕
    }
    pthread_mutex_destroy(&(pool->mutex)); // 清理资源
    pthread_cond_destroy(&(pool->queue_empty)); 
    pthread_cond_destroy(&(pool->queue_not_empty)); 
    pthread_cond_destroy(&(pool->queue_not_full)); 
    free(pool->pthreads);
    job_t *pjob;
    while (pool->head != NULL)
    {
        pjob = pool->head;
        pool->head = pjob->next;
        free(pjob);
    }
    free(pool);
    return 0;
}
复制代码

剩余问题

线程池的方案虽然看起来很不错,但在实际状况中,不少链接都是长链接(在一个TCP链接上进行屡次通讯),一个线程在受到任务之后,处理完第一批来的数据,此时会再次调用read,可是客户端下一次发送数据过来的时机是不肯定的,因而这个线程就被这个read给阻塞住了(socket fd默认是blocking的),直到1.这个fd可读者;2.对方已经关闭链接;3.TCP超时这3个状况之一发生以前什么都不能干,那么并发量上来以后仍是会发生部分链接没法被即便处理的状况。

一个比较好的解决方案是把fd 设置为non-blocking,并经过事件驱动(Event-driven)的方法来处理链接,在linux下能够经过epoll实现。

小结

请你们多多关注小伙,您的关注与点赞是小伙更新的动力。

关于更多epoll问题能够加小伙QQ,更多福利资源等你领取哟,QQ 758810390

相关文章
相关标签/搜索