C++11 并发编程教程 - Part 3 : 锁的进阶与条件变量

上一篇文章中咱们学习了如何使用互斥量来解决一些线程同步问题。这一讲咱们将进一步讨论互斥量的话题,并向你们介绍 C++11 并发库中的另外一种同步机制 —— 条件变量。 多线程


递归锁
并发

考虑下面这个简单类:
app

1
2
3
4
5
6
7
8
9
10
11
12
13
struct  Complex {
    std::mutex mutex;
    int  i;
    Complex() : i(0) {}
    void  mul(int  x){
        std::lock_guard<std::mutex> lock(mutex);
        i *= x;
    }
    void  div(int  x){
        std::lock_guard<std::mutex> lock(mutex);
        i /= x;
    }
};

如今你想添加一个操做以便无误地一并执行上述两项操做,因而你添加了一个函数: 函数

1
2
3
4
5
void  both(int  x,  int  y){
    std::lock_guard<std::mutex> lock(mutex);
    mul(x);
    div(y);
}

让咱们来测试这个函数:
学习

1
2
3
4
5
int  main(){
    Complex complex;
    complex.both(32, 23);
    return  0;
}

若是你运行上述测试,你会发现这个程序将永远不会结束。缘由很简单,在 both() 函数中,线程将申请锁,而后调用 mul() 函数,在这个函数[译注:指 mul() ]中,线程将再次申请该锁,但该锁已经被锁住了。这是死锁的一种状况。默认状况下,一个线程不能重复申请同一个互斥量上的锁。 测试

这里有一个简单的解决办法:std::recursive_mutex 。这个互斥量可以被同一个线程重复上锁,下面就是 Complex 结构体的正确实现:
fetch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct  Complex {
    std::recursive_mutex mutex;
    int  i;
    Complex() : i(0) {}
    void  mul(int  x){
        std::lock_guard<std::recursive_mutex> lock(mutex);
        i *= x;
    }
    void  div(int  x){
        std::lock_guard<std::recursive_mutex> lock(mutex);
        i /= x;
    }
    void  both(int  x,  int  y){
        std::lock_guard<std::recursive_mutex> lock(mutex);
        mul(x);
        div(y);
    }
};

这样一来,程序就能正常的结束了。
this


计时锁
spa

有些时候,你并不想某个线程永无止境地去等待某个互斥量上的锁。譬如说你的线程但愿在等待某个锁的时候作点其余的事情。为了达到这一目的,标准库提供了一套解决方案:std::timed_mutex std::recursive_timed_mutex (若是你的锁须要具有递归性的话)。他们具有与 std::mutex 相同的函数:lock()  unlock(),同时还提供了两个新的函数:try_lock_for()  try_lock_until()  线程

第一个函数,也是最有用的一个,它容许你设置一个超时参数,一旦超时,就算当前尚未得到锁,函数也会自动返回。该函数在得到锁以后返回 true,不然 false。下面咱们来看一个简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::timed_mutex mutex;
void  work(){
    std::chrono::milliseconds timeout(100);
    while(true){
        if(mutex.try_lock_for(timeout)){
            std::cout << std::this_thread::get_id() <<  ": do work with the mutex"  << std::endl;
            std::chrono::milliseconds sleepDuration(250);
            std::this_thread::sleep_for(sleepDuration);
            mutex.unlock();
            std::this_thread::sleep_for(sleepDuration);
        }  else  {
            std::cout << std::this_thread::get_id() <<  ": do work without mutex"  << std::endl;
            std::chrono::milliseconds sleepDuration(100);
            std::this_thread::sleep_for(sleepDuration);
        }
    }
}
int  main(){
    std::thread  t1(work);
    std::thread  t2(work);
    t1.join();
    t2.join();
    return  0;
}

(这个示例在实践中是毫无用处的)

值得注意的是示例中时间间隔声明:std::chrono::milliseconds 。它一样是 C++11 的新特性。你能够获得多种时间单位:纳秒、微妙、毫秒、秒、分以及小时。咱们使用上述某个时间单位以设置try_lock_for() 函数的超时参数。咱们一样可使用它们并经过 std::this_thread::sleep_for() 函数来设置线程的睡眠时间。示例中剩下的代码就没什么使人激动的了,只是一些使得结果可见的打印语句。注意:这段示例永远不会结束,你须要本身把他 kill 掉。


Call Once

有时候你但愿某个函数在多线程环境中只被执行一次。譬如一个由两部分组成的函数,第一部分只能被执行一次,而第二部分则在该函数每次被调用时都应该被执行。咱们可使用 std::call_once 函数垂手可得地实现这一功能。下面是针对这一机制的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
std::once_flag flag;
void  do_something(){
    std::call_once(flag, [](){std::cout <<  "Called once"  << std::endl;});
    std::cout <<  "Called each time"  << std::endl;
}
int  main(){
    std::thread  t1(do_something);
    std::thread  t2(do_something);
    std::thread  t3(do_something);
    std::thread  t4(do_something);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    return  0;
}

每个 std::call_once 函数都有一个 std::once_flag 变量与之匹配。在上例中我使用了 Lambda 表达式[译注:此处意译]来做为只被执行一次的代码,而使用函数指针以及 std::function 对象也一样可行。


条件变量

条件变量维护着一个线程列表,列表中的线程都在等待该条件变量上的另外某个线程将其唤醒。[译注:原文对于如下内容的阐释有误,故译者参照cppreference.com `条件变量` 一节进行翻译] 每一个想要在 std::condition_variable 上等待的线程都必须首先得到一个 std::unique_lock 锁。[译注:条件变量的] wait 操做会自动地释放锁并挂起对应的线程。当条件变量被通知时,挂起的线程将被唤醒,锁将会被再次申请。

一个很是好的例子就是有界缓冲区。它是一个环形缓冲,拥有肯定的容量、起始位置以及结束位置。下面就是使用条件变量实现的一个有界缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
struct  BoundedBuffer {
    int* buffer;
    int  capacity;
    int  front;
    int  rear;
    int  count;
    std::mutex lock;
    std::condition_variable not_full;
    std::condition_variable not_empty;
    BoundedBuffer(int  capacity) : capacity(capacity), front(0), rear(0), count(0) {
        buffer =  new  int[capacity];
    }
    ~BoundedBuffer(){
        delete[] buffer;
    }
    void  deposit(int  data){
        std::unique_lock<std::mutex> l(lock);
        not_full.wait(l, [&count, &capacity](){return  count != capacity; });
        buffer[rear] = data;
        rear = (rear + 1) % capacity;
        ++count;
        not_empty.notify_one();
    }
    int  fetch(){
        std::unique_lock<std::mutex> l(lock);
        not_empty.wait(l, [&count](){return  count != 0; });
        int  result = buffer[front];
        front = (front + 1) % capacity;
        --count;
        not_full.notify_one();
        return  result;
    }
};

类中互斥量由 std::unique_lock 接管,它是用于管理锁的 Wrapper,是使用条件变量的必要条件。咱们使用 notify_one() 函数唤醒等待在条件变量上的某个线程。而函数 wait() 就有些特别了,其第一个参数是咱们的 std::unique_lock,而第二个参数是一个断言。要想持续等待的话,这个断言就必须返回false,这就有点像 while(!predicate()) { cv.wait(l); } 的形式。上例剩下的部分就没什么好说的了。

咱们可使用上例的缓冲区解决“多消费者/多生产者”问题。这是一个很是广泛的同步问题,许多线程(消费者)在等待由其余一些线程(生产者)生产的数据。下面就是一个使用这个缓冲区的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void  consumer(int  id, BoundedBuffer& buffer){
    for(int  i = 0; i < 50; ++i){
        int  value = buffer.fetch();
        std::cout <<  "Consumer "  << id <<  " fetched "  << value << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(250));
    }
}
void  producer(int  id, BoundedBuffer& buffer){
    for(int  i = 0; i < 75; ++i){
        buffer.deposit(i);
        std::cout <<  "Produced "  << id <<  " produced "  << i << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}
int  main(){
    BoundedBuffer buffer(200);
    std::thread  c1(consumer, 0, std::ref(buffer));
    std::thread  c2(consumer, 1, std::ref(buffer));
    std::thread  c3(consumer, 2, std::ref(buffer));
    std::thread  p1(producer, 0, std::ref(buffer));
    std::thread  p2(producer, 1, std::ref(buffer));
    c1.join();
    c2.join();
    c3.join();
    p1.join();
    p2.join();
    return  0;
}

三个消费者线程和两个生产者线程被建立后就不断地对缓冲区进行查询。值得关注的是例子中使用std::ref 来传递缓冲区的引用,以避免形成对缓冲区的拷贝。


总结

这一节咱们讲到了许多东西,首先,咱们看到如何使用递归锁实现某个线程对同一锁的屡次加锁。接下来知道了如何在加锁时设定一个超时属性。而后咱们学习了一种调用某个函数有且只有一次的方法。最后咱们使用条件变量解决了“多生产者/多消费者”同步问题。


下篇

下一节咱们将讲到 C++11同步库中另外一个新特性 —— 原子量。

相关文章
相关标签/搜索