Libuv学习——线程基础

前言

原文:Unlike network I/O, there are no platform-specific file I/O primitives libuv could rely on, so the current approach is to run blocking file I/O operations in a thread pool。 翻译:不像网络IO,libuv没有特定平台的异步IO原语能够依赖,因此当前是在线程池中执行阻塞(同步)IO来实现异步的。

根据libuv官网对其架构的介绍,咱们能够知道它并非单线程的,它有一个线程池,用来处理文件IODSN查询等操做。在介绍线程池以前,先经过POSIX Threads介绍一下线程的基本操做,为下一篇文章介绍线程池打下基础。若是您对libuv的总体架构感兴趣,能够访问下面连接了解,固然之后我也会写文章介绍的。html

Design overview - libuv documentation​
docs.libuv.org图标

POSIX Threads

相信你们对线程的概念都有或多或少的了解,这里就不介绍了,下面将直接经过API和demo来学习。因为不一样平台线程的规范和原语不同,而我对Linux比较熟悉,因此接下来将经过Linux中的POSIX Threads来说解。libuv线程池主要涉及到线程建立互斥锁条件变量3个东西,下面将分别介绍它们。api

线程建立

让咱们首先了解一下如何建立线程,代码和输出以下:数组

#include <stdio.h>
#include <pthread.h>

#define NUM_THREADS 5

int sum = 0;

void * thread_task(void * args) {
    int max = (int)args;

    for (int i = 0; i <= max; ++i) {
        sum += i;
    }
    printf("sum: %i\n", sum);
    pthread_exit(NULL);
}

int main() {
    pthread_t threads[NUM_THREADS];
    
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setstacksize(&attr, 8192);
    
    for (int i = 0; i < NUM_THREADS; ++i) {
        pthread_t thread = threads[i];
        int result = pthread_create(&thread, &attr, thread_task, (void *)10);
        if (result) {
            printf("线程建立失败 errCode:%i", result);
            return -1;
        }
    }
    pthread_attr_destroy(&attr);
    
    for (int i = 0; i < NUM_THREADS; ++i) {
        pthread_t thread = threads[i];
        int result = pthread_join(thread, NULL);
        if (result == 3) {
            printf("线程%i已经结束了\n", i);
            continue;
        }
    }
    
    printf("main函数运行结束, sum: %i\n", sum);
    return 0;
}复制代码

上面代码很简单,建立5个线程,每一个线程将传入数据做为最大值max,而后从0,1,2,3,...,max加到sum上。接下来粗略讲解一下每行代码的含义:bash

#include <stdio.h>
#include <pthread.h>

#define NUM_THREADS 5

int sum = 0;复制代码

前4行代码引入了stdio.h、pthread.h两个头文件,函数printf在stdio.h中定义,线程相关的api在pthread.h中定义;定义了一个常量NUM_THREADS和一个变量sum,常量NUM_THREADS表示要建立的线程数,变量sum用来计算总和。微信

void * thread_task(void * args) {
    int max = (int)args;

    for (int i = 0; i <= max; ++i) {
        sum += i;
    }
    printf("sum: %i\n", sum);
    pthread_exit(NULL);
}复制代码

接下来定义了一个函数thread_task,该函数会被每一个线程执行。函数很简单,将输入的int参数做为max,将0,1,2,3,...,max依次加到sum上,并将当前sum输出到控制台。最后执行pthread_exist结束线程。网络

最后让咱们看下main函数里面的内容,架构

int main() {
    pthread_t threads[NUM_THREADS];
    
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setstacksize(&attr, 8192);
    
    for (int i = 0; i < NUM_THREADS; ++i) {
        pthread_t thread = threads[i];
        int result = pthread_create(&thread, &attr, thread_task, (void *)10);
        if (result) {
            printf("线程建立失败 errCode:%i", result);
            return -1;
        }
    }
    pthread_attr_destroy(&attr);
    
    for (int i = 0; i < NUM_THREADS; ++i) {
        pthread_t thread = threads[i];
        int result = pthread_join(thread, NULL);
        if (result == 3) {
            printf("线程%i已经结束了\n", i);
        }
    }
    
    printf("main func end\n");
    return 0;
}复制代码

先定义了5个表明线程的数组threads;接着定义线程属性变量attr,将线程的栈设为8192个字节;以后经过pthread_create建立线程,每一个线程将会执行thread_task函数,并经过第3个参数将10传递给thread_task;最后经过pthread_join告诉main函数等到全部线程执行完以后再继续执行。app

互斥锁

若是足够仔细,相信你可能已经发现上面的输出不符合预期,按理说应该输出275才对,为啥只输出了249呢?异步

咱们再运行一下程序看看,结果又正常了。函数

让咱们简单经过2个线程来分析一下,假设此刻sum值为120,线程1中i循环到3,线程2循环到6,下表展现了致使sum错误的可能状况:

经过上表能够发现之因此出现问题是由于将i加到sum这个操做不是原子的,若是从读取sum、将i加到sum整个过程变成原子操做,就不会有问题了。解决该问题的经常使用方法之一就是互斥锁,让咱们简单修改一下代码:

...
pthread_mutex_t mutex;

void * thread_task(void * args) {
    int max = (int)args;

    for (int i = 0; i <= max; ++i) {
        pthread_mutex_lock(&mutex);
        sum += i;
        pthread_mutex_unlock(&mutex);
    }
    printf("sum: %i\n", sum);
    pthread_exit(NULL);
}

int main() {
    pthread_mutex_init(&mutex, NULL);
    ...
    pthread_mutex_destroy(&mutex);
}

复制代码

从代码的角度来看,修改后的代码增长了一个全局互斥锁mutex,并在main函数初始化。在sum += i;前面加了一句代码pthread_mutex_lock(&mutex),它告诉线程尝试获取锁,获取失败就挂起,等待其余线程释放锁;获取成功就继续执行代码,并经过pthread_mutex_unlock(&mutex)将获取的锁给释放掉。

条件变量

互斥锁只解决了多个线程修改共享变量的问题,对于下面场景它是没法办法解决的。一个线程须要知足一个条件才能执行下去,而这个条件由另外一个线程知足的。好比如今有一个变量i和2个线程,当i为0时第一个线程输出一段内容,并将i变成1;当i为1时,第二个线程输出一段内容,并将i变成0;两个线程依次交替执行。对于这个问题,咱们能够经过条件变量来实现。下面是实现的代码和输出。

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

int i = 0;

pthread_mutex_t mutex;
pthread_cond_t cond0;
pthread_cond_t cond1;

void * thread_task0(void * args) {
    while(1) {
        pthread_mutex_lock(&mutex);
        while (i != 0) {
            pthread_cond_wait(&cond0, &mutex);
        }
        sleep(1);

        printf("**************thread_task0 i: %i\n", i);
        i = 1;
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond1);
    }
}

void * thread_task1(void * args) {
    while(1) {
        pthread_mutex_lock(&mutex);
        while (i != 1) {
            pthread_cond_wait(&cond1, &mutex);

        }
        sleep(1);
        printf("################thread_task1 i: %i\n", i);
        i = 0;

        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond0);
               
    }
}

int main() {
    pthread_t thread0;
    pthread_t thread1;
    
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&cond0, NULL);
    pthread_cond_init(&cond1, NULL);
    
    pthread_create(&thread0, NULL, thread_task0, NULL);
    pthread_create(&thread1, NULL, thread_task1, NULL);
    
    pthread_join(thread0, NULL);
    pthread_join(thread1, NULL);
    
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond0);
    pthread_cond_destroy(&cond1);
    return 0;
}复制代码

让咱们简单分析一下代码吧,前3行引入了3个头文件,前2个已经介绍过了,第3个头文件中有sleep函数的定义,后面会用到。

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>复制代码

随后定义了变量i,互斥锁mutex和2个条件变量cond0、cond1,这里须要注意一下条件变量是须要和互斥锁一块儿使用的。

int i = 0;

pthread_mutex_t mutex;
pthread_cond_t cond0;
pthread_cond_t cond1;复制代码

紧接着咱们定义了2个函数,分别由2个线程执行,因为这两个函数文字解释比较麻烦,下面经过表格来表示两个线程的执行过程。这里须要注意的是,pthread_cond_wait会放弃当前线程得到的锁,并进入挂起状态。当其余线程经过pthread_cond_signal通知该线程时,该线程会被唤起,从新得到锁。

void * thread_task0(void * args) {
    while(1) {
        pthread_mutex_lock(&mutex);
        while (i != 0) {
            pthread_cond_wait(&cond0, &mutex);
        }
        sleep(1);

        printf("**************thread_task0 i: %i\n", i);
        i = 1;
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond1);
    }
}

void * thread_task1(void * args) {
    while(1) {
        pthread_mutex_lock(&mutex);
        while (i != 1) {
            pthread_cond_wait(&cond1, &mutex);

        }
        sleep(1);
        printf("################thread_task1 i: %i\n", i);
        i = 0;
        
        pthread_mutex_unlock(&mutex);
        pthread_cond_signal(&cond0);
    }
}复制代码

main函数只是用来启动上面介绍的两个线程,因此这里就不解释了。

Libuv的线程

上面介绍了POSIX Threads,接下来让咱们粗略的看下libuv的线程吧,libuv官网也给出了对应的API文档,有兴趣的同窗能够看下:

Threading and synchronization utilities​
docs.libuv.org

经过翻看源码,咱们能够在src/unix/thread.c和src/win/thread.c文件下看到libuv线程的实现,很简单,就是对各个平台原有线程API进行包装,使得API统一化,下面经过src/unix/thread.c稍稍看下它的实现吧。

线程建立API

typedef pthread_t uv_thread_t;

int uv_thread_create_ex(uv_thread_t* tid,
                        const uv_thread_options_t* params,
                        void (*entry)(void *arg),
                        void *arg) {
  int err;
  pthread_attr_t* attr;
  pthread_attr_t attr_storage;
  size_t pagesize;
  size_t stack_size;

  /* Used to squelch a -Wcast-function-type warning. */
  union {
    void (*in)(void*);
    void* (*out)(void*);
  } f;

  stack_size =
      params->flags & UV_THREAD_HAS_STACK_SIZE ? params->stack_size : 0;

  attr = NULL;
  if (stack_size == 0) {
    stack_size = thread_stack_size();
  } else {
    pagesize = (size_t)getpagesize();
    /* Round up to the nearest page boundary. */
    stack_size = (stack_size + pagesize - 1) &~ (pagesize - 1);
#ifdef PTHREAD_STACK_MIN
    if (stack_size < PTHREAD_STACK_MIN)
      stack_size = PTHREAD_STACK_MIN;
#endif
  }

  if (stack_size > 0) {
    attr = &attr_storage;

    if (pthread_attr_init(attr))
      abort();

    if (pthread_attr_setstacksize(attr, stack_size))
      abort();
  }

  f.in = entry;
  err = pthread_create(tid, attr, f.out, arg);

  if (attr != NULL)
    pthread_attr_destroy(attr);

  return UV__ERR(err);
}复制代码

能够看到建立线程的方法和咱们在POSIX Threads中介绍的差很少,都是经过pthread_create来建立,只不过经过pthread_attr_t设置了一些线程属性罢了,好比线程堆栈的大小。

互斥量API

typedef pthread_mutex_t uv_mutex_t;

int uv_mutex_init(uv_mutex_t* mutex) {
#if defined(NDEBUG) || !defined(PTHREAD_MUTEX_ERRORCHECK)
  return UV__ERR(pthread_mutex_init(mutex, NULL));
#else
  pthread_mutexattr_t attr;
  int err;

  if (pthread_mutexattr_init(&attr))
    abort();

  if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK))
    abort();

  err = pthread_mutex_init(mutex, &attr);

  if (pthread_mutexattr_destroy(&attr))
    abort();

  return UV__ERR(err);
#endif
}

void uv_mutex_lock(uv_mutex_t* mutex) {
  if (pthread_mutex_lock(mutex))
    abort();
}

void uv_mutex_unlock(uv_mutex_t* mutex) {
  if (pthread_mutex_unlock(mutex))
    abort();
}复制代码

互斥锁的API也和咱们POSIX Threads里介绍的差很少。

总结

本文初步经过线程建立互斥锁条件变量介绍了POSIX Threads以及libuv自己的线程API,这些是libuv实现线程池的核心,结合上篇《Libuv学习——队列》,咱们已经为下篇libuv线程池打好了基础,全部您有兴趣的话,能够关注咱们微信公众号《方凳雅集》,这样您将会在第一时间看到咱们的文章。


相关文章
相关标签/搜索