延时队列的做用再也不累述
本文使用rabbitmq的queue能够设置ttl时间,将到期的message设为死信,message会被push到delay_queue,消费delay_queue便可实现延时队列功能
先假设这样一个场景:
小明在外卖平台下个一个订单,若是超过10分钟未支付,则系统自动取消订单,并推送给用户“订单已取消”信息。php
开发思路:
下订单时就将订单orderId push到订单队列order_queue,并设置次条message的有效期为10分钟,当10分钟后此条message到期,会将此条message转化为死信push到exchange,将exchange和queue进行绑定,开一个/多个消费者消费queue,并判断queue中message订单是否已支付,若未支付则推送通知,取消订单。git
流程图,未考虑消息消费失败的状况github
对RabbitMQ进行简单的封装json
<?php namespace RabbitMQ; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class RabbitMQ { private $host = '127.0.0.1'; private $port = 5672; private $user = 'guest'; private $password = 'guest'; protected $connection; protected $channel; /** * RabbitMQ constructor. */ public function __construct() { $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password); $this->channel = $this->connection->channel(); } /** * 生成信息 * @param $message */ public function sendMessage($message, $routeKey, $exchange = '', $properties = []) { $data = new AMQPMessage( $message, $properties ); $this->channel->basic_publish($data, $exchange, $routeKey); } /** * 消费消息 * @param $queueName * @param $callback * @throws \ErrorException */ public function consumeMessage($queueName,$callback) { $this->channel->basic_consume($queueName, '', false, false, false, false, $callback); while ($this->channel->is_consuming()) { $this->channel->wait(); } } /** * @throws \Exception */ public function __destruct() { $this->channel->close(); $this->connection->close(); } }
建立延时队列ui
<?php namespace RabbitMQ; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; /** * 使用RabbitMQ实现延时队列功能 * Class DelayQueue * @package RabbitMQ */ class DelayQueue extends RabbitMQ { /** * 建立延时队列 * @param $ttl * @param $delayExName * @param $delayQueueName * @param $queueName */ public function createQueue($ttl, $delayExName, $delayQueueName, $queueName) { $args = new AMQPTable([ 'x-dead-letter-exchange' => $delayExName, 'x-message-ttl' => $ttl, //消息存活时间 'x-dead-letter-routing-key' => $queueName ]); $this->channel->queue_declare($queueName, false, true, false, false, false, $args); //绑定死信queue $this->channel->exchange_declare($delayExName, AMQPExchangeType::DIRECT, false, true, false); $this->channel->queue_declare($delayQueueName, false, true, false, false); $this->channel->queue_bind($delayQueueName, $delayExName, $queueName, false); } }
生产者,代码很简单,看看运行以后的效果,订单的message愈来愈多this
<?php require_once '../vendor/autoload.php'; // 生产者 $delay = new \RabbitMQ\DelayQueue(); $ttl = 1000 * 100;//订单100s后超时 $delayExName = 'delay-order-exchange';//超时exchange $delayQueueName = 'delay-order-queue';//超时queue $queueName = 'ttl-order-queue';//订单queue $delay->createQueue($ttl, $delayExName, $delayQueueName, $queueName); //100个订单信息,每一个订单超时时间都是10s for ($i = 0; $i < 100; $i++) { $data = [ 'order_id' => $i + 1, 'remark' => 'this is a order test' ]; $delay->sendMessage(json_encode($data), $queueName); sleep(1); }
消费者,看看消费以后的,过一会会观察到,已经有到期message被push到了delay_order_queue
消费者也消费到了messagespa
<?php require_once '../vendor/autoload.php'; // 消费者 $delay = new \RabbitMQ\DelayQueue(); $delayQueueName = 'delay-order-queue'; $callback = function ($msg) { echo $msg->body . PHP_EOL; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //处理订单超时逻辑,给用户推送提醒等等。。。 sleep(10); }; /** * 消费已经超时的订单信息,进行处理 */ $delay->consumeMessage($delayQueueName, $callback);
代码见:https://github.com/jiaoyang3/...code