Perl多线程(2):数据共享和线程安全

线程数据共享

在介绍Perl解释器线程的时候一直强调,Perl解释器线程在被建立出来的时候,将从父线程中拷贝数据到子线程中,使得数据是线程私有的,而且数据是线程隔离的。若是真的想要在线程间共享数据,须要显式使用threads::shared模块来扩展threads模块的功能。这个模块必须在先导入了threads模块的状况下使用,不然threads::shared模块里的功能都将没效果。html

use threads;
use threads::shared;

要共享数据,只需使用threads::shared模块的share方法便可,也能够直接将数据标记上:shared属性的方式来共享。例如:python

my $answer = 43;
my @arr = qw(1 2 3);
share($answer);
share(@arr);

my $answer :shared = 43;
my @arr :shared = qw(1 2 3);
my %hash :shared = (one=>1, two=>2, three=>3);

使用share()和:shared的区别在于后面这种标记的方式是在编译期间完成的。另外,使用:shared属性标记的方式能够直接共享引用类型的数据,可是share()不容许,它使用prototype限制了只容许接收变量类型的参数,可使用&share()调用子程序的方式禁用prototype:shell

my $aref = &share([1 2 3]);

例如:数组

#!/usr/bin/perl
use strict;
use warnings;
use 5.010;

use threads;
use threads::shared;

my $foo :shared = 1;
my $bar = 1;
threads->create(
    sub {
        $foo++;
        $bar++;
        say "new thread: \$foo=$foo";   # 2
        say "new thread: \$bar=$bar";   # 2 
    }
)->join();

say "main thread: \$foo=$foo";   # 2
say "main thread: \$bar=$bar";   # 1

什么数据会共享

并不是全部数据均可以共享,只有普通变量、数组、hash以及已共享数据的引用能够共享。也就是:安全

Ordinary scalars
Array refs
Hash refs
Scalar refs
Objects based on the above

例如:数据结构

use threads;
use threads::shared;

my $var = 1;     #  未共享数据
my $svar :shared = 2;    # 共享标量
my @arr :shared = qw(perl python shell);   # 共享数组
my %hash :shared;       # 共享hash

my $thr = threads->new(\&mysub);

sub mysub {
    $hash{a} = 1;       # 成功
    $hash{b} = $var;    # 成功:$var是普通变量
    $hash{c} = \$svar;  # 成功:\$svar是已共享标量
    $hash{d} = @arr;    # 成功:普通数组
    $hash{e} = \@arr;   # 成功:已共享数组的引用

    # $hash{f} = \$var; # 失败并die:$var未共享标量的引用
    # $hash{g} = [];    # 失败:未共享数组的引用
    # $hash{h} = {a=>1};# 失败:未共享hash的引用
}

$thr->join();     # join后文解释
while( my ($key, $value) = each %hash ){
    say "$key => $value";
}

若是共享hash或array类型,那么里面的全部元素都对外可见,但并不意味着里面的元素是共享的。共享hash/array和共享它们里面的元素是独立的,共享hash/array只意味着共享它们自身,但里面的元素会暴露。反之,能够直接共享某个元素,但hash/array自身不共享。(经测试,数组的元素没法共享,hash的元素可正常共享)多线程

数据共享的问题:竞态

当多个线程在某一时刻都访问或修改同一个共享数据时,就会出现竞态问题(race condition),它意味着多线程的数据竞争问题。app

例如:测试

use threads;
use threads::shared;

my $x :shared = 1;
my $thr1 = threads->new(\&sub1);
my $thr2 = threads->new(\&sub2);

$thr1->join();
$thr2->join();
print "$x\n";

sub sub1 { my $foo = $x; $x = $foo + 1; }
sub sub1 { my $bar = $x; $x = $bar + 1; }

执行上面的程序,结果可能会输出2或3,由于两个线程可能都取得x=1的值,也可能后一个线程取得前一个线程加法以后的值。prototype

之因此会发生竞态问题,是由于对多个线程对同个数据的访问和修改时间点没法保证,这个时候数据是线程不安全的,也可称之为线程数据不一样步。

因此,要解决数据竞态问题,必须对共享数据的步骤进行协调,好比修改数据时必须保证只能有一个线程去修改,这能够经过锁的方式来实现。

变量锁

threads::shared模块中提供了一个lock()方法,用来将共享数据进行独占锁定,被锁定的数据没法被其它线程修改,直到释放锁其它线程才能够获取锁并修改数据。

例如:

use threads;
use threads::shared;

my $var :shared = 3;

sub mysub {
    ...
    lock($var);
    ...
}  # 锁在这里自动被释放

没有unlock()这样直接释放锁的方法,而是在退出当前做用域的时候自动释放锁,就像词法变量同样。

另外,锁住hash和数组的时候,仅仅只是锁住它们自身,但lock()没法去锁hash/array中的元素。因此:

lock $myhash{'abc'};    # Error
lock %myhash;           # Correct

若是真想基于容器中元素进行锁定,可使用线程信号量模块Thread::Semaphore,后文会介绍。

另外,lock()是能够递归的,在退出最外层lock()的做用域时释放锁。且递归时重复锁定同一个变量是幂等的。例如:

my $x :shared;
doit();
sub doit {
    {
        {
            lock($x); # Wait for lock
            lock($x); # 没任何做用,由于已经锁过一次了
            {
                lock($x); # 没任何做用,由于已经锁过一次了
                {
                    lock($x); # 没任何做用,由于已经锁过一次了
                    lockit_some_more();
                }
            }
        } # *** Implicit unlock here ***
    }
}
sub lockit_some_more {
    lock($x); # 没任何做用,由于已经锁过一次了
} # Nothing happens here

死锁问题

使用锁来协调共享数据的步骤能解决竞态问题,可是若是协调很差,很容易出现死锁问题。死锁是指两个或更多进程/线程互相等待锁的释放,致使每个进程/线程都没法释放,从而出现无限等待的死循环问题。

例如:

use threads;
use threads::shared;

my $x :shared = 4;
my $y :shared = 'foo';

my $thr1 = threads->create(
    sub {
        lock($x);
        sleep 3;
        lock($y);
    }
);

my $thr2 = threads->create(
    sub {
        lock($y);
        sleep 3;
        lock($x);
    }
);

sleep 10;

上面的例子只要运行,两个线程将会出现死锁问题,由于thr1线程锁住$x、thr2锁住$y后,thr1申请$y的锁将等待thr2先释放,同理thr2申请$x的锁将等待thr1先释放。因而出现了互相等待的僵局,谁也不会也没法释放。

解决死锁最简单且最佳的方式是保证全部线程以相同的顺序去锁住每个数据。例如,全部线程都以先锁住$x,再锁住$y,最后锁住$z的方式去执行代码。

另外一个避免死锁的解决方案是尽量让锁住共享数据的时间段变短,这样出现僵局的概率就会小不少。

可是这两种方式不少时候都派不上用场,由于须要用到锁的状况可能会比较复杂。下面介绍几种方式。

线程队列(Thread::Queue)

(Thread::Queue)队列数据结构(FIFO)是线程安全的,它保证了某些线程从一端写入数据,另外一些线程从另外一端读取数据。只要队列已经满了,写入操做就自动被阻塞直到有空间支持写操做,只要队列空了,读取操做就会自动阻塞直到队列中有数据可读。这种模式自身就保证了线程安全性。

在Perl中要使用线程队列,须要使用Thread::Queue模块,使用方式很简单。以下示例:

#!/usr/bin/perl
use strict;
use warnings;

use threads;
use Thread::Queue;

# 建立一个线程队列
my $DataQueue = Thread::Queue->new();

# 建立线程
my $thr = threads->new(
    sub {
        # 在循环中读取队列
        while (my $DataElement = $DataQueue->dequeue()) {
            print "Poped $DataElement off the queue\n";
        }
    }
);

# 向队列中写入一个数据
$DataQueue->enqueue(12);
sleep 1;

# 再次写入队列3个数据
$DataQueue->enqueue('a','b','c');
sleep 3;

# 关闭队列,让读取端再也不阻塞
$DataQueue->enqueue(undef);

# 等待子线程并为其收尸
$thr->join();

关于Thread::Queue模块的用法,参见:https://www.cnblogs.com/f-ck-need-u/p/10422293.html

Thread::Semaphore

Thread::Semaphore实现了线程信号量,能够经过up()和down()来操做信号量,up()表示增长信号量的值,down()表示减信号量的值,按照锁的角度来看这是申请锁的操做,只要减法操做后信号量的值为负数,此次减法操做就会被阻塞,就像被锁住。

经过Thread::Semaphore的new()方法来建立一个信号量,若是不给任何参数,则默认建立一个信号量值为1的信号量。若是给new()一个整数值N,则表示建立一个信号量值为N的信号量。

使用信号量实现锁机制的示例:

#!/usr/bin/perl

use 5.010;
use threads;
use Thread::Semaphore;

# 新建一个信号量
my $sem = Thread::Semaphore->new();

# 全局共享变量
my $gbvar :shared = 0;

my $thr1 = threads->create(\&mysub, 1);
my $thr2 = threads->create(\&mysub, 2);
my $thr3 = threads->create(\&mysub, 3);

# 每一个线程给全局共享变量依次加10
sub mysub {
    my $thr_id = shift;
    my $try_left = 10;
    my $local_value;
    sleep 1;
    while($try_left--){
        # 至关于获取锁
        $sem->down();

        $local_value = $gbvar;
        say "$try_left tries left for sub $thr_id "."(\$gbvar is $gbvar)";
        sleep 1;
        $local_value++;
        $gbvar = $local_value;
        
        # 至关于释放锁
        $sem->up();
    }
}

$thr1->join();
$thr2->join();
$thr3->join();

因为信号量能够锁住任何片断的代码,因此它的锁机制很是灵活。

实际上,up()和down()每次操做默认都只增、减1个信号量的值,但能够给它们传递参数来一次性请求加N、减N个信号量值,对于减法操做,若是请求减N致使信号量的值为负数,则该减法操做被阻塞,直到有足够的信号量完成此次减法。这时的信号量就像是一个计数器同样。

例如:

use threads;
use Thread::Semaphore;

# 建立一个信号量值为5的信号量
my $sem = Thread::Semaphore->new(5);

my $thr1 = threads->new(\&sub1);
my $thr2 = threads->new(\&sub1);

sub sub1 {
    # 申请锁
    $sem->down(5);  # 一次减5
    ... do something here ...
    $sem->up(5);  # 一次加5
}

$thr1->join();
$thr2->join();
相关文章
相关标签/搜索