MySql实现事务型消息队列以及php多进程消费设计

因公司业务须要,最近在设计一个通用队列功能模块,主体要求两大点:php

  • 用MySql实现事务型消息队列(固然,主流的队列服务可以使用redis或者rabbitmq等,此处讨论的是mysql实现)
  • php多进程消费队列消息

用MySql实现事务型消息队列

消息队列的做用有:异步化、解耦和消除峰值等。目前异步化对于我来讲使用最频繁,在不少业务场景下,咱们能够将实时性要求较低的请求转为异步处理,减少系统负载压力,提升系统稳定性。在离线数据异步处理过程当中,消息队列要知足如下要求:mysql

  • 消息不能丢失,即便在系统失败的状况下。消息一旦被插入就必定会被至少处理一次(只被处理一次是最好的,可是实现起来有难度,因此只要求at-least-once semantic);
  • FIFO顺序。(mysql id自增可知足此特性。固然,能够设计特殊参数作特殊处理)
  • 支持多生产者(mysql支持并发操做,支持此特色)
  • 支持多消费者。每一个消息只能被其中一个消费者处理(业务的处理须要考虑幂等性)。

以上是队列实现的说明,具体用MySql实现事务型消息队列能够参考文章
https://spockwangs.github.io/...git

这次设计的表结构以下:github

CREATE TABLE `comom_queue` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '队列类型,代码业务备注',
  `conn_id` int(11) NOT NULL DEFAULT '0' COMMENT '消费者标识',
  `param_content` text COMMENT '队列入参',
  `callback` varchar(255) NOT NULL DEFAULT '' COMMENT '队列消费回调函数',
  `status` tinyint(2) NOT NULL DEFAULT '0' COMMENT '0新建 1消费中 2成功 3失败 4需重试',
  `create_time` int(11) NOT NULL DEFAULT '0' COMMENT '建立时间',
  `update_time` int(11) NOT NULL DEFAULT '0' COMMENT '状态变动时间',
  `preexec_time` int(11) NOT NULL DEFAULT '0' COMMENT '预消费时间',
  `p_key` varchar(100) NOT NULL DEFAULT '' COMMENT '业务惟一标识key,查询用',
  `mark` varchar(255) NOT NULL DEFAULT '' COMMENT '备注',
  PRIMARY KEY (`id`),
  KEY `indx_s` (`p_key`,`type`) USING BTREE,
  KEY `indx_exec` (`conn_id`,`status`) USING BTREE,
  KEY `indx_ty` (`type`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

说明下几个字段的设计:redis

  • callback 队列中不一样的业务消息有不一样的业务处理,利用callback值回调对应的业务方法
  • type 队列业务类型,区分不一样的业务,可用不一样的消费者分开消费。在FIFO的特色外,可单独开消费者对有特殊要求(消息优先级高)的业务消息进行消费
  • preexec_time 预消费时间,有的业务消息有消费时间要求,可设置出队列时间

php多进程消费设计

这次php多进程的实现依赖pcntl,posix扩展,读者可自行检查是否安装了此拓展。queue队列服务设计和实现包括如下功能点:sql

  • 主进程和子进程的运行时间可配
  • 主进程(master进程)建立和监听子进程行为
  • 建立定时器信号,主进程(master进程)定时监听队列信息,可用于消息堆积通知等
  • 子进程(worker进程)消费消息
  • 针对不一样的业务消息可配置不一样数量的子进程
  • 各个业务子进程数可配置正常拉起数和最大进程数,根据队列积压状况,子进程动态启动进程数(暂未实现,后续添加)

很少说了,直接看代码,抽离出来的queue服务类代码以下:数组

<?php

/**
 * Created by PhpStorm.
 * User: Javion
 * Date: 2018/12/7
 * Time: 15:10
 */
abstract class queue
{
    protected $process = []; // 子进程数组  ['type' => 'process_num']
    protected $child   = []; // 子进程pid数组
    protected $result  = []; // 计算的结果
    protected $overTime = 0; //主进程超时时间
    protected $startTime; //主进程运行时间
    protected $childOverTime = 3600; //子进程超时时间
    protected $alarm_time = 2;
    public function __construct($process = [], $overTime = 0, $childOverTime = 3600)
    {
        if (!function_exists('pcntl_fork')) {
            die("pcntl_fork not existing");
        }
        $this->process  = $process;
        $this->overTime = $overTime;
        $this->childOverTime = $childOverTime;
        $this->startTime = time();
    }
    /**
     * 设置子进程
     */
    public function setProcess($process)
    {
        $this->process = $process;
    }

    /**
     * 设置检测时间间隔 单位s
     */
    public function setAlarmTime($time){
        $this->alarm_time = $time;
    }

    /**
     * fork 子进程
     */
    protected function forkProcess()
    {
        //循环建立每一个type 的消费子进程
        $process  = $this->process;
        foreach($process as $key => $num) {
            for ($i = 0; $i < $num; $i++){
                $this->forkOneProcess($key);
            }
        }
        return $this;
    }

    /**
     * 建立子进程操做
     * @param $key
     * @return $this
     */
    private function forkOneProcess($key)
    {
        $pid = pcntl_fork();
        if ($pid == 0) {
            $id = getmypid();
            $this->processDo($id, $key);
            exit(0);
        } else if ($pid > 0) {
            //记录子进程信息
            $childProcess = array(
                'pid' => $pid,
                'type' => $key,
                'create_time' => time()
            );
            $this->child[$pid] = $childProcess;
        }
        return $this;
    }

    /**
     * 子进程作的事情,消费者
     */
    abstract protected function processDo($id, $key);

    /**
     * 队列数量检测
     */
    abstract protected function checkQueueNum();

    /**
     * 等待子进程结束
     */
    protected function waiteProcess()
    {
        while(count($this->child)) {
            foreach($this->child as $pid => $item){
                $res = pcntl_waitpid($pid,$status,WNOHANG);
                pcntl_signal_dispatch();
                if ( -1 == $res || $res > 0 ) {
                    unset($this->child[$pid]);
                    echo "pid $pid 退出", PHP_EOL;
                    //判断主进程是否超时 未超时拉起新的子进程
                    $leftTime = time() - $this->startTime;
                    if ($this->overTime > $leftTime){
                        $this->forkOneProcess($item['type']);
                        echo "建立新进程", PHP_EOL;
                    }
                }//判断子进程是否存在且超时,超过期限20分钟则强制退出
                elseif (posix_kill($pid, 0) && (time() - $item['create_time'] - 20*60) > $this->childOverTime){
                    posix_kill($pid, SIGUSR1);
                    echo "pid $pid 退出2", PHP_EOL;
                }
            }
        }

        return $this;
    }

    /**
     * 队列检测
     */
    protected function timeHandler(){
        $this->checkQueueNum();
        pcntl_alarm($this->alarm_time);
    }

    /**
     * 启动
     */
    public function runProcess() {
        //注册信号
        pcntl_signal(SIGALRM, array($this, 'timeHandler'));
        pcntl_alarm($this->alarm_time);
        $leftTime = time() - $this->startTime;
        while(($this->overTime ==0 || $this->overTime > $leftTime)){
            echo "新进程processlist", PHP_EOL;
            $this->forkProcess()->waiteProcess();
            $leftTime = time() - $this->startTime;
        }
    }
}

最后一个功能点:各个业务子进程数可配置正常拉起数和最大进程数,根据队列积压状况,子进程动态启动进程数 暂未实现。目前的queue服务设计如上,请各位看官多多指教!并发

相关文章
相关标签/搜索