虽然 rabbitmq 没有延时队列的功能,可是稍微变更一下也是能够实现的git
基于第一点,我利用的是消息存在过时时间这一特性, 消息一旦过时就会变成dead letter
,能够让单独的消息过时,也能够设置整个队列消息的过时时间
而rabbitmq
会有限取两个值的最小值github
基于第二点,是用到了rabbitmq
的过时消息处理机制:
. x-dead-letter-exchange
将过时的消息发送到指定的 exchange
中
. x-dead-letter-routing-key
将过时的消息发送到自定的 route
当中golang
在这里例子当中,我使用的是 过时消息+转发指定exchangebash
首先是消费者comsumer.go
工具
package main import ( "log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { // 创建连接 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明一个主要使用的 exchange err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 声明一个常规的队列, 其实这个也不必声明,由于 exchange 会默认绑定一个队列 q, err := ch.QueueDeclare( "test_logs", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") /** * 注意,这里是重点!!!!! * 声明一个延时队列, ß咱们的延时消息就是要发送到这里 */ _, errDelay := ch.QueueDeclare( "test_delay", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait amqp.Table{ // 当消息过时时把消息发送到 logs 这个 exchange "x-dead-letter-exchange":"logs", }, // arguments ) failOnError(errDelay, "Failed to declare a delay_queue") err = ch.QueueBind( q.Name, // queue name, 这里指的是 test_logs "", // routing key "logs", // exchange false, nil) failOnError(err, "Failed to bind a queue") // 这里监听的是 test_logs msgs, err := ch.Consume( q.Name, // queue name, 这里指的是 test_logs "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
而后是生产者productor.go
code
package main import ( "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() body := bodyFrom(os.Args) // 将消息发送到延时队列上 err = ch.Publish( "", // exchange 这里为空则不选择 exchange "test_delay", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), Expiration: "5000", // 设置五秒的过时时间 }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }
运行一下:blog
go run comsumer.go go run productor.go
具体看代码和注释就行, 这里的关键点就是将要延时的消息发送到过时队列当中, 而后监听的是过时队列转发到的 exchange 下的队列
正常状况就是始终监听一个队列,而后把过时消息发送到延时队列中,当消息到达时间后就把消息发到正在监听的队列rabbitmq