php多进程通讯,有名管道(pcntl学习 五)

对于多进程间的通讯,研究了一天的管道,封装了一个管道类,以及处理多进程的类。管道的理论,后期再补上,先上代码。php

<?php
/**
 * 管道封装类
 *
 * @author  mingazi@163.com
 * @link    www.helloworldcoding.com
 * @since   2017-04-18
 */
class fifoPipeClass
{
    /**
     * 管道资源
     */
    protected $handler;
    /**
     * 管道路径
     */
    protected $path;
    /**
     * 是否阻塞,false为非阻塞,true为阻塞
     */
    protected $block = false;
    /**
     * 建立管道
     */
    public function __construct($path = './weicool.pipe', $cover = false, $mode = 0666)
    {
        if (file_exists($path)) {
            if ($cover) {
                unlink($path);
            } else {
                $this->path = $path;
                return $this;
            }
        }
        if (posix_mkfifo($path,$mode)) {
            $this->path = $path;
            return $this;
        } else {
            $this->throwException('create pipe failed');
        }
    }
    /**
     * 抛异常方法
     */
    public function throwException($msg = 'failed')
    {
        throw new \Exception($msg);
    }
    /**
     * 设置阻塞方式
     * 
     * @param   bool   $block false为非阻塞,true为阻塞
     */
    public function setBlock($block = false)
    {
        $this->block = $block;
    }
    /**
     * 指定pipe文件路径
     */
    public function setPath($path)
    {
        if (!file_exists($path)) {
            $msg = $path.' pipe  does not exists';
            $this->throwException($msg);
        }
        $this->path = $path;
    }
    /**
     * 获取pipe文件路径
     */
    public function getPath()
    {
        return $this->path;
    }
    /**
     * 打开一个管道
     *
     * @param   string   $mode  打开类型
     */
    public function pipeOpen($mode = 'r')
    {
        $handler = fopen($this->path, $mode);
        if (!is_resource($handler)) {
            $msg = 'open pipe '.$this->path.' falied';
            $this->throwException($msg);
        }
        // 设置阻塞类型
        stream_set_blocking($handler, $this->block);
        $this->handler = $handler;
        return $this;
    }
    /**
     * 已读的方式打开管道
     *
     * @return resource
     */
    public function readOpen()
    {
        return $this->pipeOpen('r');
    }
    /**
     * 已写的方式打开管道
     *
     * @return resource
     */
    public function writeOpen()
    {
        return $this->pipeOpen('w');
    }
    /**
     * 读取一行,或给定的长度
     */
    public function readOne($byte = 1024)
    {
        $data = fread($this->handler,$byte);
        return $data;
    }
    /**
     * 读取全部的内容
     */
    public function readAll()
    {
        $hd = $this->handler;
        $data = '';
        while (!feof($hd)) {
            $data .= fread($hd,1024);
        }
        return $data;
    }
    /**
     * 写入数据
     */
    public function write($data)
    {
        $hd = $this->handler;
        try {
            fwrite($hd,$data);
        } catch(\Exception $e) {
            $this->throwException($e->getMessage());
        }
        return $this;
    }
    /**
     * 关闭管道
     */
    public function close()
    {
        return fclose($this->handler);
    }
    /**
     * 删除管道
     */
    public function remove()
    {
        return unlink($this->path);
    }
    
}

多进程处理类,利用管道保存各个进程的返回结果,主进程处理最后的结果数组

<?php
require_once './fifoPipeClass.php';
class pipeMultiProcess
{
    protected $process = []; // 子进程
    protected $child   = []; // 子进程pid数组
    protected $result  = []; // 计算的结果
    public function __construct($process = [])
    {
        $this->process  = $process;
    }
    /**
     * 设置子进程
     */
    public function setProcess($process)
    {
        $this->process = $process;
    }
    /**
     * fork 子进程
     */
    public function forkProcess()
    {
        $process  = $this->process;
        foreach($process as $k => $item) {
            $pid = pcntl_fork();
            if ($pid ==  0) {
                $pipe = new fifoPipeClass();
                $id = getmypid();
                $pipe->writeOpen();
                $pipe->write($k.' pid:'.$id.PHP_EOL);
                $pipe->close();
                exit(0);
            } else if ($pid > 0) {
                $this->child[] = $pid;
            }
        }
        return $this;
    }
    /**
     * 等待子进程结束
     */
    public function waiteProcess()
    {
        $child = $this->child;
        $pipe  = new fifoPipeClass();
        $pipe->readOpen();
        echo 'get all begin'.PHP_EOL;
        while(count($child)) {
            foreach($child as $k => $pid){
                $res = pcntl_waitpid($pid,$status,WNOHANG);
                if ( -1 == $res || $res > 0 ) {
                    unset($child[$k]);
                }
            }
            $data = $pipe->readOne();
            if ($data) {
                $this->result[] = $data;
            }
        }
        $pipe->close();
        echo 'get all end'.PHP_EOL;
        $pipe->remove();
        return $this;
    }
    /**
     * 获取返回结果
     */
    public function getResult()
    {
        return $this->result;
    }
}
$obj = new pipeMultiProcess();
$obj->setProcess(['name'=>1,'age'=>2,'sex'=>3]);
$res = $obj->forkProcess()->waiteProcess()->getResult();
print_r($res);

运行结果以下:ui

Array
(
    [0] => age pid:7436

    [1] => sex pid:7437
name pid:7435

)
相关文章
相关标签/搜索