pthreads v3下的worker和pool的使用

有些人会想,明明用thread已经能够很好的工做了,为何还要搞个worker和pool?php

之因此要用到worker和pool仍是由于效率,由于系统建立一个新线程代价是比较昂贵,每一个建立的线程会复制当前执行的整个上下文。mysql

尽量的重用线程可让咱们的程序更高效。sql

一个简单的worker例子:数据库

<?php

//建立自定义work类,给work取个名字,方便查看
class Work extends Worker
{
    private $name;

    public function __construct($name)
    {
        $this->name = $name;
    }

    public function getName()
    {
        return $this->name;
    }
}

class Task extends Thread
{
    private $num;

    public function __construct($num)
    {
        $this->num = $num;
    }

    public function run()
    {
        //计算累加和
        $total = 0;
        for ($i = 0; $i < $this->num; $i++) {
            $total += $i;
        }
        echo "work : {$this->worker->getName()} task : {$total} \n";
        sleep(1);
    }
}

//建立一个worker线程
$work = new Work('a');

$work->start();

for ($i = 1; $i <= 10; $i++) {
    //将Task对象压栈到worker线程中
    //这个时候Task对象就可使用worker线程上下文(变量,函数等)
    $work->stack(new Task($i));
}

//循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
while ($work->collect()) ;

//关闭worker
$work->shutdown();

上面代码在运行的时候,计算结果会每隔一秒出来一条,也就是10个task对象是运行在1个worker线程上的。函数

若是10个task对象是分别在独立空间运行的,sleep()函数就不会起做用,他们各自sleep并不会影响其余线程。fetch

把上面的代码修改一下:this

<?php

//建立自定义work类,给work取个名字,方便查看
class Work extends Worker
{
    private $name;

    public function __construct($name)
    {
        $this->name = $name;
    }

    public function getName()
    {
        return $this->name;
    }
}

class Task extends Thread
{
    private $num;

    public function __construct($num)
    {
        $this->num = $num;
    }

    public function run()
    {
        //计算累加和
        $total = 0;
        for ($i = 0; $i < $this->num; $i++) {
            $total += $i;
        }
        echo "work : {$this->worker->getName()} task : {$total} \n";
        sleep(1);
    }
}

//建立二个worker线程
$work1 = new Work('a');
$work2 = new Work('b');

$work1->start();
$work2->start();

for ($i = 1; $i <= 10; $i++) {
    if ($i <= 5) {
        $work1->stack(new Task($i));
    } else {
        $work2->stack(new Task($i));
    }
}

//循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
while ($work1->collect() || $work2->collect()) ;

//关闭worker
$work1->shutdown();
$work2->shutdown();

这里咱们建立2个worker线程,让10个task对象分别压栈到2个worker中。线程

这时能够看到,计算结果是一对一对的出来,说明10个task对象跑在了2个worker线程上。对象

至于须要建立多少个worker线程,和多少个task对象,就看自已的需求了。blog

worker还有一个好处就是能够重用worker中的对象和方法。咱们能够在worker中建立一个链接数据库对象,方便各task调用。

<?php

class DB extends Worker
{
    //注意这里设置为静态成员,pdo链接自己是不能在上下文中共享的
    //声明为静态成员,让每一个worker有自已的pdo链接
    private static $db = null;
    public $msg = 'i from db';

    public function run()
    {
        self::$db = new PDO('mysql:host=192.168.33.226;port=3306;dbname=test;charset=utf8', 'root', '');
    }

    public function getDb()
    {
        return self::$db;
    }
}

class Task extends Thread
{
    private $id;
    //注意,这里不要给成员设置默认值,$result成员是线程对象是不可变的,不能被改写
    private $result;

    public function __construct($id)
    {
        $this->id = $id;
    }

    public function run()
    {
        //获取worker中的数据库链接
        $db = $this->worker->getDb();
        $ret = $db->query("select * from tb_user where id = {$this->id}");
        $this->result = $ret->fetch(PDO::FETCH_ASSOC);
        //访问worker中的成员变量msg
        echo "data : {$this->result['id']} {$this->result['name']} \t worker data : {$this->worker->msg} \n";
    }
}

//建立一个worker线程
$work = new DB();

$work->start();

for ($i = 1; $i <= 5; $i++) {
    $work->stack(new Task($i));
}

//循环的清理任务,会阻塞主线程,直到栈中任务都执行完毕
while ($work->collect()) ;

//关闭worker
$work->shutdown();

tb_user表你们能够随意建立,我这里为了演示只建立了id和name字段

运行结果以下:

 

若是说worker是对线程的重用,那么pool就是对worker更高的抽象了,能够同时管理多个worker。

<?php

//之因此要建立一个Id线程类,主要是为了给work取个不一样的ID,方便查看,哪些task线程属于哪一个work中
class Id extends Thread
{
    private $id;

    public function getId()
    {
        //防止出现id混乱,这里使用同步操做
        $this->synchronized(function () {
            ++$this->id;
        });
        return $this->id;
    }
}

class Work extends Worker
{
    private $id;

    public function __construct(Id $obj)
    {
        $this->id = $obj->getId();
    }

    public function getId()
    {
        return $this->id;
    }
}

class Task extends Thread
{
    private $num = 0;

    public function __construct($num)
    {
        $this->num = $num;
    }

    //计算累加和
    public function run()
    {
        $total = 0;
        for ($i = 0; $i < $this->num; $i++) {
            $total += $i;
        }
        echo "work id : {$this->worker->getId()} task : {$total} \n";
    }
}

//建立pool,可容纳3个work对象
$pool = new Pool(3, 'Work', [new Id()]);

//循环的把20个task线程提交到pool中的work对象上运行
for ($i = 1; $i <= 20; $i++) {
    $pool->submit(new Task($i));
}

//循环的清理任务,会阻塞主线程,直到任务都执行完毕
while ($pool->collect()) ;

//关闭pool
$pool->shutdown();

运行结果以下:

相关文章
相关标签/搜索