rabbitmq 实现延迟队列的两种方式

ps: 文章里面延迟队列=延时队列html

什么是延迟队列

延迟队列存储的对象确定是对应的延时消息,所谓”延时消息”是指当消息被发送之后,并不想让消费者当即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。java

场景一:在订单系统中,一个用户下单以后一般有30分钟的时间进行支付,若是30分钟以内没有支付成功,那么这个订单将进行一场处理。这是就可使用延时队列将订单信息发送到延时队列。node

场景二:用户但愿经过手机远程遥控家里的智能设备在指定的时间进行工做。这时候就能够将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备。git

RabbitMQ如何实现迟队列

方法一

AMQP协议和RabbitMQ队列自己没有直接支持延迟队列功能,可是能够经过如下特性模拟出延迟队列的功能。 
可是咱们能够经过RabbitMQ的两个特性来曲线实现延迟队列:github

RabbitMQ能够针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,若是超时(二者同时设置以最早到期的时间为准),则消息变为dead letter(死信)web

RabbitMQ针对队列中的消息过时时间有两种方法能够设置。app

  • A: 经过队列属性设置,队列中全部消息都有相同的过时时间。
  • B: 对消息进行单独设置,每条消息TTL能够不一样。

若是同时使用,则消息的过时时间以二者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letteride

RabbitMQ的Queue能够配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,若是队列内出现了dead letter,则按照这两个参数从新路由转发到指定的队列。ui

  • x-dead-letter-exchange:出现dead letter以后将dead letter从新发送到指定exchange
  • x-dead-letter-routing-key:出现dead letter以后将dead letter从新按照指定的routing-key发送

队列出现dead letter的状况有:this

  • 消息或者队列的TTL过时

  • 队列达到最大长度

  • 消息被消费端拒绝(basic.reject or basic.nack)而且requeue=false

综合上述两个特性,设置了TTL规则以后当消息在一个队列中变成死信时,利用DLX特性它能被从新转发到另外一个Exchange或者Routing Key,这时候消息就能够从新被消费了。

设置方法:

第一步:设置TTL产生死信,有两种方式Per-Message TTL和 Queue TTL,第一种能够针对每一条消息设置一个过时时间使用于大多数场景,第二种针对队列设置过时时间、适用于一次性延时任务的场景

还有其余产生死信的方式好比消费者拒绝消费 basic.reject 或者 basic.nack ( 前提要设置消费者的属性requeue=false) 
- Per-Message TTL (对每一条消息设置一个过时时间)(官方文档

java client发送一条只能驻留60秒的消息到队列:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");//设置消息的过时时间为60秒
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
//这条消息发送到相应的队列以后,若是60秒内没有被消费,则变为死信
  • 1
  • 2
  • 3
  • 4
  • 5
  • Queue TTL (对整个队列设置一个过时时间)

建立一个队列,队列的消息过时时间为30分钟(这个队列30分钟内没有消费者消费消息则删除,删除后队列内的消息变为死信)

java client方式:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

rabbitmqctl命令方式(.* 为全部队列, 能够替换为指定队列):
rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues

rabbitmqctl (Windows):
rabbitmqctl set_policy expiry ".*" "{""expires"":1800000}" --apply-to queues
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

第二步:设置死信的转发规则(若是没有任何规则,则直接丢弃死信) 
- Dead Letter Exchanges设置方法(官方文档

Java Client方式:
//声明一个直连模式的exchange
channel.exchangeDeclare("some.exchange.name", "direct");
//声明一个队列,当myqueue队列中有死信产生时,会转发到交换器some.exchange.name
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");

//若是设置死信会以路由键some-routing-key转发到some.exchange.name,若是没设默认为消息发送到本队列时用的routing key
//args.put("x-dead-letter-routing-key", "some-routing-key");
channel.queueDeclare("myqueue", false, false, false, args);

命令行方式(.* 为全部队列, 能够替换为指定队列):
设置 "dead-letter-exchange"
rabbitmqctl:
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
rabbitmqctl (Windows):
rabbitmqctl set_policy DLX ".*" "{""dead-letter-exchange"":""my-dlx""}" --apply-to queues
设置 "dead-letter-routing-key"
rabbitmqctl:
rabbitmqctl set_policy DLX ".*" '{ "dead-letter-routing-key":"my-routing-key"}' --apply-to queues
rabbitmqctl (Windows):
rabbitmqctl set_policy DLX ".*" "{""dead-letter-routing-key"":""my-routing-key""}" --apply-to queues
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

方法二

在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。

插件源码地址: 
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下载地址: 
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安装:

进入插件安装目录 
{rabbitmq-server}/plugins/(能够查看一下当前已存在的插件) 
下载插件 
rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
  • 1

(若是下载的文件名称不规则就手动重命名一下如: 
rabbitmq_delayed_message_exchange-0.0.1.ez)

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

(关闭插件)
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
  • 1
  • 2
  • 3
  • 4

插件使用

经过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性 
x-delayed-message是插件提供的类型,并非rabbitmq自己的

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...
  • 1
  • 2
  • 3
  • 4
  • 5

发送消息的时候经过在header添加”x-delay”参数来控制消息的延时时间

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
// ... more code ...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

使用示例:

消息发送端:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    // 队列名称
    private final static String EXCHANGE_NAME="delay_exchange";
    private final static String ROUTING_KEY="key_delay";

    @SuppressWarnings("deprecation")
    public static void main(String[] argv) throws Exception {
        /**
         * 建立链接链接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.12.190");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // 声明x-delayed-type类型的exchange
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true,
                false, args);


        Map<String, Object> headers = new HashMap<String, Object>();
        //设置在2016/11/04,16:45:12向消费端推送本条消息
        Date now = new Date();
        Date timeToPublish = new Date("2016/11/04,16:45:12");

        String readyToPushContent = "publish at " + sf.format(now)
                + " \t deliver at " + sf.format(timeToPublish);

        headers.put("x-delay", timeToPublish.getTime() - now.getTime());

        AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
                .headers(headers);
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(),
                readyToPushContent.getBytes());

        // 关闭频道和链接
        channel.close();
        connection.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

消息接收端:

import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    // 队列名称
    private final static String QUEUE_NAME = "delay_queue";
    private final static String EXCHANGE_NAME="delay_exchange";

    public static void main(String[] argv) throws Exception,
            java.lang.InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.12.190");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.queueDeclare(QUEUE_NAME, true,false,false,null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
        SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        try {
            System.out.println("****************WAIT***************");
            while(true){
                QueueingConsumer.Delivery delivery = queueingConsumer
                        .nextDelivery(); //

                String message = (new String(delivery.getBody()));
                System.out.println("message:"+message);
                System.out.println("now:\t"+sf.format(new Date()));
            }

        } catch (Exception exception) {
            exception.printStackTrace();

        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

启动接收端,启动发送端 
运行结果:

****************WAIT***************
message:publish at 2016-11-04 16:44:16.887   deliver at 2016-11-04 16:45:12.000
now:    2016-11-04 16:45:12.023
  • 1
  • 2
  • 3

结果显示在咱们2016-11-04 16:45:12.023接收到了消息,距离咱们设定的时间2016-11-04 16:45:12.023有23毫秒的延迟

Note:使用rabbitmq-delayed-message-exchange插件时发送到队列的消息数量在web管理界面可能不可见,不影响正常功能使用

Note :使用过程当中发现,当一台启用了rabbitmq-delayed-message-exchange插件的RAM节点在重启的时候会没法启动,查看日志发现了一个Timeout异常,开发者解释说这是节点在启动过程会同步集群相关数据形成启动超时,并建议不要使用Ram节点

插件开发者: 
RAM nodes start blank and need a disk node to sync tables from. In this case it times out.

More importantly, you don’t need RAM nodes. If you’re not sure if you do, you certainly don’t, as don’t 99% of users.

相关文章
相关标签/搜索