在以前咱们将交换器的类型从fanout设置为direct后可以根据咱们的选择得到响应的消息,虽然改良咱们的消息日志系统,可是还有不少局限性,好比它不能基于多个标准进行路由ui
在咱们的日志系统中咱们可能不单单是依据消息的严重性进行订阅,还有可能同时基于消息的危险等级和消息来源,好比咱们监听来自cron的危险错误和来自kern的全部日志。经过topic咱们能够来实现以上功能日志
消息若是发送到主题交换器的话不能使用任何的routing_key,它必须是由点分隔的单词列表。单词能够是任意的,但一般它们是与消息相关的一些特性code
binding key必须是具备相同格式,topic交换器背后的逻辑跟direct交换器的逻辑相似,一个指定了routing key的消息将会被投递到全部使用binding key并与routing key 相匹配的队列中。binding key有两种特殊状况:blog
*
能够表明代替一个单词#
能够代替0个或多个单词在本例子中,咱们将发送全部描述动物的消息,这些消息将使用由三个单词(两个点)组成的routing_key发送。routing_key第一个单词描述速度,第二个表示颜色,第三个表示物种rabbitmq
队列Q1使用binding_key:*.orange.*
,队列Q2使用binding_key*.*.rabbit
和lazy.#
。总结以下:队列
quick.orange.rabbit
,lazy.orange.elephant
,quick.orange.fox
,quick.orange.rabbit
,lazy.orange.elephant
,lazy.brown.fox
,lazy.pink.rabbit
quick.brown.fox
跟以上两个队列的routing_key都不匹配因此该消息会被丢弃lazy.orange.male.rabbit
则只能匹配队列Q2topic 交换器很是灵活而且能够表现为其余交换器 好比设置队列的binding_key为#,则队列会接收全部的消息无论routing_key是什么 当binding_key不使用特殊字段`*`和`#`的时候,此时topic交换器跟direct交换器同样
func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_topic", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "anonymous.info" } else { s = os.Args[1] } return s }
func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [binding_key]...", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_topic", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
go run receiveLogsTopic.go "#"
go run receiveLogsTopic.go "kern.*"
go run receiveLogsTopic.go "*.critical"
go run receiveLogsTopic.go "kern.*" "*.critical"
go run receiveLogsTopic.go "kern.critical" "A critical kernel error"