本节内容咱们将对发布订阅增长一个特性:订阅子集。好比咱们将一些危险的错误消息保存进硬盘中,同时在控制台仍然可以读取全部的消息git
上一节内容咱们将队列跟交换器进行binging:github
err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil)
一个binging是将交换器跟队列进行关联,能够简单理解为,绑定好的队列只会接收来自这个交换器的消息日志
Bindings可以携带一个额外的参数:routing_key,为了不跟Channel.Publish中的参数混淆咱们称这个routing_key为binding key。因此咱们能够建立一个binding:code
err = ch.QueueBind( q.Name, // queue name "black", // routing key "logs", // exchange false, nil)
binding key的含义取决交换类型,像以前咱们使用的fanout类型就彻底忽略了binding key的价值blog
上一节中咱们的发布订阅模式是将全部的消息广播给消费者,接下来咱们进行扩展:容许根据消息的严重性过滤消息。举个例子,对于严重性错误的消息咱们直接写硬盘,对于通常的提醒消息或日志则不须要浪费硬盘空间rabbitmq
咱们以前使用的fanout交换类型则不具有这些灵活操做,它只可以将消息不加过滤的进行广播。想要达到上面的灵活性咱们使用direct交换模式来替代fanout,它可以将消息传递到binding key跟routing key彻底匹配的队列队列
如上图所示,direct类型的交换器绑定了两个队列,第一个队列使用了绑定键是orange,第二个队列使用了两个绑定键,一个是black,另一个是green路由
在这样的设置中,使用路由键orange发布到交换器的消息将被路由到队列Q1,使用路由键black或者green则会被路由到队列Q2,其余的消息则会被丢弃string
咱们也可使用同一个绑定键来绑定交换器跟不一样队列,在这种状况下,direct模式跟fanout有点类似,使用路由键black发布到交换器的消息将被路由到队列Q1跟Q2it
接下来咱们来实现如下配置的消息系统:
完整的代码以下所示:
emit_log_direct脚本
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_direct", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "info" } else { s = os.Args[1] } return s }
receive_logs_direct脚本
package main import ( "fmt" "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warning] [error]", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
使用下面命令运行:
go run receive_logs_direct.go info warning error go run receive_logs_direct.go warning error go run emit_log_direct.go error "Run. Run. Or it will explode."