im-cloud分布式中间件分析(三)-job节点实现

github: http://github.com/brewlin/im-...

1.概述

job 节点 做为消费端,消费logic生产的数据,而后经过grpc推送至cloud节点,cloud点真正处理客户端数据,job节点默认多进程消费启动4个worker进程,以及默认10个grpc链接池
  • 数据流程图

图片描述

2.@Consumer 消费中心

默认启动4个worker进程消费logic请求,耗时处理投放至task进程处理,并转发至cloud节点

监听worker启动事件

须要在config/queue.php ,config/event.php 注册相应的事件和相关配置php

use App\Consumer\Consumer;
use Core\App;
use Core\Swoole\WorkerStartInterface;
use Swoole\Server as SwooleServer;

class WorkerStartListener implements WorkerStartInterface
{
    const INIT_LOGIC = 1;

    public function onWorkerStart(SwooleServer $server, int $workerId): void
    {
        if(App::isWorkerStatus()){
            //启动的n个 worker进程 分别做为消费者进程消费,每一个进程会直接阻塞直到消费到数据
            consumer()->consume(new Consumer());
        }
    }

}

消费主流程

  • 1.为每一个消费数据请求创建一个协程,处理相关数据
  • 2.将每一个数据投递至worker进程进行真正的grpc与cloud推送请求
Co::create(function()use($data){
    if(empty(CloudClient::$serviceList)){
        Log::error("cancle task deliver discovery cloud node is empty");
        return;
    }
    Task::deliver(Job::class,"push",[CloudClient::$serviceList,$data]);
},false);
return Result::ACK;
  • 3.经过以上作法能加快并发是消费速度,task进程也进行协程处理,增长并行处理能力,若是task进程阻塞也会形成task任务投递阻塞,因此在worker进程也须要加一个协程处理
相关文章
相关标签/搜索