相信你们一直有听过线程池, 可是不必定都知道这究竟是个什么东西,是如何实现的;git
当一个服务器接受到大量短小线程的请求时, 使用线程池技术是很是合适的, 它能够大大减小线程的建立和销毁次数, 提升服务器的工做效率. 可是线程要求的运行时间比较长, 则不适用.github
线程池通常有如下三个功能:服务器
以上是对外的三个接口方法;数据结构
本次实现的线程池对外有四个接口:并发
struct sl_thread_pool *sl_thread_pool_create(unsigned int core_td_num, unsigned int max_td_num, int alive_time); void sl_thread_pool_destory(struct sl_thread_pool *pool); void sl_thread_pool_destory_now(struct sl_thread_pool *pool); int sl_thread_pool_push_task(struct sl_thread_pool *pool, void *(*task_fun)(void *arg), void *arg);
在销毁线程的时候我作了个功能细化,分为两种: 一种是当即销毁线程池, 一种是执行完任务队列中的任务再销毁线程池,两种方式都是为阻塞方式;函数
struct sl_thread_pool *sl_thread_pool_create(unsigned int core_td_num, unsigned int max_td_num, int alive_time);
core_td_num: 初始化线程数
max_td_num: 最大线程数目(线程数量是动态分配)
alive_time: 线程空闲时存活的时间,单位:毫秒
return: 返回线程池句柄
该接口主要是用于建立一个线程池, 笔者写的该线程池能够动态的伸缩因此加入了最大线程数限制和存活时间.oop
void sl_thread_pool_destory(struct sl_thread_pool *pool);
调用该接口时,线程池不会立马被注销而是处理完任务队列中的全部任务才注销;ui
void sl_thread_pool_destory_now(struct sl_thread_pool *pool);
调用该接口时,当即注销线程池;spa
int sl_thread_pool_push_task(struct sl_thread_pool *pool, void *(*task_fun)(void *arg), void *arg);
向线程池中添加一个新任务, 形参task_fun
为任务的函数指针, arg
为函数指针的参数;线程
笔者写的该线程池有两个重要的链表:一个是线程链表,一个是任务链表,还有一个重要的线程:manager线程,用于管理线程的销毁和建立;
struct sl_thread_pool *sl_thread_pool_create(unsigned int core_td_num, unsigned int max_td_num, int alive_time) { struct sl_thread_pool *pstp = NULL; struct sl_thread *thread = NULL; int create_ret = -1; pstp = (struct sl_thread_pool*)malloc(sizeof(struct sl_thread_pool)); ① if (pstp == NULL) { ERR("%s: malloc error for creat pool", __FUNCTION__); goto malloc_pool_err; } create_ret = sl_create_thread_key(destructor_fun); ② if (create_ret != 0) { ERR("%s: create thread key error", __FUNCTION__); goto create_key_err; } /* 建立manager*/ create_manager_looper(pstp); thread = sl_thread_create(sl_thread_manager_do, pstp); ③ if (thread == NULL) { ERR("%s: malloc error for create pool", __FUNCTION__); goto create_manager_err; } else { pstp->thread_manager = thread; pthread_setname_np(thread->thread_id, "manager_thread"); } /* 初始化线程池链表 */ list_head_init(&pstp->thread_head); list_head_init(&pstp->task_queue.task_head); /* 初始化线程池计数 */ pstp->core_threads_num = core_td_num; pstp->max_threads_num = max_td_num; pstp->keep_alive_time = alive_time; pstp->alive_threads_num = 0; pstp->destory = 0; pthread_mutex_init(&pstp->thread_mutex, NULL); // pthread_cond_init(&pstp->thread_run_signal, NULL); /* 初始化工做锁 */ pthread_mutex_init(&pstp->task_queue.task_mutex, NULL); /* 初始化工做队列同步条件 */ pthread_cond_init(&pstp->task_queue.task_ready_signal, NULL); /* 建立核心线程 */ for (int i = 0; i < pstp->core_threads_num; i++) { ④ thread = sl_thread_create(sl_thread_do, pstp); if (thread != NULL) { list_add(&pstp->thread_head, &thread->thread_list); pthread_setname_np(thread->thread_id, "core_thread"); } else { i--; } } /* 等待核心线程建立完成 */ while (pstp->alive_threads_num != pstp->core_threads_num); return pstp; create_manager_err: pthread_key_delete(g_key); create_key_err: free(pstp); malloc_pool_err: return NULL; }
①: 为线程池分配空间.
②: 建立线程私有数据.
③: 建立manager线程.
④: 建立核心线程,这必定数量的线程是不会被释放的.
线程池的数据结构以下:
struct sl_thread_pool { struct list_head thread_head; /* 线程链表 */ struct sl_task_queue task_queue; /* 任务链表 */ unsigned int core_threads_num; /* 初始化须要建立的线程数 */ unsigned int max_threads_num; /* 建立线程最大上限数 */ unsigned int alive_threads_num; /* 当前建立线程数量 */ pthread_mutex_t thread_mutex; pthread_cond_t thread_run_signal; /* 线程run信号 */ int keep_alive_time; /* 空闲线程保持存活时间 unit: ms */ struct sl_thread *thread_manager; /* 领导 */ unsigned int destory; };
static void *sl_thread_manager_do(void *arg) { struct sl_thread_pool *pstp = (struct sl_thread_pool *)arg; int next_poll_time = -1; int keep_alive_time = -1; if (pstp == NULL) { ERR("%s: pool is NULL", __FUNCTION__); return NULL; } do { usleep(100); } while(pstp->thread_manager == NULL); while (pstp->thread_manager->thread_status != THREAD_QUIT) { keep_alive_time = poll_event(pstp, next_poll_time); next_poll_time = get_next_poll_time(keep_alive_time); } INFO("sl_thread_manager_do quit"); return NULL; }
manager线程主要是epoll
来轮询事件,而后作出相应的处理;主要的事件有三个:
static int poll_event(struct sl_thread_pool *pool, int time_out) { ... struct epoll_event eventItems[EPOLL_MAX_EVENTS]; int event_count = epoll_wait(g_epoll_fd, eventItems, EPOLL_MAX_EVENTS, time_out); ... // Check for poll timeout. if (event_count == 0) { ① list_for_each(plh, &pstp->thread_head) { pst = sl_list_entry(plh, struct sl_thread, thread_list); DEBUG("%s: pstp->alive_threads_num = %d, %ld thread status %s", __FUNCTION__, pstp->alive_threads_num, pst->thread_id, get_status(pst->thread_status)); if (pstp->alive_threads_num > pstp->core_threads_num) { if (pst->thread_status == THREAD_SUPPEND) { pst->thread_status = THREAD_QUIT; sl_notify_all(&pstp->task_queue); delete_when_each(plh); pthread_join(pst->thread_id, NULL); free(pst); keep_time = 50; // 50ms再检测一次 break; } } else { keep_time = -1; break; } } return keep_time; } // despatch for poll event for (int i = 0; i < event_count; i++) { fd = eventItems[i].data.fd; epoll_events = eventItems[i].events; if ((fd == g_wake_read_pip_fd) && (epoll_events & EPOLLIN)) { /* thread和task同时来临只处理thread */ ret_event = sl_get_event(); switch(ret_event) { case EVENT_THREAD: ② DEBUG("EVENT_THREAD"); if (pstp->alive_threads_num > pstp->core_threads_num) { keep_time = pstp->keep_alive_time; } else { keep_time = -1; } break; case EVENT_TASK: ③ DEBUG("EVENT_TASK"); /* 判断当前线程的消息和当前运行线程比例 */ pstq = &pstp->task_queue; if(pstq->num_tasks_alive >= (pstp->alive_threads_num * 2) && (pstp->alive_threads_num <= pstp->max_threads_num)) { /* 建立线程 */ pst = sl_thread_create(sl_thread_do, pstp); if (pst != NULL) { list_add(&pstp->thread_head, &pst->thread_list); pthread_setname_np(pst->thread_id, "other_thread"); } } break; case EVENT_SHUTDOWN: ④ DEBUG("EVENT_SHUTDOWN"); /* 执行完任务对列中的任务才shutdown */ pstp->core_threads_num = 0; pool->destory = 1; break; default: break; } } } return keep_time; }
①: wait超时的处理,通常进入超时状态都是准备注销线程, 线程空闲时则注销.
②: 线程状态变化处理,判断当前线程是否多余核心线程,若是是则设置存活时间为下一轮的wait超时时间.
③: 发送任务事件后,主要是判断当前任务数量,线程池是否处理的过来,不然建立新线程.
④: 注销事件,核心线程数设置为0,等待任务链表中的任务处理完再注销;
事件的轮询主要是借助epoll
监控管道的变化实现,想了解的能够详细看下代码;
static void *sl_thread_do(void *arg) { struct sl_thread_pool *pstp = (struct sl_thread_pool *)arg; struct sl_thread_task *pstt = NULL; struct sl_task_queue *pstq = NULL; if (pstp == NULL) { ERR("%s: pool is NULL", __FUNCTION__); return NULL; } pstq = &pstp->task_queue; pthread_mutex_lock(&pstp->thread_mutex); pstp->alive_threads_num++; pthread_mutex_unlock(&pstp->thread_mutex); sl_save_thread_self(pstp); while (sl_get_thread_self()->thread_status != THREAD_QUIT) { pstt = sl_task_pull(pstq); ① if (pstt != NULL) { sl_update_thread_status(THREAD_WORKING); pstt->task_fun(&pstt->arg); ② free(pstt); } } pthread_mutex_lock(&pstp->thread_mutex); pstp->alive_threads_num--; pthread_mutex_unlock(&pstp->thread_mutex); sl_update_thread_status(THREAD_IDLE); sl_clear_thread_self(); ③ INFO("thread_run_task %ld quit, currten threads count %d, currten tasks count %d\n", pthread_self(), pstp->alive_threads_num, pstq->num_tasks_alive); return NULL; }
①: 从任务对列中取出一个任务, 没有则休眠;
②: 执行任务
③: 清除私有数据中存放的值
这在说明一点,用线程的私有数据进行存储, 主要是为了更新线程的状态方便;
int sl_thread_pool_push_task(struct sl_thread_pool *pool, void *(*task_fun)(void *arg), void *arg) { struct sl_task_queue *pstq = NULL; struct sl_thread_task *pstt = NULL; if (pool == NULL || task_fun == NULL || pool->destory == 1) { ERR("%s: pool or task_fun is NULL or is destory status", __FUNCTION__); return -1; } pstq = &pool->task_queue; pstt = (struct sl_thread_task*)malloc(sizeof(struct sl_thread_task)); if (pstt == NULL) { ERR("%s: malloc error for creat a task", __FUNCTION__); return -1; } pstt->task_fun = task_fun; pstt->arg = arg; return sl_task_push(pstq, pstt); }
该接口主要分配了一个空间初始化传进来的任务,往下看:
static int sl_task_push(struct sl_task_queue *_stq, struct sl_thread_task *new_task) { struct sl_task_queue *pstq = _stq; struct sl_thread_task *pstt = new_task; if (pstq == NULL || pstt == NULL) { ERR("%s: pstq or pstt is NULL", __FUNCTION__); return -1; } pthread_mutex_lock(&pstq->task_mutex); list_add(&pstq->task_head, &pstt->task_list); pstq->num_tasks_alive++; pthread_mutex_unlock(&pstq->task_mutex); sl_notify_one(pstq); sl_update_task_queue_info(); return pstq->num_tasks_alive; }
将刚才保存的任务添加进任务对列并发送通知;
笔者写这个线程池,主要涉及到这个点有: 同步变量, 锁, 线程私有数据, 管道, epoll和双向队列;
代码已经放到个人github上了: thread pool