在以前的教程中,咱们建立了一个简单的日志系统。咱们可以向许多交换器转发日志消息。html
在本教程中,咱们将添加一个功能——咱们让它仅仅接收咱们感兴趣的日志类别。举例:咱们 实现仅将严重级别的错误日志写入磁盘(为了节省磁盘空间),其他日志级别的日志直接打印到控制台。git
绑定github
以前的章节中咱们已经建立过绑定,你可能还会记得:算法
err = ch.QueueBind( q.Name, // queue name "", // routing key "logs", // exchange false, nil)
绑定是用来维系交换器和队列关系的,这能够被简单地理解为:队列仅仅对从交换器中传的消息感兴趣。3d
绑定有个额外参数叫作routing_key,为了不与Channel.Publish方法中的参数相混淆,咱们称之为binding key(绑定键)。使用绑定键建立绑定以下:日志
err = ch.QueueBind( q.Name, // queue name "black", // routing key "logs", // exchange false, nil)
绑定键的含义取决于交换器的类型。咱们以前使用的fanout类型的交换器,就会直接忽略这个参数。htm
Direct型交换器blog
咱们以前的教程中的日志系统是广播全部的消息到全部消费者。咱们但愿以此拓展来实现根据消息严重性来过滤消息。好比咱们但愿 写日志到硬盘的代码仅仅接收严重级别的,不要浪费磁盘存储在warning或者info级别的日志。教程
以前使用的是fanout类型交换器,没有更好的拓展性或者说灵活性——它只能盲目的广播。rabbitmq
如今 使用direct型交换器替代。Direct型的路由算法 比较简单——消息会被派发到某个队列,该队列的绑定键刚好和消息的路由键一致。
为了阐述,考虑以下设置:
该设置中,能够看到direct型的交换器X被绑定到了两个队列:Q一、Q2。Q1使用绑定键orange绑定,Q2包含两个绑定键:black和green。
基于如上设置的话,使用路由键orange发布的消息会被路由到Q1队列,而使用black或者green路由键的消息均会被路由到Q2,全部其他消息将被丢弃。
备注:这里的交换器X和队列的绑定是多对多的关系,也就是说一个交换器能够到绑定多个队列,一个队列也能够被多个交换器绑定,消息只会被路由一次,不能由于两个绑定键都匹配上了路由键消息就会被路由两次,这种是不存在的。
多个绑定
用相同的绑定键去绑定多个队列是彻底合法的,咱们能够再添加一个black绑定键来绑定X和Q1,这样Q1和Q2都使用black绑定到了交换器X,这其实和fanout类型的交换器直接绑定到队列Q一、Q2功能相同:使用black路由键的消息会被直接路由到Q1和Q2。
发送日志
咱们将使用该模型来构建日志系统。使用direct型的交换器替换fanout型的,咱们将日志的严重级别做为路由键,这样的话接收端程序能够选择日志接收级别进行接收,首先聚焦下日志发送端:
首先建立一个交换器:
err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments )
而后是发送消息:
err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs_direct", // exchange severityFrom(os.Args), // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
为了简单起见,咱们假设日志严重级别以下:'info', 'warning', 'error'。
订阅
接收还和以前章节接收同样,只有一个例外:咱们将为每个感兴趣的严重级别建立一个绑定:
q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warning] [error]", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil) failOnError(err, "Failed to bind a queue") }
糅合在一块儿
发送端:
// rabbitmq_4_emit_log_direct.go project main.go package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //连接队列服务 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //声明一个channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() //声明一个direct类型交换器 err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) ch.Publish("logs_direct", severityFrom(os.Args), false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } //接收消息发送内容 func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } //接收日志级别,做为路由键使用 func severityFrom(args []string) string { var s string if len(args) < 2 || args[1] == "" { s = "info" } else { s = args[1] } return s }
接收端:
// rabbitmq_4_receive_logs_direct.go project main.go package main import ( "fmt" "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //连接队列服务 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() //声明一个channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() //声明一个direct类型交换器 err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil) failOnError(err, "Failed to declare an exchange") //声明一个队列 q, err := ch.QueueDeclare("", false, false, true, false, nil) failOnError(err, "Failed to declare a queue") //判断cmd窗口接收参数是否足够 if len(os.Args) < 2 { log.Printf("Usage:%s [info] [warning] [error]", os.Args[0]) os.Exit(0) } //cmd窗口输入的多个日志级别,分别循环处理—进行绑定 for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) ch.QueueBind(q.Name, s, "logs_direct", false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
若是您只想保存“警告”和“错误”(而不是“信息”)日志消息到文件,只须要打开一个控制台而后输入:
go run receive_logs_direct.go warning error > logs_from_rabbit.log
若是你想看到全部的日志消息在你的屏幕上,打开一个新的终端,输入:
go run receive_logs_direct.go info warning error
发出一个错误日志消息类型以下:
go run emit_log_direct.go error "Run. Run. Or it will explode."
能够观察到:
消息能够进行分类接收了, 只有error级别的消息才会被存入log日志文件,而info、warning级别的都不存入。
实际效果以下: