本文翻译自RabbitMQ官网的Go语言客户端系列教程,本文首发于个人我的博客:liwenzhou.com,教程共分为六篇,本文是第五篇——Topic。html
这些教程涵盖了使用RabbitMQ建立消息传递应用程序的基础知识。
你须要安装RabbitMQ服务器才能完成这些教程,请参阅安装指南或使用Docker镜像。
这些教程的代码是开源的,官方网站也是如此。git
本教程假设RabbitMQ已安装并运行在本机上的标准端口(5672)。若是你使用不一样的主机、端口或凭据,则须要调整链接设置。github
发送到topic
交换器的消息不能具备随意的routing_key
——它必须是单词列表,以点分隔。这些词能够是任何东西,但一般它们指定与消息相关的某些功能。一些有效的routing_key
示例:“stock.usd.nyse
”,“nyse.vmw
”,“quick.orange.rabbit
”。routing_key
中能够包含任意多个单词,最多255个字节。web
绑定键也必须采用相同的形式。topic
交换器背后的逻辑相似于direct
交换器——用特定路由键发送的消息将传递到全部匹配绑定键绑定的队列。可是,绑定键有两个重要的特殊状况:docker
*
(星号)能够代替一个单词。#
(井号)能够替代零个或多个单词。经过下面这个示例能够很容易看明白这一点:bash
在这个例子中,咱们将发送一些都是描述动物的信息。将使用包含三个词(两个点)的路由密钥发送消息。路由键中的第一个单词将描述速度,第二个是颜色,第三个是种类:“<speed>.<colour>.<species>
”。服务器
咱们建立了三个绑定关系:Q1与绑定键“*.orange.*
”绑定,Q2与“*.*.rabbit
”和“lazy.#
”绑定。并发
这些绑定能够总结为:post
路由键设置为“quick.orange.rabbit
”的消息将传递到两个队列。消息“lazy.orange.elephant
”也将发送给他们两个。另外一方面,“quick.orange.fox
”将仅进入第一个队列,而“lazy.brown.fox
”将仅进入第二个队列。即便“lazy.pink.rabbit
”与两个绑定匹配(匹配Q2的两个绑定),也只会传递到第二个队列一次。 “quick.brown.fox
”与任何绑定都不匹配,所以将被丢弃。网站
若是咱们打破约定并发送一个或四个单词的消息,例如“orange
”或“quick.orange.male.rabbit
”,会发生什么?好吧,这些消息将不匹配任何绑定,而且将会丢失。
另外,“lazy.orange.male.rabbit
”即便有四个单词,也将匹配最后一个绑定,并将其传送到第二个队列。
topic交换器
topic交换器功能强大,能够像其余交换器同样运行。
当队列用“
#
”(井号)绑定键绑定时,它将接收全部消息,而与路由键无关,就像在fanout
交换器中同样。当在绑定中不使用特殊字符“
*
”(星号)和“#
”(井号)时,topic交换器的行为就像direct
交换器同样。
咱们将在日志记录系统中使用topic
交换器。咱们将从一个可行的假设开始,即日志的路由键将包含两个词:“<facility>.<severity>
”。
该代码与上一教程中的代码几乎相同。
emit_log_topic.go
的代码:
package main import ( "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_topic", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "anonymous.info" } else { s = os.Args[1] } return s }
receive_logs_topic.go
的代码:
package main import ( "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [binding_key]...", os.Args[0]) os.Exit(0) } // 绑定topic for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_topic", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
想要接收全部的日志:
go run receive_logs_topic.go "#"
要从“kern
”接收全部日志:
go run receive_logs_topic.go "kern.*"
或者,若是你只想接收“critical
”日志:
go run receive_logs_topic.go "*.critical"
你能够建立多个绑定:
go run receive_logs_topic.go "kern.*" "*.critical"
并发出带有路由键“kern.critical
”的日志:
go run emit_log_topic.go "kern.critical" "A critical kernel error"
你能够本身尝试玩一下这个程序。请注意,代码没有对路由键或绑定键进行任何假设,你可能但愿使用两个以上的路由键参数。
(关于emit_log_topic.go和receive_logs_topic.go的完整源代码)
接下来,咱们将在教程6中了解如何将往返消息用做远程过程调用。