分享一篇关于使用阿里云消息队列中遇到的坑

相信不少小伙伴都在开发中使用过消息队列,尤为是高并发的状况,通常能够在缓存中操做数据,而后经过消息异步处理业务逻辑,操做数据库等。php

本人所在的公司使用了阿里云的消息队列和RabbitMQ,听说使用阿里云消息队列的一部分缘由是RabbitMQ实现延迟消息比较复杂,要依赖死信...接下来进入主题,说说我是怎么使用消息和遇到的坑吧~redis

通常来说,咱们可使用一个脚原本接收阿里云消息处理业务逻辑,可是若是业务量特别大的话,咱们可能会遇到一个问题,就是脚本处理不过来,消息积攒的数量可能远远超出每秒可以处理的数量。针对这种现象,咱们能够启动多个处理脚原本同时处理消息,这样会明显加快处理消息的速度。mongodb

可是,多个脚本同时从一条消息队列里面取消息的时候,会不会同时取到同一条消息,而后形成消息重复处理的现象呢?我以为确定是会的,消息是第三方服务,咱们没法保证他的100%稳定,因此咱们须要在处理的时候下点功夫了。数据库

咱们发送消息的数据体是json,通常咱们会在每条消息里面加一个taskid,以时间戳(精确到毫秒级) + 随机数组成,这个taskid足够长,咱们得以保证他不会重复(重复的可能性极小,相似于mongodb的主键也是这个原理)。json

接下来看一段代码:数组

<?php缓存

try {
    //伪代码
    $getData    = $mq->receive();
    $getData    = \Zend\Json\Json::decode($getData, true);  //若是不是json数据 咱们能够捕获到异常

    //先检测数据
    $errorMsg   = [];
    if (!isset($getData['taskid']) || $getData['taskid'] == '') {
        $errorMsg[]     = 'taskid不能为空';
    }

    if (!isset($getData['order_id']) || $getData['order_id'] == '') {
        $errorMsg[]     = '订单号不能为空';
    }

    if (!cache()->setex($this->cachePrefix . $getData['taskid'], 1)) {
        $errorMsg[]     = '该任务已被处理';
    }

    if (count($errorMsg) >= 1) {
        //记日志  方便排错
        $this->log('xxxx处理脚本错误 哪一个文件 错误级别 错误缘由' . implode('|', $errorMsg));
        return false;
    }

    //必须有过时时间  否则会把redis撑爆
    cache()->expire($this->cachePrefix . $getData['taskid'], 7 * 86400);

    /**
     * 处理业务逻辑
     */
    
    //业务逻辑处理正常,删除redis锁,删除消息
    cache()->del($this->cachePrefix . $getData['taskid']);
    $mq->ack();
    return true;
}

//捕获到了异常
catch (\Exception $e) {
     //必定要把此次消息删掉 否则会重复进来 日志错误级别记高一点 手动处理问题
    $mq->ack();
    //记日志
    $this->log();
}

小伙伴们知道这段代码哪里有问题吗?并发

相关文章
相关标签/搜索