原文:Unlike network I/O, there are no platform-specific file I/O primitives libuv could rely on, so the current approach is to run blocking file I/O operations in a thread pool。 翻译:不像网络IO,libuv没有特定平台的异步IO原语能够依赖,因此当前是在线程池中执行阻塞(同步)IO来实现异步的。
根据libuv官网对其架构的介绍,咱们能够知道它并非单线程的,它有一个线程池,用来处理文件IO、DSN查询等操做。在介绍线程池以前,先经过POSIX Threads介绍一下线程的基本操做,为下一篇文章介绍线程池打下基础。若是您对libuv的总体架构感兴趣,能够访问下面连接了解,固然之后我也会写文章介绍的。html
Design overview - libuv documentation相信你们对线程的概念都有或多或少的了解,这里就不介绍了,下面将直接经过API和demo来学习。因为不一样平台线程的规范和原语不同,而我对Linux比较熟悉,因此接下来将经过Linux中的POSIX Threads来说解。libuv线程池主要涉及到线程建立、互斥锁和条件变量3个东西,下面将分别介绍它们。api
让咱们首先了解一下如何建立线程,代码和输出以下:数组
#include <stdio.h>
#include <pthread.h>
#define NUM_THREADS 5
int sum = 0;
void * thread_task(void * args) {
int max = (int)args;
for (int i = 0; i <= max; ++i) {
sum += i;
}
printf("sum: %i\n", sum);
pthread_exit(NULL);
}
int main() {
pthread_t threads[NUM_THREADS];
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 8192);
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_t thread = threads[i];
int result = pthread_create(&thread, &attr, thread_task, (void *)10);
if (result) {
printf("线程建立失败 errCode:%i", result);
return -1;
}
}
pthread_attr_destroy(&attr);
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_t thread = threads[i];
int result = pthread_join(thread, NULL);
if (result == 3) {
printf("线程%i已经结束了\n", i);
continue;
}
}
printf("main函数运行结束, sum: %i\n", sum);
return 0;
}复制代码
上面代码很简单,建立5个线程,每一个线程将传入数据做为最大值max,而后从0,1,2,3,...,max加到sum上。接下来粗略讲解一下每行代码的含义:bash
#include <stdio.h>
#include <pthread.h>
#define NUM_THREADS 5
int sum = 0;复制代码
前4行代码引入了stdio.h、pthread.h两个头文件,函数printf在stdio.h中定义,线程相关的api在pthread.h中定义;定义了一个常量NUM_THREADS和一个变量sum,常量NUM_THREADS表示要建立的线程数,变量sum用来计算总和。微信
void * thread_task(void * args) {
int max = (int)args;
for (int i = 0; i <= max; ++i) {
sum += i;
}
printf("sum: %i\n", sum);
pthread_exit(NULL);
}复制代码
接下来定义了一个函数thread_task,该函数会被每一个线程执行。函数很简单,将输入的int参数做为max,将0,1,2,3,...,max依次加到sum上,并将当前sum输出到控制台。最后执行pthread_exist结束线程。网络
最后让咱们看下main函数里面的内容,架构
int main() {
pthread_t threads[NUM_THREADS];
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 8192);
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_t thread = threads[i];
int result = pthread_create(&thread, &attr, thread_task, (void *)10);
if (result) {
printf("线程建立失败 errCode:%i", result);
return -1;
}
}
pthread_attr_destroy(&attr);
for (int i = 0; i < NUM_THREADS; ++i) {
pthread_t thread = threads[i];
int result = pthread_join(thread, NULL);
if (result == 3) {
printf("线程%i已经结束了\n", i);
}
}
printf("main func end\n");
return 0;
}复制代码
先定义了5个表明线程的数组threads;接着定义线程属性变量attr,将线程的栈设为8192个字节;以后经过pthread_create建立线程,每一个线程将会执行thread_task函数,并经过第3个参数将10传递给thread_task;最后经过pthread_join告诉main函数等到全部线程执行完以后再继续执行。app
若是足够仔细,相信你可能已经发现上面的输出不符合预期,按理说应该输出275才对,为啥只输出了249呢?异步
咱们再运行一下程序看看,结果又正常了。函数
让咱们简单经过2个线程来分析一下,假设此刻sum值为120,线程1中i循环到3,线程2循环到6,下表展现了致使sum错误的可能状况:
经过上表能够发现之因此出现问题是由于将i加到sum这个操做不是原子的,若是从读取sum、将i加到sum整个过程变成原子操做,就不会有问题了。解决该问题的经常使用方法之一就是互斥锁,让咱们简单修改一下代码:
...
pthread_mutex_t mutex;
void * thread_task(void * args) {
int max = (int)args;
for (int i = 0; i <= max; ++i) {
pthread_mutex_lock(&mutex);
sum += i;
pthread_mutex_unlock(&mutex);
}
printf("sum: %i\n", sum);
pthread_exit(NULL);
}
int main() {
pthread_mutex_init(&mutex, NULL);
...
pthread_mutex_destroy(&mutex);
}
复制代码
从代码的角度来看,修改后的代码增长了一个全局互斥锁mutex,并在main函数初始化。在sum += i;前面加了一句代码pthread_mutex_lock(&mutex),它告诉线程尝试获取锁,获取失败就挂起,等待其余线程释放锁;获取成功就继续执行代码,并经过pthread_mutex_unlock(&mutex)将获取的锁给释放掉。
互斥锁只解决了多个线程修改共享变量的问题,对于下面场景它是没法办法解决的。一个线程须要知足一个条件才能执行下去,而这个条件由另外一个线程知足的。好比如今有一个变量i和2个线程,当i为0时第一个线程输出一段内容,并将i变成1;当i为1时,第二个线程输出一段内容,并将i变成0;两个线程依次交替执行。对于这个问题,咱们能够经过条件变量来实现。下面是实现的代码和输出。
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
int i = 0;
pthread_mutex_t mutex;
pthread_cond_t cond0;
pthread_cond_t cond1;
void * thread_task0(void * args) {
while(1) {
pthread_mutex_lock(&mutex);
while (i != 0) {
pthread_cond_wait(&cond0, &mutex);
}
sleep(1);
printf("**************thread_task0 i: %i\n", i);
i = 1;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond1);
}
}
void * thread_task1(void * args) {
while(1) {
pthread_mutex_lock(&mutex);
while (i != 1) {
pthread_cond_wait(&cond1, &mutex);
}
sleep(1);
printf("################thread_task1 i: %i\n", i);
i = 0;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond0);
}
}
int main() {
pthread_t thread0;
pthread_t thread1;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond0, NULL);
pthread_cond_init(&cond1, NULL);
pthread_create(&thread0, NULL, thread_task0, NULL);
pthread_create(&thread1, NULL, thread_task1, NULL);
pthread_join(thread0, NULL);
pthread_join(thread1, NULL);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond0);
pthread_cond_destroy(&cond1);
return 0;
}复制代码
让咱们简单分析一下代码吧,前3行引入了3个头文件,前2个已经介绍过了,第3个头文件中有sleep函数的定义,后面会用到。
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>复制代码
随后定义了变量i,互斥锁mutex和2个条件变量cond0、cond1,这里须要注意一下条件变量是须要和互斥锁一块儿使用的。
int i = 0;
pthread_mutex_t mutex;
pthread_cond_t cond0;
pthread_cond_t cond1;复制代码
紧接着咱们定义了2个函数,分别由2个线程执行,因为这两个函数文字解释比较麻烦,下面经过表格来表示两个线程的执行过程。这里须要注意的是,pthread_cond_wait会放弃当前线程得到的锁,并进入挂起状态。当其余线程经过pthread_cond_signal通知该线程时,该线程会被唤起,从新得到锁。
void * thread_task0(void * args) {
while(1) {
pthread_mutex_lock(&mutex);
while (i != 0) {
pthread_cond_wait(&cond0, &mutex);
}
sleep(1);
printf("**************thread_task0 i: %i\n", i);
i = 1;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond1);
}
}
void * thread_task1(void * args) {
while(1) {
pthread_mutex_lock(&mutex);
while (i != 1) {
pthread_cond_wait(&cond1, &mutex);
}
sleep(1);
printf("################thread_task1 i: %i\n", i);
i = 0;
pthread_mutex_unlock(&mutex);
pthread_cond_signal(&cond0);
}
}复制代码
main函数只是用来启动上面介绍的两个线程,因此这里就不解释了。
上面介绍了POSIX Threads,接下来让咱们粗略的看下libuv的线程吧,libuv官网也给出了对应的API文档,有兴趣的同窗能够看下:
Threading and synchronization utilities经过翻看源码,咱们能够在src/unix/thread.c和src/win/thread.c文件下看到libuv线程的实现,很简单,就是对各个平台原有线程API进行包装,使得API统一化,下面经过src/unix/thread.c稍稍看下它的实现吧。
typedef pthread_t uv_thread_t;
int uv_thread_create_ex(uv_thread_t* tid,
const uv_thread_options_t* params,
void (*entry)(void *arg),
void *arg) {
int err;
pthread_attr_t* attr;
pthread_attr_t attr_storage;
size_t pagesize;
size_t stack_size;
/* Used to squelch a -Wcast-function-type warning. */
union {
void (*in)(void*);
void* (*out)(void*);
} f;
stack_size =
params->flags & UV_THREAD_HAS_STACK_SIZE ? params->stack_size : 0;
attr = NULL;
if (stack_size == 0) {
stack_size = thread_stack_size();
} else {
pagesize = (size_t)getpagesize();
/* Round up to the nearest page boundary. */
stack_size = (stack_size + pagesize - 1) &~ (pagesize - 1);
#ifdef PTHREAD_STACK_MIN
if (stack_size < PTHREAD_STACK_MIN)
stack_size = PTHREAD_STACK_MIN;
#endif
}
if (stack_size > 0) {
attr = &attr_storage;
if (pthread_attr_init(attr))
abort();
if (pthread_attr_setstacksize(attr, stack_size))
abort();
}
f.in = entry;
err = pthread_create(tid, attr, f.out, arg);
if (attr != NULL)
pthread_attr_destroy(attr);
return UV__ERR(err);
}复制代码
能够看到建立线程的方法和咱们在POSIX Threads中介绍的差很少,都是经过pthread_create来建立,只不过经过pthread_attr_t设置了一些线程属性罢了,好比线程堆栈的大小。
typedef pthread_mutex_t uv_mutex_t;
int uv_mutex_init(uv_mutex_t* mutex) {
#if defined(NDEBUG) || !defined(PTHREAD_MUTEX_ERRORCHECK)
return UV__ERR(pthread_mutex_init(mutex, NULL));
#else
pthread_mutexattr_t attr;
int err;
if (pthread_mutexattr_init(&attr))
abort();
if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK))
abort();
err = pthread_mutex_init(mutex, &attr);
if (pthread_mutexattr_destroy(&attr))
abort();
return UV__ERR(err);
#endif
}
void uv_mutex_lock(uv_mutex_t* mutex) {
if (pthread_mutex_lock(mutex))
abort();
}
void uv_mutex_unlock(uv_mutex_t* mutex) {
if (pthread_mutex_unlock(mutex))
abort();
}复制代码
互斥锁的API也和咱们POSIX Threads里介绍的差很少。
本文初步经过线程建立、互斥锁和条件变量介绍了POSIX Threads以及libuv自己的线程API,这些是libuv实现线程池的核心,结合上篇《Libuv学习——队列》,咱们已经为下篇libuv线程池打好了基础,全部您有兴趣的话,能够关注咱们微信公众号《方凳雅集》,这样您将会在第一时间看到咱们的文章。