【操做系统—并发】锁

概念

经过对并发的介绍,咱们看到了并发编程的一个最基本问题:因为单处理器上的中断(或者多个线程在多处理器上并发执行),一些咱们但愿能原子执行的指令并不能正确运行。锁(lock)就是用来解决这一问题最基本的方法。程序员在源代码中加锁,放在临界区周围,保证临界区可以像单条原子指令同样执行。node

锁的基本思想

下面是使用锁的一个简单示例:程序员

1    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
2
3    Pthread_mutex_lock(&lock);    // wrapper for pthread_mutex_lock()
4    balance = balance + 1;
5    Pthread_mutex_unlock(&lock);

锁就是一个变量,这个锁变量保存了锁在某一时刻的状态。它要么是可用的(available,或unlocked,或free),表示没有线程持有锁;要么是被占用的(acquired,或locked,或held),表示有一个线程持有锁,正处于临界区。咱们也能够保存其余的信息,好比持有锁的线程,或请求获取锁的线程队列,但这些信息会隐藏起来,锁的使用者不会发现。编程

锁通常只支持两个操做:lock()和unlock()。调用lock()尝试获取锁,若是没有其余线程持有锁,该线程会得到锁,进入临界区,这个线程被称为锁的持有者(owner)。若是另一个线程对相同的锁变量调用lock(),由于锁被另外一线程持有,该调用不会返回。这样,当持有锁的线程在临界区时,其余线程就没法进入临界区。数据结构

锁的持有者一旦调用unlock(),锁就变成可用了。若是没有其余等待线程(即没有其余线程调用过lock()并卡在那里),锁的状态就变成可用了。若是有等待线程,其中一个会(最终)注意到(或收到通知)锁状态的变化,获取该锁,进入临界区。多线程

如何实现锁

显然,咱们须要硬件和操做系统的帮助来实现一个可用的锁。近些年来,各类计算机体系结构的指令集都增长了一些不一样的硬件原语,咱们可使用它们来实现像锁这样的互斥原语。并发

在实现锁以前,对于锁是否能工做良好,应该事先设立一些标准。首先是锁是否能完成它的基本任务,即提供互斥(mutual exclusion),是否可以阻止多个线程进入临界区。app

第二是公平性(fairness)。当锁可用时,是否每个竞争线程有公平的机会抢到锁?是否有竞争锁的线程会饿死(starve),一直没法得到锁?函数

最后是性能(performance),也即便用锁以后增长的时间开销。有几种场景须要考虑:一种是只有一个线程抢锁、释放锁的开销如何?另一种是一个CPU上多个线程竞争,最后一种是多个CPU、多个线程竞争时的性能。高并发

切入点一:控制中断

最先提供的互斥解决方案之一,就是在临界区关闭中断。这个解决方案是为单处理器系统开发的。经过在进入临界区以前关闭中断(使用特殊的硬件指令),能够保证临界区的代码不会被中断,从而原子地执行。结束以后,咱们从新打开中断,程序正常运行。性能

这个方法的主要优势就是简单,可是缺点不少。首先,这种方法要求咱们容许全部调用线程执行特权操做(打开关闭中断),可是恶意程序可能会利用这点。例如一个恶意程序可能在它开始时就调用lock(),从而独占处理器。系统没法从新得到控制,只能重启系统。

第二,这种方案不支持多处理器。若是多个线程运行在不一样的CPU上,每一个线程都试图进入同一个临界区,关闭中断也没有做用。

第三,关闭中断致使中断丢失,可能会致使严重的系统问题。假如磁盘设备完成了读取请求,但CPU由于关闭了中断错失了这一信号,操做系统如何知道去唤醒等待的进程?

最后一个不过重要的缘由就是效率低。与正常指令执行相比,现代CPU对于关闭和打开中断的代码执行得较慢。

基于以上缘由,只在颇有限的状况下用关闭中断来实现互斥原语。

切入点二:测试并设置指令

由于关闭中断的方法没法工做在多处理器上,因此系统设计者开始让硬件支持锁。最简单的硬件支持是测试并设置指令(test-and-set instruction),也叫做原子交换(atomic exchange)。测试并设置指令的工做大体能够用下面的C代码来定义:

1    int TestAndSet(int *old_ptr, int new) {
2        int old = *old_ptr; // fetch old value at old_ptr
3        *old_ptr = new;    // store 'new' into old_ptr
4        return old;        // return the old value
5    }

它返回old_ptr指向的旧值,同时更新为new的新值。固然,关键是这些代码是原子地(atomically)执行。由于既能够测试旧值,又能够设置新值,因此咱们把这条指令叫做“测试并设置”。

为了理解该指令如何构造一个可用的锁,咱们首先尝试实现一个不依赖它的锁。

失败的尝试

在第一次尝试中,想法很简单:用一个变量来标志锁是否被某些线程占用。第一个线程进入临界区,调用lock(),检查标志是否为1,而后设置标志为1,代表线程持有该锁。结束临界区时,线程调用unlock(),清除标志,表示锁未被持有。

当第一个线程正处于临界区时,若是另外一个线程调用lock(),它会在while循环中自旋等待(spin-wait),直到第一个线程调用unlock()清空标志。而后等待的线程会退出while循环,设置标志,执行临界区代码。

1    typedef struct  lock_t { int flag; } lock_t;
2
3    void init(lock_t *mutex) {
4        // 0 -> lock is available, 1 -> held
5        mutex->flag = 0;
6    }
7
8    void lock(lock_t *mutex) {
9        while (mutex->flag == 1) // TEST the flag
10           ; // spin-wait (do nothing)
11       mutex->flag = 1;         // now SET it!
12   }
13
14   void unlock(lock_t *mutex) {
15       mutex->flag = 0;
16   }

遗憾的是,这段代码并不能正确工做。假设代码按照下表执行,开始时flag=0。

image.png

从这种交替执行能够看出,经过适时的中断,咱们很容易构造出两个线程都将标志设置为1,都能进入临界区的场景。

使用测试并设置指令改进

使用测试并设置指令改进后的代码以下:

1    typedef struct  lock_t {
2        int flag;
3    } lock_t;
4
5    void init(lock_t *lock) {
6        // 0 indicates that lock is available, 1 that it is held
7        lock->flag = 0;
8    }
9 
10   void lock(lock_t *lock) {
11       while (TestAndSet(&lock->flag, 1) == 1)
12           ; // spin-wait (do nothing)
13   }
14 
15   void unlock(lock_t *lock) {
16       lock->flag = 0;
17   }

咱们理解一下这个锁的工做原理。首先假设一个线程在运行,调用lock(),没有其余线程持有锁,因此flag是0。当调用TestAndSet(flag, 1)方法,返回0,线程会跳出while循环,获取锁。同时也会原子地设置flag为1,标志锁已经被持有。当线程离开临界区,调用unlock()将flag清理为0。

当某一个线程已经持有锁时。工做线程调用lock(),而后调用TestAndSet(flag, 1),这一次返回1。只要另外一个线程一直持有锁,TestAndSet()会重复返回1,本线程会一直自旋。当flag终于被改成0,本线程会调用TestAndSet(),返回0而且原子地设置为1,从而得到锁,进入临界区。

这种锁被称为自旋锁(spin lock)。这是最简单的一种锁,一直自旋,利用CPU周期,直到锁可用。在单处理器上,须要抢占式的调度器。不然,自旋锁在单CPU上没法使用,由于一个自旋的线程永远不会放弃CPU。

评价自旋锁

咱们按照以前的标准来评价一下咱们实现的自旋锁。首先是正确性:自旋锁一次只容许一个线程进入临界区,所以能够正确运行。

下一个标准是公平性:答案是自旋锁不提供任何公平性保证。实际上,自旋的线程在竞争条件下可能会永远自旋。自旋锁没有公平性,可能会致使饿死。

最后一个标准是性能。对于自旋锁,在单CPU的状况下,性能开销至关大。假设一个线程持有锁进入临界区时被抢占。调度器可能会运行其余每个线程(假设有N−1个这种线程)。而其余线程都在竞争锁,都会在放弃CPU以前,自旋一个时间片,浪费CPU周期。

可是,在多CPU上,自旋锁性能不错(若是线程数大体等于CPU数)。假设线程A在CPU 1,线程B在CPU 2竞争同一个锁。线程A占有锁时,线程B竞争锁就会自旋。然而,临界区通常都很短,所以很快锁就可用,而后线程B得到锁。自旋等待其余处理器上的锁,并无浪费不少CPU周期,所以效果不错。

切入点三:比较并交换指令

某些系统提供了另外一个硬件原语,即比较并交换指令(compare-and-swap)。下图是这条指令的C语言伪代码。

1    int CompareAndSwap(int *ptr, int expected, int new) {
2        int actual = *ptr;
3        if (actual == expected)
4            *ptr = new;
5        return actual;
6    }

比较并交换的基本思路是检测ptr指向的值是否和expected相等;若是是,更新ptr所指的值为新值。不然,什么也不作。不论哪一种状况,都返回该内存地址的实际值,让调用者可以知道执行是否成功。

有了比较并交换指令,就能够实现一个锁,相似于用测试并设置指令那样。例如,咱们只要用下面的代码替换上面例子中的lock()函数:

1    void lock(lock_t *lock) {
2        while (CompareAndSwap(&lock->flag, 0, 1) == 1)
3            ; // spin
4    }

它的行为以及对其的评价等价于上面分析的自旋锁。

切入点四:获取并增长指令

最后一个硬件原语是获取并增长(fetch-and-add)指令,它能原子地返回特定地址的旧值,而且让该值自增一。获取并增长的C语言伪代码以下:

1    int FetchAndAdd(int *ptr) {
2        int old = *ptr;
3        *ptr = old + 1;
4        return old;
5    }

咱们可使用获取并增长指令,实现一个更有趣的ticket锁:

1    typedef struct  lock_t {
2        int ticket;
3        int turn;
4    } lock_t;
5
6    void init(lock_t *lock) {
7        lock->ticket = 0;
8        lock->turn   = 0;
9    }
10
11   void lock(lock_t *lock) {
12       int myturn = FetchAndAdd(&lock->ticket);
13       while (lock->turn != myturn)
14           ; // spin
15   }
16
17   void unlock(lock_t *lock) {
18       FetchAndAdd(&lock->turn);
19   }

这里不是用一个值,而是使用了ticket和turn变量来构建锁。基本操做也很简单:若是线程但愿获取锁,首先对一个ticket值执行一个原子的获取并相加指令。这个值做为该线程的“turn”(顺位,即myturn)。根据全局共享的lock->turn变量,当某一个线程的(myturn == turn)时,则轮到这个线程进入临界区。unlock则是增长turn,从而下一个等待线程能够进入临界区。

不一样于以前的方法:本方法可以保证全部线程都能抢到锁。只要一个线程得到了ticket值,它最终会被调度。好比基于测试并设置的方法,一个线程有可能一直自旋,即便其余线程在获取和释放锁。

如何避免过多自旋

基于硬件的锁简单并且有效,可是在某些场景下,这些解决方案会效率低下。以两个线程运行在单处理器上为例,当一个线程(线程1)持有锁时,被中断。第二个线程(线程2)去获取锁,发现锁已经被持有。所以,它就一直自旋。最后,时钟中断产生,线程1从新运行,它释放锁。最后,线程2不须要继续自旋了,它获取了锁。

相似的场景下,一个线程会一直自旋检查一个不会改变的值,浪费掉整个时间片。若是有N个线程去竞争一个锁,状况会更糟糕。一样的场景下,会浪费N−1个时间片,只是自旋并等待一个线程释放该锁。所以,咱们的下一个关键问题是:怎样避免没必要要的自旋,浪费CPU时间?

简单方法:让出时间片

第一种简单的方法就是,在要自旋的时候,放弃CPU。下图展现了这种方法。

1    void init() {
2        flag = 0;
3    }
4
5    void lock() {
6        while (TestAndSet(&flag, 1) == 1)
7            yield(); // give up the CPU
8    }
9
10   void unlock() {
11       flag = 0;
12   }

在这种方法中,咱们假定操做系统提供原语yield(),线程能够调用它主动放弃CPU,让其余线程运行。yield()系统调用可以让线程由运行(running)态变为就绪(ready)态,从而容许其余线程运行。所以,让出线程本质上取消调度(deschedules)了它本身。

考虑在单CPU上运行两个线程,基于yield的方法十分有效。一个线程调用lock(),发现锁被占用时,让出CPU,另一个线程运行,完成临界区。在这个简单的例子中,让出方法工做得很是好。

如今来考虑许多线程(例如100个)反复竞争一把锁的状况。在这种状况下,一个线程持有锁,在释放锁以前被抢占。其余99个线程分别调用lock(),发现锁被抢占,而后让出CPU。假定采用某种轮转调度程序,这99个线程会一直处于运行—让出这种模式,直到持有锁的线程再次运行。虽然比原来的浪费99个时间片的自旋方案要好,但这种方法仍然成本很高,上下文切换的成本是实实在在的,所以浪费很大。

更糟的是,咱们尚未考虑饥饿的问题。一个线程可能一直处于让出的循环,而其余线程反复进出临界区。很显然,咱们须要一种方法来解决这个问题。

使用队列:休眠替代自旋

前面一些方法的真正问题是存在太多的偶然性:调度程序决定如何调度线程。若是调度不合理,线程或者一直自旋,或者马上让出CPU。不管哪一种方法,均可能形成浪费,也不能防止饥饿。

所以,咱们必须显式地施加某种控制,决定锁释放时,谁能抢到锁。为了作到这一点,咱们须要操做系统的更多支持,并须要一个队列来保存等待锁的线程。

简单起见,咱们利用Solaris提供的支持,它提供了两个调用:park()可以让调用线程休眠,unpark(threadID)则会唤醒threadID标识的线程。能够用这两个调用来实现锁,让调用者在获取不到锁时睡眠,在锁可用时被唤醒。

1    typedef struct  lock_t {
2        int flag;
3        int guard;
4        queue_t *q;
5    } lock_t;
6
7    void lock_init(lock_t *m) {
8        m->flag = 0;
9        m->guard = 0;
10       queue_init(m->q);
11   }
12
13   void lock(lock_t *m) {
14       while (TestAndSet(&m->guard, 1) == 1)
15           ; //acquire guard lock by spinning
16       if (m->flag == 0) {
17           m->flag = 1; // lock is acquired
18           m->guard = 0;
19       } else {
20           queue_add(m->q, gettid());
21           m->guard = 0;
22           park();
23       }
24   }
25
26   void unlock(lock_t *m) {
27       while (TestAndSet(&m->guard, 1) == 1)
28           ; //acquire guard lock by spinning
29       if (queue_empty(m->q))
30           m->flag = 0; // let go of lock; no one wants it
31       else
32           unpark(queue_remove(m->q)); // hold lock (for next thread!)
33       m->guard = 0;
34   }

在这个例子中,咱们作了两件事。首先,咱们将以前的测试并设置和等待队列结合,实现了一个更高性能的锁。其次,咱们经过队列来控制谁会得到锁,避免饿死。

你可能注意到,guard基本上起到了自旋锁的做用,围绕着flag和队列操做。所以,这个方法并无彻底避免自旋等待。线程在获取锁或者释放锁时可能被中断,从而致使其余线程自旋等待。可是,这个自旋等待时间是颇有限的(不是用户定义的临界区,只是在lock和unlock代码中的几个指令)。

当要唤醒另外一个线程时,flag并无设置为0。为何呢?由于当线程被唤醒时,就像是从park()调用返回。此时它没有持有guard,因此也不能将flag设置为1。所以,咱们就直接把锁从释放的线程传递给下一个得到锁的线程,期间flag没必要设置为0。

不过,代码中仍是存在一点瑕疵。假设一个线程将要调用park休眠,可是不凑巧,系统切换到了正在持有锁的线程。若是该线程随后释放了锁,前面的线程调用park后可能会永远休眠下去。为了不这种状况,咱们须要额外的工做。

Solaris经过增长了第三个系统调用setpark()来解决这一问题。经过setpark(),一个线程代表本身立刻要调用park。若是恰好另外一个线程被调度,而且调用了unpark,那么后续的park调用就会直接返回,而不是一直睡眠。所以,示例代码中获得lock()调用能够作一点小修改:

1    queue_add(m->q, gettid());
2    setpark(); // new code
3    m->guard = 0;

另外一种方案就是将guard传入内核。在这种状况下,内核可以采起预防措施,保证原子地释放锁,把运行线程移出队列。

两阶段锁

两阶段锁(two-phase lock)是一种古老的锁方案,多年来不断被采用。两阶段锁意识到自旋可能颇有用,尤为是在很快就要释放锁的场景。所以,两阶段锁的第一阶段会先自旋一段时间,但愿它能够获取锁。

可是,若是第一个自旋阶段没有得到锁,第二阶段调用者会睡眠,直到锁可用。常见的方式是在循环中自旋固定的次数,而后睡眠。

基于锁的并发数据结构

咱们来讨论一下如何在常见数据结构中使用锁。咱们的挑战是:对于特定数据结构,如何加锁才能让该结构功能正确?以及如何可以保证高性能?

并发计数器

简易版本

实现一个简单的并发计数器很简单,代码以下:

1    typedef struct  counter_t {
2        int            value;
3        pthread_mutex_t lock;
4    } counter_t;
5
6    void init(counter_t *c) {
7        c->value = 0;
8        Pthread_mutex_init(&c->lock,  NULL);
9    }
10
11   void increment(counter_t *c) {
12       Pthread_mutex_lock(&c->lock);
13       c->value++;
14       Pthread_mutex_unlock(&c->lock);
15   }
16
17   void decrement(counter_t *c) {
18       Pthread_mutex_lock(&c->lock);
19       c->value--;
20       Pthread_mutex_unlock(&c->lock);
21   }
22
23   int get(counter_t *c) {
24       Pthread_mutex_lock(&c->lock);
25       int rc = c->value;
26       Pthread_mutex_unlock(&c->lock);
27       return rc;
28   }

这个并发计数器遵循了最简单、最基本的并发数据结构中常见的数据模式:它只是加了一把锁,在调用函数操做该数据结构时获取锁,从调用返回时释放锁

如今让咱们来考察一下它的性能。若是简单的方案就能工做,同时运行速度没有大幅降低。就不须要精巧的设计。

咱们运行了一个基准测试,每一个线程更新同一个共享计数器固定次数,而后咱们改变线程数。下图给出了运行1个线程到4个线程的总耗时,其中每一个线程更新100万次计数器。经过增长CPU,咱们但愿单位时间可以完成更多的任务。从上方的曲线能够看出,同步的计数器扩展性很差。单线程完成100万次更新只须要很短的时间(大约0.03s),而两个线程并发执行,性能降低不少(超过5s!)。线程更多时,性能更差。

image.png

可扩展版本

咱们将介绍一种方法,称为懒惰计数器(sloppy counter)。

懒惰计数器经过多个局部计数器和一个全局计数器来实现一个逻辑计数器,其中每一个CPU核心有一个局部计数器。具体来讲,在4个CPU的机器上,有4个局部计数器和1个全局计数器。除了这些计数器,还有锁:每一个局部计数器有一个锁,全局计数器有一个。

懒惰计数器的基本思想是这样的:若是一个核心上的线程想增长计数器,那就增长它的局部计数器,访问这个局部计数器是经过对应的局部锁同步的。由于每一个CPU有本身的局部计数器,不一样CPU上的线程不会竞争,因此计数器的更新操做可扩展性好。可是,为了保持全局计数器更新,局部值会按期转移给全局计数器,方法是获取全局锁,让全局计数器加上局部计数器的值,而后将局部计数器置零。

局部转全局的频度,取决于一个阈值,这里称为S(sloppiness)。S越小,懒惰计数器则越趋近于上面的同步计数器。S越大,扩展性越强,可是全局计数器与实际计数的误差越大。

上面的基准测试效果图中下方的线,是阈值S为1024时懒惰计数器的性能,4个处理器更新400万次的时间和一个处理器更新100万次的几乎同样。下图展现了随着阈值S的变化,懒惰计数器的性能曲线。懒惰计数器就是在准确性和性能之间折中。

image.png

下面是懒惰计数器的基本实现:

1    typedef struct  counter_t {
2        int             global;            // global count
3        pthread_mutex_t glock;             // global lock
4        int             local[NUMCPUS];    // local count (per cpu)
5        pthread_mutex_t llock[NUMCPUS];    // ... and locks
6        int             threshold;         // update frequency
7    } counter_t;
8
9    // init: record threshold, init locks, init values
10   //       of all local counts and global count
11   void init(counter_t *c, int threshold) {
12       c->threshold = threshold;
13
14       c->global = 0;
15       pthread_mutex_init(&c->glock,  NULL);
16
17       int i;
18       for (i = 0; i < NUMCPUS; i++) {
19           c->local[i] = 0;
20           pthread_mutex_init(&c->llock[i],  NULL);
21       }
22   }
23
24   // update: usually, just grab local lock and update local amount
25   //        once local count has risen by 'threshold', grab global
26   //        lock and transfer local values to it
27   void update(counter_t *c, int threadID, int amt) {
28       pthread_mutex_lock(&c->llock[threadID]);
29       c->local[threadID] += amt;               // assumes amt > 0
30       if (c->local[threadID] >= c->threshold) { // transfer to global
31           pthread_mutex_lock(&c->glock);
32           c->global += c->local[threadID];
33           pthread_mutex_unlock(&c->glock);
34           c->local[threadID] = 0;
35       }
36       pthread_mutex_unlock(&c->llock[threadID]);
37   }
38
39   // get: just return global amount (which may not be perfect)
40   int get(counter_t *c) {
41       pthread_mutex_lock(&c->glock);
42       int val = c->global;
43       pthread_mutex_unlock(&c->glock);
44       return val; // only approximate!
45   }

并发链表

接下来看一个更复杂的数据结构——链表,简单起见,咱们只关注链表的插入操做。

简易版本

下面展现了基本的实现代码:

1    // basic node structure
2    typedef struct  node_t {
3        int                key;
4        struct  node_t        *next;
5    } node_t;
6
7    // basic list structure (one used per list)
8    typedef struct  list_t {
9        node_t                *head;
10       pthread_mutex_t    lock;
11   } list_t;
12
13   void List_Init(list_t *L) {
14       L->head = NULL;
15       pthread_mutex_init(&L->lock,  NULL);
16   }
17
18   int List_Insert(list_t *L, int key) {
19       pthread_mutex_lock(&L->lock);
20       node_t *new = malloc(sizeof(node_t));
21       if (new == NULL) {
22           perror("malloc");
23           pthread_mutex_unlock(&L->lock);
24           return -1; // fail
25       }
26       new->key = key;
27       new->next = L->head;
28       L->head = new;
29       pthread_mutex_unlock(&L->lock);
30       return 0; // success
31   }
32
33   int List_Lookup(list_t *L, int key) {
34       pthread_mutex_lock(&L->lock);
35       node_t *curr = L->head;
36       while (curr) {
37           if (curr->key == key) {
38               pthread_mutex_unlock(&L->lock);
39               return 0; // success
40           }
41           curr = curr->next;
42       }
43       pthread_mutex_unlock(&L->lock);
44       return -1; // failure
45   }
如何扩展

尽管咱们有了基本的并发链表,但又遇到了这个链表扩展性很差的问题。研究人员发现的增长链表并发度的技术中,有一种叫做过手锁(hand-over-hand locking,也叫做锁耦合,lock coupling)。

原理也很简单:每一个节点都有一个锁,替代以前整个链表一个锁。遍历链表的时候,首先抢占下一个节点的锁,而后释放当前节点的锁。

从概念上说,过手锁链表有点道理,它增长了链表操做的并发程度。可是实际上,在遍历的时候,每一个节点获取锁、释放锁的开销巨大,很难比单锁的方法快。即便有大量的线程和很大的链表,这种并发的方案也不必定会比单锁的方案快。也许某种杂合的方案(必定数量的节点用一个锁)值得去尝试。

若是方案带来了大量的开销,那么高并发就没有什么意义。若是简单的方案不多用到高开销的调用,一般会颇有效,增长更多的锁和复杂性可能会拔苗助长。

对于上面的示例代码,还有一个通用建议:注意控制流的变化或其余错误状况致使函数返回和中止执行。由于不少函数开始就会得到锁,分配内存,或者进行其余一些改变状态的操做,若是错误发生,代码须要在返回前恢复各类状态,这容易出错。所以,最好组织好代码,减小这种模式。

并发队列

下面是一个并发队列的实现代码:

1    typedef struct  node_t {
2        int                 value;
3        struct  node_t     *next;
4    } node_t;
5
6    typedef struct  queue_t {
7        node_t            *head;
8        node_t            *tail;
9        pthread_mutex_t    headLock;
10       pthread_mutex_t    tailLock;
11   } queue_t;
12
13   void Queue_Init(queue_t *q) {
14       node_t *tmp = malloc(sizeof(node_t));
15       tmp->next = NULL;
16       q->head = q->tail = tmp;
17       pthread_mutex_init(&q->headLock,  NULL);
18       pthread_mutex_init(&q->tailLock,  NULL);
19   }
20
21   void Queue_Enqueue(queue_t *q, int value) {
22       node_t *tmp = malloc(sizeof(node_t));
23       assert(tmp != NULL);
24       tmp->value = value;
25       tmp->next = NULL;
26
27       pthread_mutex_lock(&q->tailLock);
28       q->tail->next = tmp;
29       q->tail = tmp;
30       pthread_mutex_unlock(&q->tailLock);
31   }
32
33   int Queue_Dequeue(queue_t *q, int *value) {
34       pthread_mutex_lock(&q->headLock);
35       node_t *tmp = q->head;
36       node_t *newHead = tmp->next;
37       if (newHead == NULL) {
38           pthread_mutex_unlock(&q->headLock);
39           return -1; // queue was empty
40       }
41       *value = newHead->value;
42       q->head = newHead;
43       pthread_mutex_unlock(&q->headLock);
44       free(tmp);
45       return 0;
46   }

这段代码中有两个锁,一个负责队列头,另外一个负责队列尾。这两个锁使得入队列操做和出队列操做能够并发执行,由于入队列只访问tail锁,而出队列只访问head锁。这里还有一个技巧,即添加了一个假节点(在队列初始化的代码里分配的),该假节点分开了头和尾操做。

队列在多线程程序里普遍使用。然而,这里的队列一般不能彻底知足这种程序的需求。更完善的有界队列,在队列空或者满时,能让线程等待。

并发散列表

咱们的示例是不须要调整大小的简单散列表。

1    #define BUCKETS (101)
2
3    typedef struct  hash_t {
4        list_t lists[BUCKETS];
5    } hash_t;
6
7    void Hash_Init(hash_t *H) {
8        int i;
9        for (i = 0; i < BUCKETS; i++) {
10           List_Init(&H->lists[i]);
11       }
12   }
13
14   int Hash_Insert(hash_t *H, int key) {
15       int bucket = key % BUCKETS;
16       return List_Insert(&H->lists[bucket], key);
17   }
18
19   int Hash_Lookup(hash_t *H, int key) {
20       int bucket = key % BUCKETS;
21       return List_Lookup(&H->lists[bucket], key);
22   }

本例的散列表使用咱们以前实现的简单并发链表,每一个散列桶(每一个桶都是一个链表)都有一个锁,而不是整个散列表只有一个锁,从而支持许多并发操做

建议

在实现并发数据结构时,先从最简单的方案开始,也就是加一把大锁来同步。若是发现性能问题,那么就改进方法,只要优化到知足须要便可。

许多操做系统,在最初过渡到多处理器时都是用一把大锁,包括Sun和Linux。这个方案在不少年里都颇有效,直到多CPU系统普及,内核只容许一个线程活动成为性能瓶颈。Linux采用了简单的方案,把一个锁换成多个;Sun则实现了一个最开始就能并发的新系统Solaris。

相关文章
相关标签/搜索