15分钟让你了解如何实现并发中的Barrier

说到Barrier,不少语言中已是标准库中自带的概念,通常状况下,只须要直接使用就好了。而最近一些机缘巧合的机会,我须要在c++中使用这么个玩意儿。可是c++标准库里尚未这个概念,只有boost里面有这样现成的东西,而我又不想为了这么一个小东西引入个boost。因此,我借着这个机会研究了下,发现其实这些多线程/并发中的东西仍是蛮有意思的。ios

 

阅读本文你可能须要以下的一些知识:c++

  1. 多线程编程的概念。编程

  2. c++的基本语法和有关多线程的语法。多线程

 

第二条可能也没有那么重要,由于若是理解了多线程的这些东西,什么语言均可以实现其核心概念。好了,废话少扯,进入正题。并发

 

1、什么是Barrier?函数

 

首先,得介绍下Barrier的概念,Barrier从字面理解是屏障的意思,主要是用做集合线程,而后再一块儿往下执行。再具体一点,在Barrier以前,若干个thread各自执行,而后到了Barrier的时候停下,等待规定数目的全部的其余线程到达这个Barrier,以后再一块儿经过这个Barrier各自干本身的事情。性能

 

这个概念特别像小时候集体活动的过程,你们从各自的家里到学校集合,待人数都到齐以后,以后再一块儿坐车出去,到达指定地点后一块儿行动或者各自行动。网站

 

而在计算机的世界里,Barrier能够解决的问题不少,好比,一个程序有若干个线程并发的从网站上下载一个大型xml文件,这个过程能够相互独立,由于一个文件的各个部分并不相关。而在处理这个文件的时候,可能须要一个完整的文件,因此,须要有一条虚拟的线让这些并发的部分集合一下从而能够拼接成为一个完整的文件,多是为了后续处理也多是为了计算hash值来验证文件的完整性。然后,再交由下一步处理。this

 

2、如何实现一个Barrier?spa

 

并发的不少东西都拥有一个坏处就是你很难证实某种实现不是错误的,由于不少时候确实状况太多了,不管是死锁,饥饿对于人脑都是太大的负担。而反过来,对于我扯这篇文章,也是一个好处,正由于很难证实不是错误的,因此个人扯淡能够更放心一点。

 

在研究Barrier的实现中,我查阅了蛮多的资料的。说实话,其实现方式挺多的。在剔除了一些我能明确证实其有多是错误的,我选择了我本身以为最容易理解的一种。

 

第一节说过,barrier很像是之前的班级集合,站在一个老师的角度,你须要知道的东西至少有这两个:

  1. 班级有多少人。

  2. 目前已经到了多少人。

     

只有当目前已经到了的人等于班级人数以后才能出发。

 

因此若是按照这个类比,实现一个barrier至少须要如下的几个变量:

  1. 须要同时在barrier等待的线程的个数。

  2. 当前到达barrier的线程的个数。

 

而按照barrier的逻辑,主要应该有这些操做:

  1. 当一个线程到达barrier的时候,增长计数。

  2. 若是个数不等于当前须要等待的线程个数,等待。

  3. 若是个数达到了须要等待的线程个数,通知/唤醒全部等待的进程,让全部进程经过barrier。

 

在不考虑加锁的状况下,按照上面的逻辑,伪代码大概应该像这样:

thread_count = n; <-- n是须要一块儿等待的线程的个数
arrived_count = 0; <-- 到达线程的个数
-------------------------------------------------------------
 以上是全局变量,只会初始化一次,如下是barrier开始的代码
-------------------------------------------------------------
arrived_count += 1;
if(arrived_count == thread_count)
    notify_all_threads_and_unblok();
else
    block_and_wait();

而在多线程环境下,很明显arrived_count这种全局变量更新须要加锁。因此,对于这个代码,综合稍微再改动一下,伪代码能够更新下成为这样:

thread_count = n; <-- n是须要一块儿等待的线程的个数
arrived_count = 0; <-- 到达线程的个数
-------------------------------------------------------------
 以上是全局变量,只会初始化一次,如下是barrier开始的代码
-------------------------------------------------------------
lock();
    arrived_count += 1;
unlock();
if(arrived_count == thread_count)
    notify_all_threads_and_unblok();
else
    block_and_wait();

这里,在有的语言中,锁的粒度可能小了点,取决于notify_all_threads和wait在这个语言中的定义,可是做为伪代码,为了可能展现起来比较方便。

 

而若是你有并发编程的知识,你应该敏感的认识到notify_all_threads_and_unblock,block_and_wait这种在这里虽然是简单的几个单词,可是其包含的操做步骤明显不止一个,更别说背后的机器指令了。因此做为一个并发概念下运行的程序,不能够简单的就放这样一个操做在这里,若是都是任何函数,指令,代码都是自带原子性的,那么写多线程/并发程序也没有啥好研究的了。因此对于这两个操做,咱们必须具体的扩展下。

 

对于notify_all_threads_and_unblock和block_and_wait包含至关多的操做,因此下面,得把这两个操做具体的展开。

 1 thread_count = n; <-- n是须要一块儿等待的线程的个数
 2 arrived_count = 0; <-- 到达线程的个数
 3 could_release = false; 
 4 -------------------------------------------------------------
 5  以上是全局变量,只会初始化一次,如下是barrier开始的代码
 6 -------------------------------------------------------------
 7 lock();
 8     if(arrived_count == 0)
 9        could_release = false; 
10     
11     arrived_count += 1;
12 unlock();
13 if(arrived_count == thread_count)
14     could_realse = true;    
15     arrived_count = 0; 
16 else
17     while(could_release == false)
18         spin()

这里多了一个变量could_release完成上面说的两个操做。原理也很简单,若是等待的个数没有到达指定数目,这个值始终是false,在代码中使用循环让线程阻塞在spin处(固然,假设spin是原子性的)。若是到达了thread_count,改变could_release的值,这样循环条件不知足,代码能够继续执行。而在13行的if里面把arrived_count从新设置为0是由于若是不这样作,那么这个barrier就只能用一次,由于没有地方再把这个表示到达线程数目变量的初始值从新设置了。

 

我以为这里须要停一下,来思一下上面的代码,首先,这个代码有不少看起来很像有问题的地方。好比对于could_release和arrived_count的重置处,这都是赋值,而在并发程序中,任何写操做都须要仔细思考是否须要加锁,在这里,加锁固然没问题。可是盲目的加锁会致使性能损失。

 

多线程程序最可怕的就是陷入细节,因此,我通常都是总体的思考下是否是有问题。对于一个barrier,错误就是指没有等全部的线程都到达了就中止了等待,人没来齐就发车了。而怎么会致使这样的状况呢?只有当arrived_count值在两个线程不一样步才会致使错误。秉承这个原则,看看上面的代码,arrived_count的更新是加锁的,因此在到达if以前其值是能够信赖的。而if这段判断自己是读操做,其判断就是能够信赖的,由于arrived_count的值更新是可靠的,因此进来的线程要么进入if,要么进入else。不存在线程1更新了arrived_count的值而线程2读到了arrived_count的值而致使没有到thread_count就更新了could_release的状况。

 

没办法,这类的程序就是很绕,因此我通常都不陷入细节。

 

如今看起来,一切都很完美,可是多线程程序最恶心的地方就是可能的死锁,饥饿等等。而这些又很难证实,而上面这段代码,在某些状况下就是会致使死锁。考虑thread_count等于2,也就是这个barrier须要等待两个线程一块儿经过。

 

如今有两个线程,t1和t2,t1先执行直到17行,卡住,这时候t2得到宝贵的cpu机会。很明显,这时会进入14行,更新could_release的值。若是这个时候t1得到执行机会,万事大吉,t1会离开while区域,继续执行。直到下次再次到达这个barrier。

 

可是若是这个时候t1并无得到执行机会,t2一直执行,虽然唤醒了could_relase,可是t1会一直停留在18行。要知道,这个含有barrier的代码多是在一个循环之中,若是t2再次到达barrier的区域,这时候arrived_count等于0(由于arrived_count在上一次t2进入13行以后重置了),这个时候could_relase会变成false。如今t1,t2都在18行了,没有人有机会去更新could_relase的值,线程死锁了。

 

怎么办?仔细思考下,是唤醒机制有问题,很明显,若是可以在唤醒的时候原子式的唤醒全部的线程,那么上面所说的问题就不存在了。在不少语言里都有这样的方法能够完成上面说的原子性的唤醒全部线程,好比c++里面的notify_all。可是,若是没有这个函数,该如何实现呢?

 

上面死锁问题的诞生在于一个线程不恰当的更新了全局的could_relase,致使所有的判断条件跟着错误的改变。解决这样的问题,须要的是一个只有每一个线程各自能看到,能够独立更新,互相不干扰而又能被使用的变量。幸亏,在设计多线程概念时,有一个概念叫作thread local,恰好可以知足这个要求。而运用这样的变量,上述的概念能够表述成为:

 1 thread_count = n; <-- n是须要一块儿等待的线程的个数
 2 arrived_count = 0; <-- 到达线程的个数
 3 could_release = false;
 4 thread_local_flag = could_release; <-- 线程局部变量,每一个线程独立更新 
 5 -------------------------------------------------------------
 6  以上是全局变量,只会初始化一次,如下是barrier开始的代码
 7 -------------------------------------------------------------
 8 thread_local_flag = !thread_local_flag
 9 lock();
10     arrived_count += 1;
11 unlock();
12 if(arrived_count == thread_count)
13     could_realse = thread_local_flag;    
14     arrived_count = 0; 
15 else
16     while(could_release != thread_local_flag)
17         spin()

这里要着重解释下,为何不会死锁,因为thread_local_flag是每一个线程独立更新的,因此很明显,其是不用加锁的。其他代码和上面的伪代码相似,不一样的是,若是发生上面同样的状况,t2更新thread_local_flag的时候,只有其局部的变量会被置反而不会影响其他的线程的变量,而由于could_realse是全局变量,在t2第一次执行到13行的时候已经设置成thread_local_flag同样的值了。这个时候, 哪怕t2再次执行到16行也会由于其内部变量已经被置反而阻塞在这个while循环之中。而t1只要得到执行机会,就能够经过这个barrier。

 

有点绕,可是仔细想一想仍是蛮有意思的。

 

3、如何运用c++实现Barrier?

 

虽然上面说了那么多,可是c++中实现Barrier不须要这么复杂,这要感谢c++ 11中已经自带了不少原子性的操做,好比上面说的notify_all。因此,代码就没有那么复杂了,固然,c++也有thread_local,若是不畏劳苦,能够真的从最基础的写起。

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>using namespace std;
​
class TestBarrier{
public:
    TestBarrier(int nThreadCount):
        m_threadCount(nThreadCount),
        m_count(0),
        m_release(0)
    {}
​
    void wait1(){
        unique_lock<mutex> lk(m_lock);
        if(m_count == 0){
            m_release = 0;
        }
        m_count++;
        if(m_count == m_threadCount){
            m_count = 0;
            m_release = 1;
            m_cv.notify_all();
        }
        else{
            m_cv.wait(lk, [&]{return m_release == 1;});
        } 
    }
​
private:
    mutex m_lock;
    condition_variable m_cv;
    unsigned int m_threadCount;
    unsigned int m_count; 
    unsigned int m_release;
};

这里多亏了c++标准库中引进的condition_variable,使得上面的概念能够简单高效而又放心的实现,你也不须要操心什么线程局部量。而关于c++并发相关的种种知识可能须要专门的若干篇幅才能说清楚,若是你并不熟悉c++,能够跳过这些不知所云的部分。验证上述代码可使用以下代码:

unsigned int threadWaiting = 5;
TestBarrier barrier(5);
​
void func1(){
    this_thread::sleep_for(chrono::seconds(3));
    cout<<"func1"<<endl;
    barrier.wait1();
    cout<<"func1 has awakended!"<<endl;
}
​
void func2(){
    cout<<"func2"<<endl;
    barrier.wait1();
    cout<<"func2 has awakended!"<<endl;
}
​
void func3(){
    this_thread::sleep_for(chrono::seconds(1));
    cout<<"func3"<<endl;
    barrier.wait1();
    cout<<"func3 has awakended!"<<endl;
}
​
int main(){
    for(int i = 0; i < 5; i++){
        thread t1(func1);
        thread t2(func3);
        thread t3(func2);
        thread t4(func3);
        thread t5(func2);
        t1.join();
        t2.join();
        t3.join();
        t4.join();
        t5.join();
    }
}

好了,在我机器上的运行结果是这样的,因为输出没有同步,因此输出可能并无想象的那么整洁。可是不影响总体结果,能够看到,全部线程到齐以后才各自执行各自后面的代码:


这篇文章也在个人公众号同步发表,个人这个公众号嘛,佛系更新,固然,本质上是想到一个话题不容易(懒的好借口),欢迎关注哦:

相关文章
相关标签/搜索