[perl] 多线程理解

Thread:在使用多线程处理比较大的数据量的扫描,遇到读写文件可能死锁的问题。 多线程

Perl 线程的生命周期

1.使用 threads 包的 create() 方法:

use threads; 

sub say_hello { 
    printf("Hello thread! @_.\n"); 
    return( rand(10) ); 
} 

my $t1 = threads->create( \&say_hello, "param1", "param2" ); 
my $t2 = threads->create( "say_hello", "param3", "param4" ); 
my $t3 = threads->create( 
sub { 
    printf("Hello thread! @_\n"); 
    return( rand(10) ); 
}, "param5", "param6" );



2.join 和 detach 方法

线程一旦被成功建立,它就马上开始运行了,这个时候你面临两种选择,分别是 join 或者 detach 这个新建线程。固然你也能够什么都不作,不过这可不是一个好习惯、 函数

从字面上来理解,join 就是把新建立的线程结合到当前的主线程中来,把它当成是主线程的一部分,使他们合二为一。join 会触发两个动做,首先,主线程会索取新建线程执行结束之后的返回值;其次,新建线程在执行完毕并返回结果之后会自动释放它本身所占用的系统资源。例如 ui

join收割新建线程 spa

#!/usr/bin/perl 
# 
use threads; 

sub func { 
 sleep(1); 
 return(rand(10)); 
} 

my $t1 = threads->create( \&func ); 
my $t2 = threads->create( \&func ); 

printf("do something in the main thread\n"); 

my $t1_res = $t1->join(); 
my $t2_res = $t2->join(); 

printf("t1_res = $t1_res\nt2_res = $t2_res\n");

由此咱们不难发现,调用 join 的时机是一个十分有趣的问题。若是调用 join 方法太早,新建线程还没有执行完毕,天然就没法返回任何结果,那么这个时候,主线程就不得不被阻塞,直到新建线程执行完毕以后,才能得到返回值,而后资源会被释放,join 才能结束,这在很大程度上破话了线程之间的并行性。相反,若是调用 join 方法太晚,新建线程早已执行完毕,因为一直没有机会返回结果,它所占用的资源就一直没法获得释放,直到被 join 为止,这在很大程度上浪费了宝贵的系统资源。所以,join 新建线程的最好时机应该是在它刚刚执行完毕的时候,这样既不会阻塞当前线程的执行,又能够及时释放新建线程所占用的系统资源。  线程

foreach ( threads->list(threads::joinable) ){
        $_->join( );
}

咱们再来看看 detach 方法,这也许是最省心省力的处理方法了。从字面上来理解,detach 就是把新建立的线程与当前的主线程剥离开来,让它今后和主线程无关。当你使用 detach 方法的时候,代表主线程并不关心新建线程执行之后返回的结果,新建线程执行完毕后 Perl 会自动释放它所占用的资源。 scala

detach剥离线程 调试

#!/usr/bin/perl 
# 
use threads; 
use Config; 

sub say_hello { 
 my ( $name ) = @_; 
 printf("Hello World! I am $name.\n"); 
} 
my $t1 = threads->create( \&say_hello, "Alex" ); 
$t1->detach(); 
printf("doing something in main thread\n"); 
sleep(1);



一个新建线程一旦被 detach 之后,就没法再 join 了。当你使用 detach 方法剥离线程的时候,有一点须要特别注意,那就是你须要保证被建立的线程先于主线程结束,不然你建立的线程会被迫结束,除非这种结果正是你想要的,不然这也许会形成异常状况的出现,并增长程序调试的难度。

3.线程的消亡 code

大多数状况下,你但愿你建立的线程正常退出,这就意味着线程所对应的函数体在执行完毕后返回并释放资源。例如在清单 5 的示例中,新建线程被 join 之后的退出过程。但是,若是因为 detach 不当或者因为主线因某些意外的异常提早结束了,尽管它所建立的线程可能还没有执行完毕,可是他们仍是会被强制停止,正所谓皮之不存,毛将焉附。这时你也许会获得一个相似于“Perl exited with active threads”的警告。 生命周期

固然,你也能够显示地调用 exit() 方法来结束一个线程,不过值得注意的是,默认状况下,若是你在一个线程中调用了 exit() 方法, 其余线程都会随之一块儿结束,在不少状况下,这也许不是你想要的,若是你但愿 exit() 方法只在调用它的线程内生效,那么你在建立该线程的时候就须要设置’ exit ’ => ’ thread_only ’。例如 队列

为某个线程设置exit属性

#!/usr/bin/perl 
# 
use threads; 

sub say_hello { 
 printf("Hello thread! @_.\n"); 
 sleep(10); 
 printf("Bye\n"); 
} 

sub quick_exit { 
 printf("I will be exit in no time\n"); 
 exit(1); 
} 

my $t1 = threads->create( \&say_hello, "param1", "param2" ); 
my $t2 = threads->create( {'exit'=>'thread_only'}, \&quick_exit ); 

$t1->join(); 
$t2->join();



若是你但愿每一个线程的 exit 方法都只对本身有效,那么在每次建立一个新线程的时候都去要显式设置’ exit ’ => ’ thread_only ’属性显然有些麻烦,你也能够在引入 threads 包的时候设置这个属性在全局范围内有效,例如

use threads ('exit' => 'threads_only'); 

sub func { 
 ... 
 if( $condition ) { 
 exit(1); 
 } 
} 

my $t1 = threads->create( \&func ); 
my $t2 = threads->create( \&func ); 

$t1->join(); 
$t2->join();




共享与同步

threads::shared

#!/usr/bin/perl 
 # 

 use threads; 
 use threads::shared; 
 use strict; 

 my $var   :shared  = 0;       # use :share tag to define 
 my @array :shared = (); # use :share tag to define 
 my %hash = (); 
 share(%hash);                  # use share() funtion to define 


 sub start { 
 $var = 100; 

 @array[0] = 200; 
 @array[1] = 201; 

 $hash{'1'} = 301; 
 $hash{'2'} = 302; 
 } 

 sub verify { 
    sleep(1);                      # make sure thread t1 execute firstly 
    printf("var = $var\n");     # var=100 

 for(my $i = 0; $i < scalar(@array); $i++) { 
        printf("array[$i] = $array[$i]\n");    # array[0]=200; array[1]=201 
 } 

 foreach my $key ( sort( keys(%hash) ) ) { 
 printf("hash{$key} = $hash{$key}\n"); # hash{1}=301; hash{2}=302 
 } 
 } 

 my $t1 = threads->create( \&start ); 
 my $t2 = threads->create( \&verify ); 

 $t1->join(); 
 $t2->join();

死锁常是多线程程序中最隐蔽的问题,每每难以发现与调试,也增长了排查问题的难度。为了不在程序中死锁的问题,在程序中咱们应该尽可能避免同时获取多个共享变量的锁,若是没法避免,那么一是要尽可能使用相同的顺序来获取多个共享变量的锁,另外也要尽量地细化上锁的粒度,减小上锁的时间。

use threads::shared; 

 # in thread 1 
 { 
    lock( $share );        # lock for 3 seconds 
    sleep(3);               # other threads can not lock again 
 } 
 # unlock implicitly now after the block 

 # in thread 2 
 { 
    lock($share);          # will be blocked, as already locked by thread 1 
    $share++;               # after thread 1 quit from the block 
 } 
 # unlock implicitly now after the block



use threads; 
 use threads::shared; 

 { 
    lock(@share);          # the array has been locked 
    lock(%hash);           # the hash has been locked 
    sleep(3);               # other threads can not lock again 
 } 

 { 
    lock($share[1]);     # error will occur 
    lock($hash{key});    # error will occur 
 }



信号量:Thread::Semaphore

my $semaphore = Thread::Semaphore->new( $max_threads ); #信号量
my $mutex = Thread::Semaphore->new( 1 );   #互斥量

区别:

1. 互斥量用于线程的互斥,信号量用于线程的同步。

这是互斥量和信号量的根本区别,也就是互斥和同步之间的区别。

互斥:是指某一资源同时只容许一个访问者对其进行访问,具备惟一性和排它性。但互斥没法限制访问者对资源的访问顺序,即访问是无序的。

同步:是指在互斥的基础上(大多数状况),经过其它机制实现访问者对资源的有序访问。在大多数状况下,同步已经实现了互斥,特别是全部写入资源的状况一定是互斥的。少数状况是指能够容许多个访问者同时访问资源

以上区别是主要想记住的。

note:信号量能够用来实现互斥量的功能

2. 互斥量值只能为0/1,信号量值能够为非负整数。

也就是说,一个互斥量只能用于一个资源的互斥访问,它不能实现多个资源的多线程互斥问题。信号量能够实现多个同类资源的多线程互斥和同步。当信号量为单值信号量是,也能够完成一个资源的互斥访问。

3. 互斥量的加锁和解锁必须由同一线程分别对应使用,信号量能够由一个线程释放,另外一个线程获得。

use threads; 
use threads::shared; 
use Thread::Semaphore; 

my $s = Thread::Semaphore->new(); 
$s->down();                # P operation 
... 
$s->up();                  # V operation



Thread::Queue:生产者-消费者模型对多线程队列的使用。

生产者能够不断地在线程队列上作 enqueue 操做,而消费者只须要不断地在线程队列上作 dequeue 操做,这就很简单地实现了生产者和消费者之间同步的问题。

#!/usr/bin/perl 
 # 

 use threads; 
 use Thread::Queue; 

 my $q = Thread::Queue->new(); 

 sub produce { 
    my $name = shift; 
    while(1) { 
        my $r = int(rand(100)); 
        $q->enqueue($r); 
        printf("$name produce $r\n"); 
        sleep(int(rand(3))); 
    } 
 } 

 sub consume { 
    my $name = shift; 
    while(my $r = $q->dequeue()) { 
        printf("consume $r\n"); 
    } 
 } 

 my $producer1 = threads->create(\&produce, "producer1"); 
 my $producer2 = threads->create(\&produce, "producer2"); 
 my $consumer1 = threads->create(\&consume, "consumer2"); 

 $producer1->join(); 
 $producer2->join(); 
 $consumer1->join();
相关文章
相关标签/搜索