通过线程池来处理 Web 服务器端的数据接收和处理,会使服务器端的效率显著提高。
通过队列工厂的生产—消费模型来编写线程池。
线程池中每个线程的结构如下:
/*
 * One node of the pool's singly linked work queue: a callback, the argument
 * to pass to it, and a link to the next queued node.
 */
typedef struct worker{
    void *(*process)(void *arg);   /* callback a pool thread invokes as process(arg) */
    void *arg;                     /* opaque argument handed to process unchanged */
    struct worker *next;           /* next queued node; NULL marks the tail */
}Thread_worker;
线程池的结构以下:
/*
 * Shared state of the thread pool: the pending-work queue, the
 * synchronisation objects that guard it, and the worker thread ids.
 */
typedef struct{
    pthread_mutex_t queue_lock;    /* held while queue_head / cur_queue_size / shutdown are accessed */
    pthread_cond_t queue_ready;    /* signalled when a task is queued; idle workers wait on it */
    Thread_worker *queue_head;     /* first pending task, NULL when the queue is empty */
    int shutdown;                  /* initialised to 0; non-zero makes worker threads exit */
    pthread_t *threadID;           /* heap array sized for max_thread_num thread ids */
    int max_thread_num;            /* number of worker threads requested at init */
    int cur_queue_size;            /* number of tasks currently in the queue */
}Thread_pool;
线程初始化:
/*
 * Create the global thread pool and start its worker threads.
 *
 * max_thread_num: number of worker threads to start. Values <= 0 start no
 *                 threads (the pool is still created, with max_thread_num 0).
 *
 * On return the global `pool` either points at a fully initialised pool or is
 * NULL. It is NULL when the pool could not be allocated, when the mutex or
 * condition variable could not be initialised, or when threads were requested
 * but none could be started; a diagnostic is written to stderr in each case.
 * If only some of the requested threads start, pool->max_thread_num is lowered
 * to the number actually running so threadID[0 .. max_thread_num-1] are all
 * valid thread ids.
 */
void pool_init(int max_thread_num){
    /* A negative count would turn into a huge size_t in the allocation size. */
    size_t wanted = max_thread_num > 0 ? (size_t)max_thread_num : 0;
    size_t started = 0;
    int err;

    pool = malloc(sizeof *pool);
    if (pool == NULL){
        fprintf(stderr, "pool_init: cannot allocate the pool\n");
        return;
    }

    pool->queue_head = NULL;
    pool->cur_queue_size = 0;
    pool->shutdown = 0;
    pool->max_thread_num = (int)wanted;
    pool->threadID = NULL;

    err = pthread_mutex_init(&(pool->queue_lock), NULL);
    if (err != 0){
        fprintf(stderr, "pool_init: pthread_mutex_init failed (%d)\n", err);
        goto fail_free;
    }

    err = pthread_cond_init(&(pool->queue_ready), NULL);
    if (err != 0){
        fprintf(stderr, "pool_init: pthread_cond_init failed (%d)\n", err);
        goto fail_mutex;
    }

    if (wanted > 0){
        /* calloc checks the count * size multiplication for overflow. */
        pool->threadID = calloc(wanted, sizeof *pool->threadID);
        if (pool->threadID == NULL){
            fprintf(stderr, "pool_init: cannot allocate the thread id array\n");
            goto fail_cond;
        }
    }

    for (started = 0; started < wanted; started++){
        err = pthread_create(&(pool->threadID[started]), NULL, thread_routine, NULL);
        if (err != 0){
            fprintf(stderr, "pool_init: pthread_create failed (%d) after %lu thread(s)\n",
                    err, (unsigned long)started);
            break;
        }
    }

    if (started == wanted)
        return;

    if (started > 0){
        /* Partial success: keep the pool, but only count the threads that exist. */
        pthread_mutex_lock(&(pool->queue_lock));
        pool->max_thread_num = (int)started;
        pthread_mutex_unlock(&(pool->queue_lock));
        return;
    }

    /* No worker is running, so nothing else can be using the pool: tear it down. */
    free(pool->threadID);
fail_cond:
    pthread_cond_destroy(&(pool->queue_ready));
fail_mutex:
    pthread_mutex_destroy(&(pool->queue_lock));
fail_free:
    free(pool);
    pool = NULL;
}
向线程池内添加任务队列以下:
/*
 * Append a task to the tail of the pool's work queue and wake one waiting
 * worker thread.
 *
 * process: callback a worker thread will run as process(arg); must not be NULL.
 * arg:     opaque pointer passed to process unchanged; this function does not
 *          copy or free what it points to.
 *
 * Returns 0 on success. Returns -1 without queuing anything when the pool has
 * not been created, when process is NULL, or when the queue node cannot be
 * allocated.
 */
int pool_add_worker(void *(*process)(void *arg), void *arg){
    Thread_worker *newworker;
    Thread_worker **link;

    /* A NULL callback would only crash later, inside a worker thread. */
    if (pool == NULL || process == NULL)
        return -1;

    newworker = malloc(sizeof *newworker);
    if (newworker == NULL)
        return -1;

    newworker->process = process;
    newworker->arg = arg;
    newworker->next = NULL;

    pthread_mutex_lock(&(pool->queue_lock));

    /* Walk to the NULL link that ends the list (queue_head itself when empty). */
    link = &(pool->queue_head);
    while (*link != NULL)
        link = &((*link)->next);
    *link = newworker;

    assert(pool->queue_head != NULL);
    pool->cur_queue_size++;

    pthread_mutex_unlock(&(pool->queue_lock));

    /* One task was added, so waking a single waiter is enough. */
    pthread_cond_signal(&(pool->queue_ready));
    return 0;
}
线程周期化处理和循环:
void *thread_routine(void *arg){ printf("starting thread 0x%x\n", (unsigned int)pthread_self()); while(1){ pthread_mutex_lock(&(pool->queue_lock)); while(pool->cur_queue_size == 0 && !pool->shutdown){ printf("thread 0x%x is waiting\n", (unsigned int)pthread_self()); pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } if (pool->shutdown){ pthread_mutex_unlock(&(pool->queue_lock)); printf("thread 0x%x will exit\n", (unsigned int)pthread_self()); pthread_exit(NULL); } printf("thread 0x%x is starting to work\n", (unsigned int)pthread_self()); assert(pool->cur_queue_size != 0); assert(pool->queue_head != NULL); pool->cur_queue_size--; Thread_worker *worker = pool->queue_head; pool->queue_head = worker->next; pthread_mutex_unlock(&(pool->queue_lock)); (*(worker->process))(worker->arg); free(worker); worker = NULL; } pthread_exit(NULL); }至此线程池的操做简单的完成了。
Coming soon...