普通的延迟队列不细说了,不管是设置统一的队列TTL,仍是设置消息的TTL,全都是利用DeadLetterQueue:消息失效后扔到死信队列,消费者从死信队列里读消息。但在消息失效的过程当中存在一个问题,好比以下场景:php
延迟队列中依次收到以下消息 Message A: TTL 2000 Message B: TTL 100 Message C: TTL 5000 当延迟队列中消息超时后,移至死信队列
实际执行状况是rabbitMQ从队列头取到消息A,等待2秒,超时,发至死信队列 -> 判断消息B,发现已经超时,当即发至死信队列。缘由是TTL被存在消息的内部,rabbitMQ一直去扫描每条消息的TTL,而是只判断队列头消息是否失效,因而消息B实际失效时间是2000ms。git
目前rabbitMQ是不支持任意超时时间的(听说rocketMQ提供有限支持,没用过),但能够经过安装一个插件来解决:github
https://github.com/rabbitmq/rabbitmq-delayed-message-exchangejson
安装过程很简单,下载下来,在rabbitMQ的sbin目录下执行安装bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
插件使用Exchange交换机来实现的TTL,而不是rabbitMQ那样经过死信队列:首先设置一个可延迟的Exchange,该Exchange会将收到的消息存放,按照延迟时间排序,直到达到延迟,才被转发到实际的执行队列。app
启用插件后,进入rabbitMQ的管理页面进行配置:post
新建一个Exchange,Type必须选择x-delayed-message,添加参数x-delayed-type=direct,而后绑定到执行队列测试
$msg = new AMQPMessage(json_encode([ 'event' => $eventName, 'params' => $params ]), [ 'delivery_mode' => 2, ]); $headers = new AMQPTable(['x-delay' => $delay]); $msg->set('application_headers', $headers); $this->channel->basic_publish($msg, self::EXCHANGE_DELAY_ANY_NAME, self::EXCHANGE_KEY_NOW);
接下来,为投递到该Exchange的消息添加插件所需的header:x-delay,值就是要延迟的时间,单位毫秒,若是不添加这个头,全部流入该交换机的消息都会当即被转发到执行队列。注意若是使用php-amqplib的话,须要用AMQPTable设置这个headerthis
接下来按照刚才的三条消息测试一下,发现B消息成功比A消息进入队列了。搞定spa
转载:http://www.xuyanzhe.cn/?p=115
具体实现参考连接: http://www.javashuo.com/article/p-fjwpmtdg-dw.html